cb — Connecting to Bluetooth LE Peripherals

The cb (“Core Bluetooth”) module enables you to connect to Bluetooth LE (“Low Energy”) peripherals like the TI SensorTag (a small and affordable BTLE device that contains various sensors like an IR thermometer, accelerometer, etc.).

The examples in this documentation assume that you have either a standard Bluetooth LE heart rate monitor or a TI SensorTag, but the module also works with other types of Bluetooth LE peripherals. Note that “classic” Bluetooth (used for headsets, keyboards, etc.) is NOT supported.

The module is based on the CoreBluetooth framework, but doesn’t expose all of its functionality – for example, it only implements the central role, i.e. you can connect to peripherals but not have the device act as a peripheral itself.

Quickstart

The basic procedure for connecting to a Bluetooth LE device and reading data from it involves the following steps:

  1. Define a delegate object that is responsible for handling notifications/callbacks about discovered devices (peripherals), services, and service characteristics. The callback methods you can implement are described later on.
  2. Call scan_for_peripherals() to start scanning. In many cases, you will also need to make the peripheral discoverable. For the TI SensorTag, you would press the side button.
  3. When your delegate receives the did_discover_peripheral callback, check the peripheral’s name or uuid, and if it is the one you want to connect to, call connect_peripheral(). You should also keep a reference to the discovered peripheral, because if the object is garbage-collected, you will also lose the connection.
  4. If everything went smoothly, the did_connect_peripheral callback will be called. At this point, you can start discovering the peripheral’s services by calling Peripheral.discover_services().
  5. When the services of the peripheral have been discovered, you will receive a did_discover_services callback. You can now access the peripheral’s services attribute, and – for the services you’re interested in – call Peripheral.discover_characteristics().
  6. As you might have guessed, this will result in a did_discover_characteristics callback. You can now read the value of a characteristic by calling Peripheral.read_characteristic_value() (which will result in another callback when the value has been read successfully), or enable notifications for the characteristic by calling Peripheral.set_notify_value().

The following code shows how to connect to a Bluetooth LE heart rate monitor. This particular example assumes that you have a Polar device, but it should be easy to adapt it for a different brand (just change the did_discover_peripheral method accordingly):

import cb
import struct

class HeartRateManager (object):
    def __init__(self):
        self.peripheral = None
        # Heart rate readings collected in did_update_value:
        self.values = []

    def did_discover_peripheral(self, p):
        if p.name and 'Polar' in p.name and not self.peripheral:
            self.peripheral = p
            print 'Connecting to heart rate monitor...'
            cb.connect_peripheral(p)

    def did_connect_peripheral(self, p):
        print 'Connected:', p.name
        print 'Discovering services...'
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print 'Failed to connect: %s' % (error,)

    def did_disconnect_peripheral(self, p, error):
        print 'Disconnected, error: %s' % (error,)
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if s.uuid == '180D':
                print 'Discovered heart rate service, discovering characteristics...'
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        print 'Did discover characteristics...'
        for c in s.characteristics:
            if c.uuid == '2A37':
                self.peripheral.set_notify_value(c, True)

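    def did_update_value(self, c, error):
        # Byte 0 holds flags; byte 1 is the heart rate when the monitor
        # uses the 8-bit value format (assumed here).
        heart_rate = struct.unpack('<B', c.value[1])[0]
        self.values.append(heart_rate)
        print 'Heart rate: %i' % heart_rate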

mngr = HeartRateManager()
cb.set_central_delegate(mngr)
print 'Scanning for peripherals...'
cb.scan_for_peripherals()

try:
    while True: pass
except KeyboardInterrupt:
    cb.reset()
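
The example above reads byte 1 of the value directly, which is correct when the monitor sends the heart rate as a single byte. In the Bluetooth Heart Rate Measurement format, bit 0 of the first (flags) byte indicates whether the heart rate is an 8-bit or a 16-bit little-endian value. A minimal sketch of a parser that handles both cases follows; parse_heart_rate is a hypothetical helper, not part of the cb module, and it is written to run on both Python 2 and Python 3:

```python
def parse_heart_rate(value):
    """Return beats per minute from a Heart Rate Measurement (2A37) payload."""
    # bytearray yields integers when indexed, on Python 2 and Python 3 alike.
    data = bytearray(value)
    if len(data) < 2:
        raise ValueError('payload too short for a heart rate measurement')
    flags = data[0]
    if flags & 0x01:
        # Bit 0 set: the heart rate is a 16-bit little-endian value.
        if len(data) < 3:
            raise ValueError('payload too short for a 16-bit heart rate')
        return data[1] | (data[2] << 8)
    # Bit 0 clear: the heart rate is a single unsigned byte.
    return data[1]
```

Inside did_update_value, you could then write heart_rate = parse_heart_rate(c.value) instead of unpacking byte 1 by hand.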

The following example shows how to do similar things with a TI SensorTag. It logs the value of the temperature sensor (about once per second, much like the heart rate monitor example above) and plays a sound when either of the two buttons on the SensorTag is pressed:

import cb
import sound
import struct

class MyCentralManagerDelegate (object):
    def __init__(self):
        self.peripheral = None

    def did_discover_peripheral(self, p):
        print '+++ Discovered peripheral: %s (%s)' % (p.name, p.uuid)
        if p.name and 'Sensor Tag' in p.name and not self.peripheral:
            # Keep a reference to the peripheral, so it doesn't get garbage-collected:
            self.peripheral = p
            cb.connect_peripheral(self.peripheral)

    def did_connect_peripheral(self, p):
        print '*** Connected: %s' % p.name
        print 'Discovering services...'
        p.discover_services()

    def did_fail_to_connect_peripheral(self, p, error):
        print 'Failed to connect: %s' % (error,)

    def did_disconnect_peripheral(self, p, error):
        print 'Disconnected, error: %s' % (error,)
        self.peripheral = None

    def did_discover_services(self, p, error):
        for s in p.services:
            if 'AA00' in s.uuid:
                print '+++ IR Thermometer found'
                p.discover_characteristics(s)
            elif 'FFE0' in s.uuid:
                print '+++ Simple Key Service found'
                p.discover_characteristics(s)

    def did_discover_characteristics(self, s, error):
        if 'AA00' in s.uuid:
            for c in s.characteristics:
                if 'AA02' in c.uuid:
                    print 'Enabling temperature sensor...'
                    self.peripheral.write_characteristic_value(c, chr(0x01), True)
                elif 'AA01' in c.uuid:
                    # Enable notification for the temperature sensor:
                    print 'Enabling temperature sensor notifications...'
                    self.peripheral.set_notify_value(c, True)
        elif 'FFE0' in s.uuid:
            print 'Enabling notifications for Simple Key Service...'
            key_characteristic = s.characteristics[0]
            self.peripheral.set_notify_value(key_characteristic, True)

    def did_write_value(self, c, error):
        # The temperature sensor has been activated (see did_discover_characteristics)
        print 'Did enable temperature sensor'

    def did_update_value(self, c, error):
        if 'FFE1' == c.uuid:
            # A button on the SensorTag was pressed (or released):
            print 'Button value: %s' % c.value.encode('hex')
            sound.play_effect('Beep')
        else:
            # The temperature sensor has sent an updated value:
            tobj, mtmpamb = self.convert_temperature(c.value)
            print 'Object temperature: %f -- Ambient: %f' % (tobj, mtmpamb)

    def convert_temperature(self, raw_data):
        # This will convert the raw sensor data to temperature values in Celsius.
        # The details of this algorithm aren't important for this example, you can
        # find more information about this in the SensorTag user guide:
        # http://processors.wiki.ti.com/index.php/SensorTag_User_Guide
        rawT = struct.unpack('<h', raw_data[:2])[0]
        tmpAmb = struct.unpack('<H', raw_data[2:])[0]
        vobj2 = float(rawT) * 0.00000015625
        mtmpamb = float(tmpAmb) / 128.0
        tdie2 = mtmpamb + 273.15
        s0, a1, a2 = 6.4E-14, 1.75E-3, -1.678E-5
        b0, b1, b2, c2 = -2.94E-5, -5.7E-7, 4.63E-9, 13.4
        Tref = 298.15
        S = s0 * (1.0+a1*(tdie2 - Tref) + a2 * pow((tdie2 - Tref), 2))
        vos = b0 + b1*(tdie2-Tref) + b2*pow((tdie2-Tref), 2)
        fobj = (vobj2 - vos) + c2 * pow((vobj2 - vos), 2)
        tobj = pow(pow(tdie2, 4) + (fobj/S), 0.25) - 273.15
        return tobj, mtmpamb

delegate = MyCentralManagerDelegate()
print 'Scanning for peripherals...'
cb.set_central_delegate(delegate)
cb.scan_for_peripherals()

# Keep the connection alive until the 'Stop' button is pressed:
try:
    while True: pass
except KeyboardInterrupt:
    # Disconnect everything:
    cb.reset()

Functions

cb.set_central_delegate(delegate)

Set the object that should receive callbacks for events like discovered peripherals, received data, etc.

The delegate can implement the following methods, all of which are optional:

class MyDelegate (object):
    def did_update_state(self):
        # State was updated (e.g. Bluetooth powered on/off)
        pass

    def did_discover_peripheral(self, p):
        # You would typically check the peripheral's name/uuid here,
        # and connect via cb.connect_peripheral(p).
        # You should also keep a reference to p (e.g. self.peripheral = p),
        # so that it doesn't get garbage-collected.
        # Note that this may get called multiple times for a single peripheral.
        pass

    def did_connect_peripheral(self, p):
        # You would typically call p.discover_services() here
        pass

    def did_fail_to_connect_peripheral(self, p, error):
        # error is a tuple of error code (integer) and description (string)
        pass

    def did_disconnect_peripheral(self, p, error):
        # error is a tuple of error code (integer) and description (string)
        pass

    def did_discover_services(self, p, error):
        # Here you would typically call discover_characteristics
        # for the services you're interested in.
        pass

    def did_discover_characteristics(self, s, error):
        # You can now read or write the characteristic's value
        pass

    def did_write_value(self, c, error):
        pass

    def did_update_value(self, c, error):
        # You can now access the characteristic value with c.value
        pass

cb.set_verbose(flag)

When set to True, all callback events are logged to the console, regardless of whether the delegate (see set_central_delegate()) implements them or not – this can be useful for debugging.

cb.scan_for_peripherals()

Start scanning for peripherals that are advertising services.

cb.stop_scan()

Stop scanning for peripherals.
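
A common pattern is to stop scanning as soon as the peripheral you are looking for has been found. The following is only a sketch of that idea: ScanUntilFound is a hypothetical delegate class, and it receives the central object (the cb module itself in Pythonista) as a parameter purely so that the logic can be tried without Bluetooth hardware. It uses only stop_scan() and connect_peripheral() as documented here:

```python
class ScanUntilFound(object):
    """Delegate sketch: connect to the first peripheral whose name matches."""

    def __init__(self, central, name_fragment):
        # `central` is expected to provide stop_scan() and
        # connect_peripheral(), like the cb module does.
        self.central = central
        self.name_fragment = name_fragment
        self.peripheral = None

    def did_discover_peripheral(self, p):
        # The callback may fire several times; only react to the first match.
        if self.peripheral is not None:
            return
        if p.name and self.name_fragment in p.name:
            # Keep a reference so the peripheral is not garbage-collected.
            self.peripheral = p
            self.central.stop_scan()
            self.central.connect_peripheral(p)
```

In Pythonista, this would be used as cb.set_central_delegate(ScanUntilFound(cb, 'Polar')) followed by cb.scan_for_peripherals().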

cb.connect_peripheral(peripheral)

Establish a connection to a peripheral. This will typically be called from your implementation of the did_discover_peripheral callback.

cb.cancel_peripheral_connection(peripheral)

Cancel an active or pending local connection to a peripheral.

cb.get_state()

Return the current state of the central manager (see Constants for possible values)
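
Because get_state() returns one of the CM_STATE_* constants, a small lookup can make log output easier to read. The following is a sketch; state_name is a hypothetical helper, and it takes the module as a parameter only so that it can be tried outside Pythonista:

```python
def state_name(module, state):
    """Return the name of the CM_STATE_* constant in `module` equal to `state`."""
    for name in sorted(dir(module)):
        if name.startswith('CM_STATE_') and getattr(module, name) == state:
            return name
    # No constant matched; report the raw value instead of guessing.
    return 'unrecognized state: %r' % (state,)
```

In Pythonista, you would call it as state_name(cb, cb.get_state()), for example from a did_update_state callback.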

cb.reset()

Disconnect all peripherals. This also sets the central delegate to None, so you will no longer receive any callbacks.

Classes

Note

The classes in this module are not intended to be instantiated directly. Instead, you will get instances of the different classes as parameters to callback functions that your delegate implements (see set_central_delegate()).

Peripheral

class cb.Peripheral

The Peripheral class represents peripheral devices that have been discovered via scan_for_peripherals(). Peripherals are identified by universally unique identifiers (UUIDs). Peripherals may contain one or more services (represented by Service objects).

Peripheral Methods

Peripheral.discover_services()

Discover the services of the peripheral. The central delegate will receive a did_discover_services callback when the discovery has finished.

Peripheral.discover_characteristics(service)

Discover the characteristics for the given Service. This will typically be called from the did_discover_services callback. When the characteristics have been discovered, the central delegate will receive a did_discover_characteristics callback.

Peripheral.set_notify_value(characteristic, flag=True)

Enable or disable notifications for the given Characteristic (note: not all characteristics support notifications). When notifications are enabled, you may receive did_update_value callbacks in your central delegate (see set_central_delegate()).

Peripheral.write_characteristic_value(characteristic, data, with_response)

Write the value of a Characteristic. data should be a byte string. with_response determines whether the did_write_value callback will be called when the write has completed (or failed). Note that not all characteristics support writing values without response.
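
One way to build the byte string is struct.pack. A brief sketch: the single byte 0x01 is the value that the SensorTag example above writes to enable the temperature sensor, while the 16-bit value is purely illustrative and not taken from any device documentation:

```python
import struct

# One unsigned byte, equivalent to chr(0x01) in the SensorTag example:
enable_sensor = struct.pack('<B', 0x01)

# Hypothetical 16-bit little-endian setting (illustrative value only):
interval = struct.pack('<H', 1000)
```

The result can then be passed as data, e.g. p.write_characteristic_value(c, enable_sensor, True).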

Peripheral.read_characteristic_value(characteristic)

Retrieve the value of the given Characteristic. When the value has been read, your central delegate will receive a did_update_value callback, at which point you can access the value with the characteristic’s value attribute. Not all characteristics are guaranteed to have a readable value.

Peripheral Attributes

Peripheral.name

The peripheral’s name (either a string or None)

Peripheral.uuid

The peripheral’s unique identifier (a hex string)

Peripheral.state

The peripheral connection state (0 = disconnected, 1 = connecting, 2 = connected)

Peripheral.services

The peripheral’s discovered services (a list of Service objects). Note that this will usually be empty until you have successfully called Peripheral.discover_services() (and received the did_discover_services callback).

Service

class cb.Service

Service objects represent a Peripheral’s service – a collection of data and associated behaviors for accomplishing a function or feature of a device (or portions of that device). Services are either primary or secondary and may contain a number of Characteristic objects.

Service Attributes

Service.characteristics

The service’s characteristics (a list of Characteristic objects which may be empty if the characteristics haven’t been discovered yet)

Service.primary

A boolean flag that indicates whether the service is primary or secondary

Service.uuid

The service’s unique identifier (a hex string)
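
The examples above compare identifiers in two ways: '180D' with ==, and 'AA00' with in (the SensorTag reports a longer UUID for that service). A sketch of a helper that covers both forms follows. It assumes that uuid is either a 4-digit hex string or a 36-character 128-bit UUID string whose 16-bit part sits in characters 5 to 8; uuid_matches is a hypothetical name, not part of the cb module:

```python
def uuid_matches(uuid, short_id):
    """Check whether `uuid` carries the 16-bit identifier `short_id`."""
    uuid = uuid.upper()
    short_id = short_id.upper()
    if len(uuid) == 4:
        # Short form, e.g. '180D'
        return uuid == short_id
    # Long form, e.g. 'F000AA00-0451-4000-B000-000000000000':
    # the 16-bit part follows the first four hex digits.
    return len(uuid) == 36 and uuid[4:8] == short_id
```

Compared with a plain substring test, this avoids accidental matches elsewhere in a long UUID. For example, uuid_matches(s.uuid, 'AA00') could replace 'AA00' in s.uuid.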

Characteristic

class cb.Characteristic

Characteristic objects represent further information about a peripheral’s service. A characteristic contains a single value. The properties of a characteristic determine how the value of the characteristic can be used.

Characteristic Attributes

Characteristic.uuid

The characteristic’s unique identifier (a hex string).

Characteristic.value

The characteristic’s current value (either a byte string, or None if the value hasn’t been read yet or there is no value)

Characteristic.notifying

A flag that determines whether notifications are enabled for this characteristic (read-only, set via Peripheral.set_notify_value())

Characteristic.properties

A bitmask of the characteristic’s properties (see CH_PROP_* constants below)

Constants

Possible return values for get_state():

cb.CM_STATE_UNKNOWN
cb.CM_STATE_RESETTING
cb.CM_STATE_UNSUPPORTED
cb.CM_STATE_UNAUTHORIZED
cb.CM_STATE_POWERED_OFF
cb.CM_STATE_POWERED_ON

Characteristic properties (note: these values can be combined as a bitmask):

cb.CH_PROP_BROADCAST
cb.CH_PROP_READ
cb.CH_PROP_WRITE_WITHOUT_RESPONSE
cb.CH_PROP_WRITE
cb.CH_PROP_NOTIFY
cb.CH_PROP_INDICATE
cb.CH_PROP_AUTHENTICATED_SIGNED_WRITES
cb.CH_PROP_EXTENDED_PROPERTIES
cb.CH_PROP_NOTIFY_ENCRYPTION_REQUIRED
cb.CH_PROP_INDICATE_ENCRYPTION_REQUIRED
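
Since Characteristic.properties is a bitmask, individual capabilities are tested with the & operator. A minimal sketch that lists the names of all CH_PROP_* flags set in a value; property_names is a hypothetical helper, and the module is passed in as a parameter only so that the function can be tried outside Pythonista:

```python
def property_names(module, properties):
    """Return the names of the CH_PROP_* flags in `module` set in `properties`."""
    names = []
    for name in sorted(dir(module)):
        if not name.startswith('CH_PROP_'):
            continue
        flag = getattr(module, name)
        # A flag counts as set only if all of its bits are present.
        if flag and (properties & flag) == flag:
            names.append(name)
    return names
```

In Pythonista, property_names(cb, c.properties) would return a list of constant names. To check a single capability before enabling notifications, c.properties & cb.CH_PROP_NOTIFY is sufficient.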