The cb (“Core Bluetooth”) module enables you to connect to Bluetooth LE (“Low Energy”) peripherals like the TI SensorTag (a small and affordable BTLE device that contains various sensors like an IR thermometer, accelerometer, etc.).
The examples in this documentation assume that you have either a standard Bluetooth LE heart rate monitor or a TI SensorTag, but the module also works with other types of Bluetooth LE peripherals. Note that “classic” Bluetooth (used for headsets, keyboards, etc.) is NOT supported.
The module is based on the CoreBluetooth framework, but doesn’t expose all of its functionality – for example, it only implements the central role, i.e. you can connect to peripherals but not have the device act as a peripheral itself.
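The basic procedure for connecting to a Bluetooth LE device and reading data from it involves the following steps:

1. Set a central delegate with set_central_delegate(), so that your code receives callbacks.
2. Start scanning with scan_for_peripherals().
3. In did_discover_peripheral, keep a reference to the peripheral you are interested in and call connect_peripheral().
4. In did_connect_peripheral, call discover_services() on the peripheral.
5. In did_discover_services, call discover_characteristics() for the services you need.
6. In did_discover_characteristics, write a value or enable notifications with set_notify_value().
7. In did_update_value, read the data from the characteristic's value attribute.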
The following code shows how to connect to a Bluetooth LE heart rate monitor. This particular example assumes that you have a Polar device, but it should be easy to adapt to a different brand (just change the name check in the did_discover_peripheral method accordingly):
    import cb
    import struct

    class HeartRateManager (object):
        def __init__(self):
            self.peripheral = None
            # Collected heart rate readings (beats per minute):
            self.values = []

        def did_discover_peripheral(self, p):
            if p.name and 'Polar' in p.name and not self.peripheral:
                # Keep a reference to the peripheral, so it doesn't get garbage-collected:
                self.peripheral = p
                print('Connecting to heart rate monitor...')
                cb.connect_peripheral(p)

        def did_connect_peripheral(self, p):
            print('Connected:', p.name)
            print('Discovering services...')
            p.discover_services()

        def did_fail_to_connect_peripheral(self, p, error):
            print('Failed to connect: %s' % (error,))

        def did_disconnect_peripheral(self, p, error):
            print('Disconnected, error: %s' % (error,))
            self.peripheral = None

        def did_discover_services(self, p, error):
            for s in p.services:
                if s.uuid == '180D':
                    print('Discovered heart rate service, discovering characteristics...')
                    p.discover_characteristics(s)

        def did_discover_characteristics(self, s, error):
            print('Did discover characteristics...')
            for c in s.characteristics:
                if c.uuid == '2A37':
                    self.peripheral.set_notify_value(c, True)

        def did_update_value(self, c, error):
            # Byte 0 of the value holds flags. This assumes the 8-bit
            # heart rate format, where the reading is stored in byte 1.
            heart_rate = struct.unpack('<B', c.value[1:2])[0]
            self.values.append(heart_rate)
            print('Heart rate: %i' % heart_rate)

    mngr = HeartRateManager()
    cb.set_central_delegate(mngr)
    print('Scanning for peripherals...')
    cb.scan_for_peripherals()

    # Keep the connection alive until the 'Stop' button is pressed:
    try:
        while True:
            pass
    except KeyboardInterrupt:
        # Disconnect everything:
        cb.reset()
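The example above treats the heart rate as a single unsigned byte. In the standard Heart Rate Measurement characteristic (2A37), the first byte is a flags field, and bit 0 of that field indicates whether the reading is stored as an 8-bit or a 16-bit little-endian value. The following is a minimal sketch of a parser that handles both cases; parse_heart_rate is a hypothetical helper name, not part of the cb module, and it expects the raw byte string from the characteristic's value attribute:

```python
import struct


def parse_heart_rate(value):
    """Return beats per minute from a Heart Rate Measurement payload."""
    # Byte 0 holds the flags; bit 0 selects the width of the reading.
    flags = struct.unpack('<B', value[0:1])[0]
    if flags & 0x01:
        # 16-bit little-endian reading in bytes 1 and 2
        return struct.unpack('<H', value[1:3])[0]
    # 8-bit reading in byte 1
    return struct.unpack('<B', value[1:2])[0]
```

Inside did_update_value, this could be called as parse_heart_rate(c.value). Slicing (value[1:2] rather than value[1]) keeps the helper working with byte strings on both Python 2 and Python 3.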
The following example shows how to do similar things with a TI SensorTag. It logs the value of the temperature sensor about once per second (much like the heart rate monitor example above) and plays a sound when either of the two buttons on the SensorTag is pressed:
    import binascii
    import cb
    import sound
    import struct

    class MyCentralManagerDelegate (object):
        def __init__(self):
            self.peripheral = None

        def did_discover_peripheral(self, p):
            print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
            if p.name and 'Sensor Tag' in p.name and not self.peripheral:
                # Keep a reference to the peripheral, so it doesn't get garbage-collected:
                self.peripheral = p
                cb.connect_peripheral(self.peripheral)

        def did_connect_peripheral(self, p):
            print('*** Connected: %s' % p.name)
            print('Discovering services...')
            p.discover_services()

        def did_fail_to_connect_peripheral(self, p, error):
            print('Failed to connect')

        def did_disconnect_peripheral(self, p, error):
            print('Disconnected, error: %s' % (error,))
            self.peripheral = None

        def did_discover_services(self, p, error):
            for s in p.services:
                if 'AA00' in s.uuid:
                    print('+++ IR Thermometer found')
                    p.discover_characteristics(s)
                elif 'FFE0' in s.uuid:
                    print('+++ Simple Key Service found')
                    p.discover_characteristics(s)

        def did_discover_characteristics(self, s, error):
            if 'AA00' in s.uuid:
                for c in s.characteristics:
                    if 'AA02' in c.uuid:
                        print('Enabling temperature sensor...')
                        # The data has to be a byte string:
                        self.peripheral.write_characteristic_value(c, b'\x01', True)
                    elif 'AA01' in c.uuid:
                        # Enable notification for the temperature sensor:
                        print('Enabling temperature sensor notifications...')
                        self.peripheral.set_notify_value(c, True)
            elif 'FFE0' in s.uuid:
                print('Enabling notifications for Simple Key Service...')
                # The service's characteristics are a list; use the first one:
                key_characteristic = s.characteristics[0]
                self.peripheral.set_notify_value(key_characteristic, True)

        def did_write_value(self, c, error):
            # The temperature sensor has been activated (see did_discover_characteristics)
            print('Did enable temperature sensor')

        def did_update_value(self, c, error):
            if 'FFE1' == c.uuid:
                # A button on the SensorTag was pressed (or released):
                print('Button value: %s' % binascii.hexlify(c.value).decode('ascii'))
                sound.play_effect('Beep')
            else:
                # The temperature sensor has sent an updated value:
                tobj, mtmpamb = self.convert_temperature(c.value)
                print('Object temperature: %f -- Ambient: %f' % (tobj, mtmpamb))

        def convert_temperature(self, raw_data):
            # This will convert the raw sensor data to temperature values in Celsius.
            # The details of this algorithm aren't important for this example, you can
            # find more information about this in the SensorTag user guide:
            # http://processors.wiki.ti.com/index.php/SensorTag_User_Guide
            # struct.unpack always returns a tuple, so take the first element:
            rawT = struct.unpack('<h', raw_data[:2])[0]
            tmpAmb = struct.unpack('<H', raw_data[2:])[0]
            vobj2 = float(rawT) * 0.00000015625
            mtmpamb = float(tmpAmb) / 128.0
            tdie2 = mtmpamb + 273.15
            s0, a1, a2 = 6.4E-14, 1.75E-3, -1.678E-5
            b0, b1, b2, c2 = -2.94E-5, -5.7E-7, 4.63E-9, 13.4
            Tref = 298.15
            S = s0 * (1.0+a1*(tdie2 - Tref) + a2 * pow((tdie2 - Tref), 2))
            vos = b0 + b1*(tdie2-Tref) + b2*pow((tdie2-Tref), 2)
            fobj = (vobj2 - vos) + c2 * pow((vobj2 - vos), 2)
            tobj = pow(pow(tdie2, 4) + (fobj/S), 0.25) - 273.15
            return tobj, mtmpamb

    delegate = MyCentralManagerDelegate()
    print('Scanning for peripherals...')
    cb.set_central_delegate(delegate)
    cb.scan_for_peripherals()

    # Keep the connection alive until the 'Stop' button is pressed:
    try:
        while True:
            pass
    except KeyboardInterrupt:
        # Disconnect everything:
        cb.reset()
Set the object that should receive callbacks for events like discovered peripherals, received data, etc.
The delegate can implement the following methods, all of which are optional:
    class MyDelegate (object):
        def did_update_state(self):
            # State was updated (e.g. Bluetooth powered on/off)
            pass

        def did_discover_peripheral(self, p):
            # You would typically check the peripheral's name/uuid here,
            # and connect via cb.connect_peripheral(p).
            # You should also keep a reference to p (e.g. self.peripheral = p),
            # so that it doesn't get garbage-collected.
            # Note that this may get called multiple times for a single peripheral.
            pass

        def did_connect_peripheral(self, p):
            # You would typically call p.discover_services() here
            pass

        def did_fail_to_connect_peripheral(self, p, error):
            # error is a tuple of error code (integer) and description (string)
            pass

        def did_disconnect_peripheral(self, p, error):
            # error is a tuple of error code (integer) and description (string)
            pass

        def did_discover_services(self, p, error):
            # Here you would typically call discover_characteristics
            # for the services you're interested in.
            pass

        def did_discover_characteristics(self, s, error):
            # You can now read or write the characteristic's value
            pass

        def did_write_value(self, c, error):
            pass

        def did_update_value(self, c, error):
            # You can now access the characteristic value with c.value
            pass
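Because did_discover_peripheral may be called several times for the same peripheral, it can help to remember which UUIDs have already been reported. The following is one possible sketch; DiscoveryTracker and FakePeripheral are hypothetical names used only for illustration (FakePeripheral stands in for the peripheral objects that the module passes to the callback, so that the sketch can run without Bluetooth hardware):

```python
class DiscoveryTracker(object):
    """Remembers which peripheral UUIDs have already been reported."""

    def __init__(self):
        self.seen = set()

    def is_new(self, peripheral):
        # Return True only the first time a given UUID is encountered.
        if peripheral.uuid in self.seen:
            return False
        self.seen.add(peripheral.uuid)
        return True


class FakePeripheral(object):
    """Stand-in with the name and uuid attributes of a real peripheral."""

    def __init__(self, name, uuid):
        self.name = name
        self.uuid = uuid


tracker = DiscoveryTracker()
first = FakePeripheral('Polar H7', 'ABCD-1234')
print(tracker.is_new(first))   # first report of this UUID
print(tracker.is_new(first))   # repeated report of the same UUID
```

In a real delegate, the tracker would be created in __init__ and consulted at the top of did_discover_peripheral, for example to log each peripheral only once.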
When set to True, all callback events are logged to the console, regardless of whether the delegate (see set_central_delegate()) implements them or not – this can be useful for debugging.
Start scanning for peripherals that are advertising services.
Stop scanning for peripherals.
Establish a connection to a peripheral. This will typically be called from your implementation of the did_discover_peripheral callback.
Cancel an active or pending local connection to a peripheral.
Disconnect all peripherals. This also sets the central delegate to None, so you will no longer receive any callbacks.
The classes in this module are not intended to be instantiated directly. Instead, you will get instances of the different classes as parameters to callback functions that your delegate implements (see set_central_delegate()).
Discover the services of the peripheral. The central delegate will receive a did_discover_services callback when the discovery has finished.
Discover the characteristics for the given Service. This will typically be called from the did_discover_services callback. When the characteristics have been discovered, the central delegate will receive a did_discover_characteristics callback.
Enable or disable notifications for the given Characteristic (note: not all characteristics support notifications). When notifications are enabled, you may receive did_update_value callbacks in your central delegate (see set_central_delegate()).
Write the value of a Characteristic. data should be a byte string. with_response determines whether the did_write_value callback will be called when the write has completed (or failed). Note that not all characteristics support writing values without response.
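Since the data has to be a byte string, the struct module is a convenient way to build it from integers. This sketch shows two hypothetical helpers (encode_uint8 and encode_uint16_le are not part of the cb module); which layout a given characteristic expects depends on the peripheral, so check its documentation:

```python
import struct


def encode_uint8(value):
    # One unsigned byte, e.g. an on/off switch
    return struct.pack('<B', value)


def encode_uint16_le(value):
    # Two bytes, least significant byte first
    return struct.pack('<H', value)
```

For example, encode_uint8(0x01) produces the same single byte that the SensorTag example writes in order to enable the temperature sensor.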
Retrieve the value of the given Characteristic. When the value has been read, your central delegate will receive a did_update_value callback, at which point you can access the value with the characteristic’s value attribute. Not all characteristics are guaranteed to have a readable value.
The peripheral’s name (either a string or None)
The peripheral’s unique identifier (a hex string)
The peripheral connection state (0 = disconnected, 1 = connecting, 2 = connected)
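For logging, the numeric connection state can be mapped to a readable label. A small sketch, using only the three values listed above (state_name is a hypothetical helper, and any other value is reported as unknown):

```python
PERIPHERAL_STATES = {
    0: 'disconnected',
    1: 'connecting',
    2: 'connected',
}


def state_name(state):
    # Fall back to a generic label for values not listed in the table
    return PERIPHERAL_STATES.get(state, 'unknown (%r)' % (state,))
```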
The service’s characteristics (a list of Characteristic objects which may be empty if the characteristics haven’t been discovered yet)
A boolean flag that indicates whether the service is primary or secondary
The service’s unique identifier (a hex string)
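The examples above compare identifiers either exactly (s.uuid == '180D') or with a substring test ('AA00' in s.uuid). A substring test can match in the wrong position of a long identifier, so a stricter comparison may be preferable. This sketch assumes that identifiers arrive either as a 4-digit short form or in the usual 36-character 128-bit notation, where the 16-bit part occupies characters 4 to 8; uuid_matches is a hypothetical helper, not part of the module:

```python
def uuid_matches(uuid, short_id):
    """Check whether uuid corresponds to the given 16-bit identifier."""
    uuid = uuid.upper()
    short_id = short_id.upper()
    if len(uuid) == 4:
        # Short form, e.g. '180D'
        return uuid == short_id
    # Long form, e.g. 'F000AA00-0451-4000-B000-000000000000'
    return uuid[4:8] == short_id
```

Note that this only compares the 16-bit part and ignores the rest of a 128-bit identifier, which is sufficient for telling apart the services of one known device.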
The characteristic’s unique identifier (a hex string).
The characteristic’s current value (either a byte string, or None if the value hasn’t been read yet or there is no value)
A flag that determines whether notifications are enabled for this characteristic (read-only, set via Peripheral.set_notify_value())
A bitmask of the characteristic’s properties (see CH_PROP_* constants below)
Possible return values for get_state():
Characteristic properties (note: these values can be combined as a bitmask):
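A properties bitmask is tested with a bitwise AND. The sketch below uses stand-in values that mirror Core Bluetooth's CBCharacteristicProperties; that the module's CH_PROP_* constants have the same values is an assumption, so real code should use those constants rather than the literals shown here. describe_properties is a hypothetical helper:

```python
# Stand-in values (assumed to match the CH_PROP_* constants)
PROPERTY_FLAGS = [
    (0x01, 'broadcast'),
    (0x02, 'read'),
    (0x04, 'write without response'),
    (0x08, 'write'),
    (0x10, 'notify'),
    (0x20, 'indicate'),
]


def describe_properties(mask):
    # Collect the label of every flag whose bit is set in the mask
    return [label for bit, label in PROPERTY_FLAGS if mask & bit]
```

Checking for notification support before enabling notifications on a characteristic is a typical use of such a test.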