cb: Connecting to Bluetooth LE Peripherals#
The cb (“Core Bluetooth”) module enables you to connect to Bluetooth LE (“Low Energy”) peripherals like the TI SensorTag (a small and affordable BTLE device that contains various sensors like an IR thermometer, accelerometer, etc.).
The examples in this documentation assume that you have either a standard Bluetooth LE heart rate monitor or a TI SensorTag, but the module also works with other types of Bluetooth LE peripherals. Note that “classic” Bluetooth (used for headsets, keyboards, etc.) is NOT supported.
The module is based on the CoreBluetooth framework, but doesn’t expose all of its functionality – for example, it only implements the central role, i.e. you can connect to peripherals but not have the device act as a peripheral itself.
Quickstart#
The basic procedure for connecting to a Bluetooth LE device and reading data from it involves the following steps:
1. Define a delegate object that is responsible for handling notifications/callbacks about discovered devices (peripherals), services, and service characteristics. The callback methods you can implement are described later on.
2. Call scan_for_peripherals() to start scanning. In many cases, you will also need to make your device discoverable. For the TI SensorTag you would press the side button.
3. When your delegate receives the did_discover_peripheral callback, check its name or uuid, and if you found the peripheral you want to connect to, call connect_peripheral(). You should also keep a reference to the discovered peripheral, because if the object is garbage-collected, you will also lose the connection.
4. If everything went smoothly, the did_connect_peripheral callback will be called. At this point, you can start discovering the peripheral’s services by calling Peripheral.discover_services().
5. When the services of the peripheral have been discovered, you will receive a did_discover_services callback. You can now access the peripheral’s services attribute and, for the services you’re interested in, call Peripheral.discover_characteristics().
6. As you might have guessed, this will result in a did_discover_characteristics callback. You can now read the value of a characteristic by calling Peripheral.read_characteristic_value() (which will result in another callback when the value has been read successfully), or enable notifications for the characteristic by calling Peripheral.set_notify_value().
The following code shows how to connect to a Bluetooth LE heart rate monitor. This particular example assumes that you have a Polar device, but it should be easy to adapt this for a different brand (just change the did_discover_peripheral method accordingly):
# __future__ imports must come before any other statement in the file:
from __future__ import print_function
import cb
import sound
import time
import struct

class HeartRateManager (object):
    def __init__(self):
        self.peripheral = None
        # Heart rate readings received so far:
        self.values = []

    def did_discover_peripheral(self, p):
        if p.name and 'Polar' in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            self.peripheral = p
            print('Connecting to heart rate monitor...')
            cb.connect_peripheral(p)

    def did_connect_peripheral(self, p):
        print('Connected:', p.name)
        print('Discovering services...')
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect: %s' % (error,))

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected, error: %s' % (error,))
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if s.uuid == '180D':
                print('Discovered heart rate service, discovering characteristics...')
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        print('Did discover characteristics...')
        for c in s.characteristics:
            if c.uuid == '2A37':
                self.peripheral.set_notify_value(c, True)

    def did_update_value(self, c, error):
        # Byte 0 holds flags; this assumes an 8-bit heart rate value in byte 1.
        # Slicing (instead of indexing) yields a byte string on Python 2 and 3.
        heart_rate = struct.unpack('<B', c.value[1:2])[0]
        self.values.append(heart_rate)
        print('Heart rate: %i' % heart_rate)

mngr = HeartRateManager()
cb.set_central_delegate(mngr)
print('Scanning for peripherals...')
cb.scan_for_peripherals()

# Keep the connection alive until the 'Stop' button is pressed:
try:
    while True: pass
except KeyboardInterrupt:
    # Disconnect everything:
    cb.reset()
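The did_update_value handler above reads a single byte at offset 1, which only holds for monitors that report an 8-bit heart rate. In the standard Heart Rate Measurement characteristic (2A37), bit 0 of the first (flags) byte selects between an 8-bit and a 16-bit little-endian value. The following is a minimal sketch of a parser that handles both cases; parse_heart_rate is a hypothetical helper name and not part of the cb module:

```python
import struct

def parse_heart_rate(data):
    """Return beats per minute from a raw Heart Rate Measurement value.

    `data` is the byte string found in Characteristic.value. This helper
    is an illustrative assumption, not part of the cb module.
    """
    if data is None or len(data) < 2:
        return None
    # bytearray() yields integers under both Python 2 and Python 3.
    flags = bytearray(data)[0]
    if flags & 0x01:
        # Bit 0 set: 16-bit little-endian heart rate in bytes 1-2.
        if len(data) < 3:
            return None
        return struct.unpack('<H', data[1:3])[0]
    # Bit 0 clear: 8-bit heart rate in byte 1.
    return struct.unpack('<B', data[1:2])[0]
```

Inside a delegate you could then write heart_rate = parse_heart_rate(c.value) and skip the update when the result is None.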
The following example shows how to do similar things with a TI SensorTag. It logs the value of the temperature sensor (about once per second, very similar to the heart rate monitor example above), and plays a sound when either of the two buttons on the SensorTag is pressed:
import cb
import sound
import struct

class MyCentralManagerDelegate (object):
    def __init__(self):
        self.peripheral = None

    def did_discover_peripheral(self, p):
        print('+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid))
        if p.name and 'Sensor Tag' in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            self.peripheral = p
            cb.connect_peripheral(self.peripheral)

    def did_connect_peripheral(self, p):
        print('*** Connected: %s' % p.name)
        print('Discovering services...')
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print('Failed to connect')

    def did_disconnect_peripheral(self, p, error):
        print('Disconnected, error: %s' % (error,))
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if 'AA00' in s.uuid:
                print('+++ IR Thermometer found')
                p.discover_characteristics(s)
            elif 'FFE0' in s.uuid:
                print('+++ Simple Key Service found')
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        if 'AA00' in s.uuid:
            for c in s.characteristics:
                if 'AA02' in c.uuid:
                    print('Enabling temperature sensor...')
                    # The data must be a byte string (chr() returns text on Python 3):
                    self.peripheral.write_characteristic_value(c, b'\x01', True)
                elif 'AA01' in c.uuid:
                    # Enable notification for the temperature sensor:
                    print('Enabling temperature sensor notifications...')
                    self.peripheral.set_notify_value(c, True)
        elif 'FFE0' in s.uuid:
            print('Enabling notifications for Simple Key Service...')
            key_characteristic = s.characteristics[0]
            self.peripheral.set_notify_value(key_characteristic, True)

    def did_write_value(self, c, error):
        # The temperature sensor has been activated (see did_discover_characteristics)
        print('Did enable temperature sensor')

    def did_update_value(self, c, error):
        if 'FFE1' == c.uuid:
            # A button on the SensorTag was pressed (or released).
            # bytearray() makes the hex formatting work on Python 2 and 3:
            print('Button value: %s' % ''.join('%02x' % b for b in bytearray(c.value)))
            sound.play_effect('Beep')
        else:
            # The temperature sensor has sent an updated value:
            tobj, mtmpamb = self.convert_temperature(c.value)
            print('Object temperature: %f -- Ambient: %f' % (tobj, mtmpamb))

    def convert_temperature(self, raw_data):
        # This will convert the raw sensor data to temperature values in Celsius.
        # The details of this algorithm aren't important for this example, you can
        # find more information about this in the SensorTag user guide:
        # http://processors.wiki.ti.com/index.php/SensorTag_User_Guide
        rawT = struct.unpack('<h', raw_data[:2])[0]
        tmpAmb = struct.unpack('<H', raw_data[2:])[0]
        vobj2 = float(rawT) * 0.00000015625
        mtmpamb = float(tmpAmb) / 128.0
        tdie2 = mtmpamb + 273.15
        s0, a1, a2 = 6.4E-14, 1.75E-3, -1.678E-5
        b0, b1, b2, c2 = -2.94E-5, -5.7E-7, 4.63E-9, 13.4
        Tref = 298.15
        S = s0 * (1.0+a1*(tdie2 - Tref) + a2 * pow((tdie2 - Tref), 2))
        vos = b0 + b1*(tdie2-Tref) + b2*pow((tdie2-Tref), 2)
        fobj = (vobj2 - vos) + c2 * pow((vobj2 - vos), 2)
        tobj = pow(pow(tdie2, 4) + (fobj/S), 0.25) - 273.15
        return tobj, mtmpamb

delegate = MyCentralManagerDelegate()
print('Scanning for peripherals...')
cb.set_central_delegate(delegate)
cb.scan_for_peripherals()

# Keep the connection alive until the 'Stop' button is pressed:
try:
    while True: pass
except KeyboardInterrupt:
    # Disconnect everything:
    cb.reset()
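The substring checks in the SensorTag example (such as 'AA00' in s.uuid) work because the sensor services use long vendor-specific UUIDs, but a substring can also match an unrelated UUID by accident. The sketch below compares full 128-bit forms instead. It rests on two assumptions that the text above does not confirm: that the module reports UUIDs either as 4-digit short strings or as dash-separated 128-bit strings, and that the SensorTag sensor services use the TI base UUID F000xxxx-0451-4000-B000-000000000000. All function names are hypothetical:

```python
# Bluetooth base UUID suffix used to expand 16-bit and 32-bit UUIDs.
BASE_UUID_SUFFIX = '-0000-1000-8000-00805F9B34FB'

def normalize_uuid(uuid):
    """Expand a 16-bit or 32-bit UUID string to its 128-bit form."""
    u = uuid.strip().upper()
    if len(u) == 4:
        return '0000' + u + BASE_UUID_SUFFIX
    if len(u) == 8:
        return u + BASE_UUID_SUFFIX
    return u

def ti_sensortag_uuid(short):
    """Build a full vendor UUID from a 4-digit SensorTag id (assumed TI base)."""
    return 'F000%s-0451-4000-B000-000000000000' % short.upper()

def uuid_matches(uuid, wanted):
    """Compare two UUID strings after normalizing both to 128-bit form."""
    return normalize_uuid(uuid) == normalize_uuid(wanted)
```

With these helpers, uuid_matches(s.uuid, ti_sensortag_uuid('AA00')) would replace the substring test for the IR thermometer service, and uuid_matches(s.uuid, 'FFE0') would cover the key service.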
Functions#
- cb.set_central_delegate(delegate)#
Set the object that should receive callbacks for events like discovered peripherals, received data, etc.
The delegate can implement the following methods, all of which are optional:
class MyDelegate (object):
    def did_update_state(self):
        # State was updated (e.g. Bluetooth powered on/off)
        pass

    def did_discover_peripheral(self, p):
        # You would typically check the peripheral's name/uuid here,
        # and connect via cb.connect_peripheral(p).
        # You should also keep a reference to p (e.g. self.peripheral = p),
        # so that it doesn't get garbage-collected.
        # Note that this may get called multiple times for a single peripheral.
        pass

    def did_connect_peripheral(self, p):
        # You would typically call p.discover_services() here
        pass

    def did_fail_to_connect_peripheral(self, p, error):
        # `error` is a tuple of error code (integer) and description (string)
        pass

    def did_disconnect_peripheral(self, p, error):
        # `error` is a tuple of error code (integer) and description (string)
        pass

    def did_discover_services(self, p, error):
        # Here you would typically call discover_characteristics
        # for the services you're interested in.
        pass

    def did_discover_characteristics(self, s, error):
        # You can now read or write the characteristic's value
        pass

    def did_write_value(self, c, error):
        pass

    def did_update_value(self, c, error):
        # You can now access the characteristic value with c.value
        pass
- cb.set_verbose(flag)#
When set to True, all callback events are logged to the console, regardless of whether the delegate (see set_central_delegate()) implements them or not. This can be useful for debugging.
- cb.scan_for_peripherals()#
Start scanning for peripherals that are advertising services.
- cb.stop_scan()#
Stop scanning for peripherals.
- cb.connect_peripheral(peripheral)#
Establish a connection to a peripheral. This will typically be called from your implementation of the did_discover_peripheral callback.
- cb.cancel_peripheral_connection(peripheral)#
Cancel an active or pending local connection to a peripheral.
- cb.reset()#
Disconnect all peripherals. This also sets the central delegate to None, so you will no longer receive any callbacks.
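The scanning and connection functions above are usually combined inside the did_discover_peripheral callback. The following sketch connects to the first peripheral whose name contains a given fragment and stops scanning at that point. ScanOnceDelegate and its bt parameter are hypothetical names; bt stands for any object that offers stop_scan() and connect_peripheral(), which in Pythonista would be the cb module itself. Stopping the scan before connecting is an assumption made to avoid further discovery callbacks, not a documented requirement:

```python
class ScanOnceDelegate(object):
    """Connect to the first peripheral whose name contains `name_fragment`."""

    def __init__(self, bt, name_fragment):
        # `bt` is expected to provide stop_scan() and connect_peripheral().
        self.bt = bt
        self.name_fragment = name_fragment
        self.peripheral = None

    def did_discover_peripheral(self, p):
        # This callback may fire several times for the same peripheral,
        # so ignore everything once a match has been stored.
        if self.peripheral is not None:
            return
        if p.name and self.name_fragment in p.name:
            # Keep a reference so the peripheral isn't garbage-collected.
            self.peripheral = p
            self.bt.stop_scan()
            self.bt.connect_peripheral(p)

    def did_disconnect_peripheral(self, p, error):
        # Forget the peripheral so a later scan can pick it up again.
        self.peripheral = None

# Intended use in Pythonista (not executed here):
#   import cb
#   delegate = ScanOnceDelegate(cb, 'Polar')
#   cb.set_central_delegate(delegate)
#   cb.scan_for_peripherals()
```

Passing the module in as a parameter keeps the class independent of cb, so the matching logic can be exercised with stand-in objects on a machine without Bluetooth.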
Classes#
Note
The classes in this module are not intended to be instantiated directly. Instead, you will get instances of the different classes as parameters to callback functions that your delegate implements (see set_central_delegate()).
Peripheral#
- class cb.Peripheral#
The Peripheral class represents peripheral devices that have been discovered via scan_for_peripherals(). Peripherals are identified by universally unique identifiers (UUIDs). Peripherals may contain one or more services (represented by Service objects).
Peripheral Methods#
- Peripheral.discover_services()#
Discover the services of the peripheral. The central delegate will receive a did_discover_services callback when the discovery has finished.
- Peripheral.discover_characteristics(service)#
Discover the characteristics for the given Service. This will typically be called from the did_discover_services callback. When the characteristics have been discovered, the central delegate will receive a did_discover_characteristics callback.
- Peripheral.set_notify_value(characteristic, flag=True)#
Enable or disable notifications for the given Characteristic (note: not all characteristics support notifications). When notifications are enabled, you may receive did_update_value callbacks in your central delegate (see set_central_delegate()).
- Peripheral.write_characteristic_value(characteristic, data, with_response)#
Write the value of a Characteristic. data should be a byte string. with_response determines whether the did_write_value callback will be called when the write has completed (or failed). Note that not all characteristics support writing values without response.
- Peripheral.read_characteristic_value(characteristic)#
Retrieve the value of the given Characteristic. When the value has been read, your central delegate will receive a did_update_value callback, at which point you can access the value with the characteristic’s value attribute. Not all characteristics are guaranteed to have a readable value.
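Because write_characteristic_value() expects a byte string and characteristic values arrive as byte strings, the struct module is a convenient way to convert between numbers and payloads under both Python 2 and Python 3. The helper names below are hypothetical, and the little-endian layout is an assumption that holds for many, but not all, peripherals:

```python
import struct

def encode_uint8(value):
    """Pack an integer in the range 0-255 into a 1-byte payload."""
    return struct.pack('<B', value)

def encode_uint16_le(value):
    """Pack an integer in the range 0-65535 into 2 bytes, low byte first."""
    return struct.pack('<H', value)

def decode_uint16_le(data):
    """Read a little-endian 16-bit unsigned integer from the first 2 bytes."""
    return struct.unpack('<H', data[:2])[0]

# Intended use with a connected peripheral (not executed here):
#   peripheral.write_characteristic_value(c, encode_uint8(0x01), True)
```

Consult the peripheral's own documentation for the actual field sizes and byte order before relying on a particular format string.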
Peripheral Attributes#
- Peripheral.manufacturer_data#
Manufacturer-specific data that is part of the peripheral’s advertising data. You can read this without connecting to the peripheral (typically in the did_discover_peripheral callback).
- Peripheral.name#
The peripheral’s name (either a string or None)
- Peripheral.uuid#
The peripheral’s unique identifier (a hex string)
- Peripheral.state#
The peripheral connection state (0 = disconnected, 1 = connecting, 2 = connected)
- Peripheral.services#
The peripheral’s discovered services (a list of Service objects). Note that this will usually be empty until you have successfully called Peripheral.discover_services() (and received the did_discover_services callback).
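The numeric connection states listed above are easier to read in log output when mapped to names. A minimal sketch, using only the three values given for Peripheral.state; the helper names are hypothetical and not part of the module:

```python
# Values as listed for Peripheral.state.
STATE_NAMES = {0: 'disconnected', 1: 'connecting', 2: 'connected'}

def describe_state(state):
    """Return a readable name for a Peripheral.state value."""
    return STATE_NAMES.get(state, 'unknown (%r)' % (state,))

def is_connected(peripheral):
    """True if the object has a `state` attribute equal to 2 (connected)."""
    return getattr(peripheral, 'state', None) == 2
```

A delegate could, for example, check is_connected(self.peripheral) before issuing a read or write.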
Service#
- class cb.Service#
Service objects represent a Peripheral’s service: a collection of data and associated behaviors for accomplishing a function or feature of a device (or portions of that device). Services are either primary or secondary and may contain a number of Characteristic objects.
Service Attributes#
- Service.characteristics#
The service’s characteristics (a list of Characteristic objects, which may be empty if the characteristics haven’t been discovered yet)
- Service.primary#
A boolean flag that indicates whether the service is primary or secondary
- Service.uuid#
The service’s unique identifier (a hex string)
Characteristic#
- class cb.Characteristic#
Characteristic objects represent further information about a peripheral’s service. A characteristic contains a single value. The properties of a characteristic determine how the value of the characteristic can be used.
Characteristic Attributes#
- Characteristic.uuid#
The characteristic’s unique identifier (a hex string).
- Characteristic.value#
The characteristic’s current value (either a byte string, or None if the value hasn’t been read yet or there is no value)
- Characteristic.notifying#
A flag that determines whether notifications are enabled for this characteristic (read-only, set via Peripheral.set_notify_value())
- Characteristic.properties#
A bitmask of the characteristic’s properties (see CH_PROP_* constants below)
Constants#
Possible return values for get_state():
- cb.CM_STATE_UNKNOWN#
- cb.CM_STATE_RESETTING#
- cb.CM_STATE_UNSUPPORTED#
- cb.CM_STATE_UNAUTHORIZED#
- cb.CM_STATE_POWERED_OFF#
- cb.CM_STATE_POWERED_ON#
Characteristic properties (note: these values can be combined as a bitmask):
- cb.CH_PROP_BROADCAST#
- cb.CH_PROP_READ#
- cb.CH_PROP_WRITE_WITHOUT_RESPONSE#
- cb.CH_PROP_WRITE#
- cb.CH_PROP_NOTIFY#
- cb.CH_PROP_INDICATE#
- cb.CH_PROP_AUTHENTICATED_SIGNED_WRITES#
- cb.CH_PROP_EXTENDED_PROPERTIES#
- cb.CH_PROP_NOTIFY_ENCRYPTION_REQUIRED#
- cb.CH_PROP_INDICATE_ENCRYPTION_REQUIRED#
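Since Characteristic.properties is a bitmask, individual capabilities are tested with a bitwise AND. The sketch below lists the names of all flags set in a mask. The numeric fallback values are an assumption: they mirror the values of the underlying CoreBluetooth framework and are only used when no module providing the real constants is passed in. The function names are hypothetical:

```python
# Fallback values, ASSUMED to match CoreBluetooth's characteristic properties.
ASSUMED_CH_PROP_VALUES = [
    ('CH_PROP_BROADCAST', 0x01),
    ('CH_PROP_READ', 0x02),
    ('CH_PROP_WRITE_WITHOUT_RESPONSE', 0x04),
    ('CH_PROP_WRITE', 0x08),
    ('CH_PROP_NOTIFY', 0x10),
    ('CH_PROP_INDICATE', 0x20),
    ('CH_PROP_AUTHENTICATED_SIGNED_WRITES', 0x40),
    ('CH_PROP_EXTENDED_PROPERTIES', 0x80),
    ('CH_PROP_NOTIFY_ENCRYPTION_REQUIRED', 0x100),
    ('CH_PROP_INDICATE_ENCRYPTION_REQUIRED', 0x200),
]

def load_property_flags(module=None):
    """Return (name, value) pairs, preferring constants found on `module`."""
    return [(name, getattr(module, name, assumed))
            for name, assumed in ASSUMED_CH_PROP_VALUES]

def decode_properties(mask, flags=None):
    """Return the names of all property flags that are set in `mask`."""
    if flags is None:
        flags = load_property_flags()
    return [name for name, value in flags if mask & value]

# Intended use in Pythonista (not executed here):
#   import cb
#   print(decode_properties(c.properties, load_property_flags(cb)))
```

Passing the cb module to load_property_flags() means the real constants take precedence, so the assumed numbers only matter when the code runs elsewhere.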