The cb (“Core Bluetooth”) module enables you to connect to Bluetooth LE (“Low Energy”) peripherals like the TI SensorTag (a small and affordable BTLE device that contains various sensors, such as an IR thermometer and an accelerometer).
The examples in this documentation assume that you have either a standard Bluetooth LE heart rate monitor or a TI SensorTag, but the module also works with other types of Bluetooth LE peripherals. Note that “classic” Bluetooth (used for headsets, keyboards, etc.) is NOT supported.
The module is based on the CoreBluetooth framework, but doesn’t expose all of its functionality – for example, it only implements the central role, i.e. you can connect to peripherals but not have the device act as a peripheral itself.
The basic procedure for connecting to a Bluetooth LE device and reading data from it involves the following steps:
Define a delegate object that is responsible for handling notifications/callbacks about discovered devices (peripherals), services, and service characteristics. The callback methods you can implement are described later on.
Call scan_for_peripherals() to start scanning. In many cases, you will also need to make the peripheral discoverable. For the TI SensorTag you would press the side button.
When your delegate receives the did_discover_peripheral callback, check the peripheral’s name or uuid, and if you found the peripheral you want to connect to, call connect_peripheral(). You should also keep a reference to the discovered peripheral, because the connection is lost if the object is garbage-collected.
If everything went smoothly, the did_connect_peripheral callback will be called. At this point, you can start discovering the peripheral’s services by calling Peripheral.discover_services().
When the services of the peripheral have been discovered, you will receive a did_discover_services callback. You can now access the peripheral’s services attribute, and – for the services you’re interested in – call Peripheral.discover_characteristics().
As you might have guessed, this will result in a did_discover_characteristics callback. You can now read the value of a characteristic by calling Peripheral.read_characteristic_value() (which will result in another callback when the value has been read successfully), or enable notifications for the characteristic by calling Peripheral.set_notify_value().
The following code shows how to connect to a Bluetooth LE heart rate monitor. This particular example assumes that you have a Polar device, but it should be easy to adapt this for a different brand (just change the did_discover_peripheral method accordingly):
```python
from __future__ import print_function
import cb
import struct


class HeartRateManager (object):
    def __init__(self):
        self.peripheral = None
        # Heart rate values received so far:
        self.values = []

    def did_discover_peripheral(self, p):
        if p.name and 'Polar' in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            self.peripheral = p
            print('Connecting to heart rate monitor...')
            cb.connect_peripheral(p)

    def did_connect_peripheral(self, p):
        print('Connected:', p.name)
        print('Discovering services...')
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect: %s' % (error,))

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected, error: %s' % (error,))
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if s.uuid == '180D':
                print('Discovered heart rate service, discovering characteristics...')
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        print('Did discover characteristics...')
        for c in s.characteristics:
            if c.uuid == '2A37':
                self.peripheral.set_notify_value(c, True)

    def did_update_value(self, c, error):
        # Byte 0 of the measurement contains flags, byte 1 the heart rate
        # (this assumes the common 8-bit heart rate format):
        heart_rate = struct.unpack('<B', c.value[1:2])[0]
        self.values.append(heart_rate)
        print('Heart rate: %i' % heart_rate)


mngr = HeartRateManager()
cb.set_central_delegate(mngr)
print('Scanning for peripherals...')
cb.scan_for_peripherals()

# Keep the connection alive until the 'Stop' button is pressed:
try:
    while True:
        pass
except KeyboardInterrupt:
    # Disconnect everything:
    cb.reset()
```
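The did_update_value callback in the heart rate example receives the raw Heart Rate Measurement value. As a hedged sketch of a slightly more general decoder: in the standard Heart Rate Measurement characteristic (2A37), the first byte holds flags, and bit 0 of those flags indicates whether the heart rate follows as an 8-bit or as a 16-bit little-endian value. `parse_heart_rate` is a hypothetical helper name and not part of the cb module.

```python
import struct


def parse_heart_rate(value):
    """Return the heart rate (beats per minute) from a raw
    Heart Rate Measurement value, given as a byte string."""
    if value is None or len(value) < 2:
        raise ValueError('heart rate measurement is too short')
    # Byte 0 contains flags; bit 0 selects the heart rate format.
    flags = struct.unpack('<B', value[0:1])[0]
    if flags & 0x01:
        # 16-bit little-endian heart rate in bytes 1-2
        if len(value) < 3:
            raise ValueError('16-bit heart rate value is truncated')
        return struct.unpack('<H', value[1:3])[0]
    # 8-bit heart rate in byte 1
    return struct.unpack('<B', value[1:2])[0]


print(parse_heart_rate(b'\x00\x48'))      # 8-bit format
print(parse_heart_rate(b'\x01\x2c\x01'))  # 16-bit format
```

Inside a delegate, this could be called as `parse_heart_rate(c.value)` from did_update_value.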
The following example shows how to do similar things with a TI SensorTag. It will log the value of the temperature sensor (about once per second, very similar to the heart rate monitor example above), and play a sound when either of the two buttons on the SensorTag is pressed:
```python
import binascii
import cb
import sound
import struct


class MyCentralManagerDelegate (object):
    def __init__(self):
        self.peripheral = None

    def did_discover_peripheral(self, p):
        print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
        if p.name and 'Sensor Tag' in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            self.peripheral = p
            cb.connect_peripheral(self.peripheral)

    def did_connect_peripheral(self, p):
        print('*** Connected: %s' % p.name)
        print('Discovering services...')
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect')

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected, error: %s' % (error,))
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if 'AA00' in s.uuid:
                print('+++ IR Thermometer found')
                p.discover_characteristics(s)
            elif 'FFE0' in s.uuid:
                print('+++ Simple Key Service found')
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        if 'AA00' in s.uuid:
            for c in s.characteristics:
                if 'AA02' in c.uuid:
                    print('Enabling temperature sensor...')
                    # The data has to be a byte string:
                    self.peripheral.write_characteristic_value(c, b'\x01', True)
                elif 'AA01' in c.uuid:
                    # Enable notification for the temperature sensor:
                    print('Enabling temperature sensor notifications...')
                    self.peripheral.set_notify_value(c, True)
        elif 'FFE0' in s.uuid:
            print('Enabling notifications for Simple Key Service...')
            # s.characteristics is a list; use its first entry:
            key_characteristic = s.characteristics[0]
            self.peripheral.set_notify_value(key_characteristic, True)

    def did_write_value(self, c, error):
        # The temperature sensor has been activated (see did_discover_characteristics)
        print('Did enable temperature sensor')

    def did_update_value(self, c, error):
        if 'FFE1' == c.uuid:
            # A button on the SensorTag was pressed (or released):
            print('Button value: %s' % binascii.hexlify(c.value).decode('ascii'))
            sound.play_effect('Beep')
        else:
            # The temperature sensor has sent an updated value:
            tobj, mtmpamb = self.convert_temperature(c.value)
            print('Object temperature: %f -- Ambient: %f' % (tobj, mtmpamb))

    def convert_temperature(self, raw_data):
        # This will convert the raw sensor data to temperature values in Celsius.
        # The details of this algorithm aren't important for this example, you can
        # find more information about this in the SensorTag user guide:
        # http://processors.wiki.ti.com/index.php/SensorTag_User_Guide
        # struct.unpack() returns a tuple, so take its first element:
        rawT = struct.unpack('<h', raw_data[:2])[0]
        tmpAmb = struct.unpack('<H', raw_data[2:4])[0]
        vobj2 = float(rawT) * 0.00000015625
        mtmpamb = float(tmpAmb) / 128.0
        tdie2 = mtmpamb + 273.15
        s0, a1, a2 = 6.4E-14, 1.75E-3, -1.678E-5
        b0, b1, b2, c2 = -2.94E-5, -5.7E-7, 4.63E-9, 13.4
        Tref = 298.15
        S = s0 * (1.0 + a1 * (tdie2 - Tref) + a2 * pow((tdie2 - Tref), 2))
        vos = b0 + b1 * (tdie2 - Tref) + b2 * pow((tdie2 - Tref), 2)
        fobj = (vobj2 - vos) + c2 * pow((vobj2 - vos), 2)
        tobj = pow(pow(tdie2, 4) + (fobj / S), 0.25) - 273.15
        return tobj, mtmpamb


delegate = MyCentralManagerDelegate()
print('Scanning for peripherals...')
cb.set_central_delegate(delegate)
cb.scan_for_peripherals()

# Keep the connection alive until the 'Stop' button is pressed:
try:
    while True:
        pass
except KeyboardInterrupt:
    # Disconnect everything:
    cb.reset()
```
Set the object that should receive callbacks for events like discovered peripherals, received data, etc.
The delegate can implement the following methods, all of which are optional:
```python
class MyDelegate (object):
    def did_update_state(self):
        # State was updated (e.g. Bluetooth powered on/off)
        pass

    def did_discover_peripheral(self, p):
        # You would typically check the peripheral's name/uuid here,
        # and connect via cb.connect_peripheral(p).
        # You should also keep a reference to p (e.g. self.peripheral = p),
        # so that it doesn't get garbage-collected.
        # Note that this may get called multiple times for a single peripheral.
        pass

    def did_connect_peripheral(self, p):
        # You would typically call p.discover_services() here
        pass

    def did_fail_to_connect_peripheral(self, p, error):
        # `error` is a tuple of error code (integer) and description (string)
        pass

    def did_disconnect_peripheral(self, p, error):
        # `error` is a tuple of error code (integer) and description (string)
        pass

    def did_discover_services(self, p, error):
        # Here you would typically call discover_characteristics
        # for the services you're interested in.
        pass

    def did_discover_characteristics(self, s, error):
        # You can now read or write the characteristic's value
        pass

    def did_write_value(self, c, error):
        pass

    def did_update_value(self, c, error):
        # You can now access the characteristic value with c.value
        pass
```
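Several of these callbacks receive an error argument, described as a tuple of error code and description. A minimal sketch of a helper that turns it into a log message might look like the following. `format_error` and `LoggingDelegate` are hypothetical names. The documentation does not say what error contains when nothing went wrong, so the sketch assumes a falsy value such as None.

```python
def format_error(error):
    """Format the `error` argument of a delegate callback for logging."""
    # Assumption: a falsy value (e.g. None) means that no error occurred.
    if not error:
        return 'no error'
    code, description = error
    return 'error %d: %s' % (code, description)


class LoggingDelegate (object):
    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect to %s (%s)' % (p.name, format_error(error)))

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected from %s (%s)' % (p.name, format_error(error)))
```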
When set to True, all callback events are logged to the console, regardless of whether the delegate (see set_central_delegate()) implements them or not – this can be useful for debugging.
Start scanning for peripherals that are advertising services.
Stop scanning for peripherals.
Establish a connection to a peripheral. This will typically be called from your implementation of the did_discover_peripheral callback.
Cancel an active or pending local connection to a peripheral.
Disconnect all peripherals. This also sets the central delegate to None, so you will no longer receive any callbacks.
The classes in this module are not intended to be instantiated directly. Instead, you will get instances of the different classes as parameters to callback functions that your delegate implements (see set_central_delegate()).
Discover the services of the peripheral. The central delegate will receive a did_discover_services callback when the discovery has finished.
Discover the characteristics for the given Service. This will typically be called from the did_discover_services callback. When the characteristics have been discovered, the central delegate will receive a did_discover_characteristics callback.
- Peripheral.set_notify_value(characteristic, flag=True)
Enable or disable notifications for the given Characteristic (note: not all characteristics support notifications). When notifications are enabled, you may receive did_update_value callbacks in your central delegate (see set_central_delegate()).
- Peripheral.write_characteristic_value(characteristic, data, with_response)
Write the value of a Characteristic. data should be a byte string. with_response determines whether the did_write_value callback will be called when the write has completed (or failed). Note that not all characteristics support writing values without response.
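Because data has to be a byte string, numeric values need to be packed before they are written. The sketch below uses the standard struct module for this. The variable names and the values 1 and 1000 are purely illustrative and do not correspond to any particular peripheral.

```python
import struct

# A single byte with the value 1 (e.g. an "enable" flag):
enable = struct.pack('<B', 1)

# A 16-bit unsigned integer in little-endian byte order:
interval = struct.pack('<H', 1000)

print(repr(enable), repr(interval))

# Inside a delegate callback, a payload like this could then be written
# with a response, so that did_write_value is called afterwards:
#     peripheral.write_characteristic_value(characteristic, enable, True)
```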
Retrieve the value of the given Characteristic. When the value has been read, your central delegate will receive a did_update_value callback, at which point you can access the value with the characteristic’s value attribute. Not all characteristics are guaranteed to have a readable value.
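As a rough sketch of a one-off read (rather than a notification subscription), the delegate below requests a value once the characteristics have been discovered and stores the result when did_update_value arrives. `ReadOnceDelegate` is a hypothetical class. It assumes that the peripheral has already been connected and that discover_characteristics() has been called for the relevant service. Scanning and connecting are omitted.

```python
class ReadOnceDelegate (object):
    """Reads one characteristic value once instead of subscribing to it."""

    def __init__(self, peripheral, characteristic_uuid):
        # Keep a reference to the (already connected) peripheral:
        self.peripheral = peripheral
        self.characteristic_uuid = characteristic_uuid
        self.value = None

    def did_discover_characteristics(self, s, error):
        for c in s.characteristics:
            if c.uuid == self.characteristic_uuid:
                # Request the value; the result arrives in did_update_value.
                self.peripheral.read_characteristic_value(c)

    def did_update_value(self, c, error):
        if c.uuid == self.characteristic_uuid:
            self.value = c.value
            print('Read value: %r' % (self.value,))
```

The value is not returned by read_characteristic_value() itself. It only becomes available in the callback, which is why the delegate stores it.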
Manufacturer-specific data that is part of the peripheral’s advertising data. You can read this without connecting to the peripheral (typically in the did_discover_peripheral callback).
The peripheral’s name (either a string or None)
The peripheral’s unique identifier (a hex string)
The peripheral connection state (0 = disconnected, 1 = connecting, 2 = connected)
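For logging, the numeric connection state can be mapped to a readable name. This is a small sketch based only on the three values listed above. `PERIPHERAL_STATES` and `describe_state` are hypothetical names, not part of the module.

```python
# Connection states as documented above.
PERIPHERAL_STATES = {0: 'disconnected', 1: 'connecting', 2: 'connected'}


def describe_state(state):
    """Return a readable name for a peripheral connection state."""
    return PERIPHERAL_STATES.get(state, 'unknown (%r)' % (state,))


print(describe_state(2))
```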
The service’s characteristics (a list of Characteristic objects which may be empty if the characteristics haven’t been discovered yet)
A boolean flag that indicates whether the service is primary or secondary
The service’s unique identifier (a hex string)
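The examples earlier compare UUIDs in two ways: an exact match for a 16-bit UUID such as '180D', and a substring test for the SensorTag’s 128-bit UUIDs. A hedged sketch of one helper covering both cases follows. `uuid_matches` is a hypothetical name, and it assumes that UUID strings are either four hex digits or in the usual 8-4-4-4-12 form, with the 16-bit part at characters 4 to 8.

```python
def uuid_matches(uuid, short_uuid):
    """Return True if `uuid` is, or embeds, the 16-bit `short_uuid`."""
    uuid = uuid.upper()
    short_uuid = short_uuid.upper()
    if len(uuid) == 4:
        # 16-bit UUID, e.g. '180D'
        return uuid == short_uuid
    # 128-bit UUID, e.g. 'F000AA00-0451-4000-B000-000000000000':
    # compare only the 16-bit part instead of searching the whole string.
    return uuid[4:8] == short_uuid


print(uuid_matches('F000AA00-0451-4000-B000-000000000000', 'AA00'))
```

Compared with a plain substring test, this avoids accidental matches elsewhere in a long UUID.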
The characteristic’s unique identifier (a hex string).
The characteristic’s current value (either a byte string, or None if the value hasn’t been read yet or there is no value)
A flag that determines whether notifications are enabled for this characteristic (read-only, set via Peripheral.set_notify_value())
A bitmask of the characteristic’s properties (see the characteristic properties listed below)
Possible return values for
Characteristic properties (note: these values can be combined as a bitmask):
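Since the properties are combined as a bitmask, individual properties are tested with a bitwise AND. The sketch below defines its own hypothetical constants using the values of Core Bluetooth’s CBCharacteristicProperties. The names and values of the cb module’s own constants are not shown here and may differ, so treat these as assumptions.

```python
# Hypothetical local constants (values taken from Core Bluetooth's
# CBCharacteristicProperties; the cb module's own constants may differ).
PROP_BROADCAST = 0x01
PROP_READ = 0x02
PROP_WRITE_WITHOUT_RESPONSE = 0x04
PROP_WRITE = 0x08
PROP_NOTIFY = 0x10
PROP_INDICATE = 0x20

PROPERTY_NAMES = [
    (PROP_BROADCAST, 'broadcast'),
    (PROP_READ, 'read'),
    (PROP_WRITE_WITHOUT_RESPONSE, 'write without response'),
    (PROP_WRITE, 'write'),
    (PROP_NOTIFY, 'notify'),
    (PROP_INDICATE, 'indicate'),
]


def has_property(properties, flag):
    """Return True if `flag` is set in the `properties` bitmask."""
    return (properties & flag) == flag


def describe_properties(properties):
    """Return the names of all known properties set in the bitmask."""
    return [name for flag, name in PROPERTY_NAMES if has_property(properties, flag)]


# A characteristic that can be read and supports notifications:
print(describe_properties(PROP_READ | PROP_NOTIFY))
```

A check like `has_property(c.properties, PROP_NOTIFY)` could be used before enabling notifications, given that not all characteristics support them.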