1. Introduction
This document provides a brief introduction to the use of Python functions to enumerate, connect to, and control a Muse v3 device over Bluetooth Low Energy (BLE). BLE is a wireless communication protocol designed for low-power peripheral devices.
All the examples rely on the Muse API and the Python packages available at the following links:
- Muse API: https://gitlab.com/221e-Repositories/muse/python/muse_api
- Bleak (version 0.21.1): https://bleak.readthedocs.io/en/latest/
- Frozendict (version 2.3.4): https://pypi.org/project/frozendict/2.3.4/
2. Function
To connect to peripheral devices that support BLE, your computer needs to have a built-in or external Bluetooth 4.0 adapter or higher. After you connect to a device, use Python to read from or write data to it. You can work with both standard and custom services, characteristics, and descriptors.
2.1 Device Enumeration
The BleakScanner.discover function, included in the bleak package, scans for nearby Bluetooth Low Energy peripheral devices and lets you inspect their advertisement data.
from bleak import BleakScanner, BleakClient
import asyncio

async def main():
    # Device enumeration
    devices = await BleakScanner.discover(timeout=5.0, return_adv=True)
    for obj in devices:
        d = devices[obj][0]    # BLEDevice
        adv = devices[obj][1]  # AdvertisementData
        if d.name is None:
            print(d)
        else:
            print(d, adv)

asyncio.run(main())
1D:5C:62:F6:79:45: None
52:49:E8:BD:33:97: None
10:6D:91:A1:55:27: None
2F:96:62:FC:24:C0: None
00:80:E1:26:D4:8A: muse_v3 AdvertisementData(local_name='muse_v3', tx_power=4, rssi=-58)
3B:03:E5:34:BB:DF: None
2C:96:89:FD:12:00: None
69:A9:71:E2:70:C5: None
03:18:7B:B6:D1:FE: None
If you only need the BLE address and name of each device, you can simplify the code as follows:
from bleak import BleakScanner, BleakClient
import asyncio

async def main():
    # Device enumeration
    devices = await BleakScanner.discover()
    for d in devices:
        print(d)

asyncio.run(main())
In this case, discover returns a list of BLEDevice objects, whereas with return_adv=True it returns a dictionary that maps each address to a (BLEDevice, AdvertisementData) pair.
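When more than one nearby device advertises the same name, the dictionary form lets you choose by signal strength. The following is a minimal sketch, not part of the Muse API: pick_strongest is a hypothetical helper, it assumes only that each device exposes a name attribute and each advertisement exposes an rssi attribute, and it uses stand-in objects (including a made-up second address) so that it runs without a Bluetooth adapter.

```python
from types import SimpleNamespace


def pick_strongest(found, name):
    """Return the (device, adv) pair named `name` with the highest RSSI, or None.

    `found` is assumed to have the shape returned by
    BleakScanner.discover(return_adv=True): {address: (device, adv)}.
    """
    matches = [(dev, adv) for dev, adv in found.values() if dev.name == name]
    if not matches:
        return None
    # RSSI is expressed in negative dBm; values closer to zero are stronger.
    return max(matches, key=lambda pair: pair[1].rssi)


# Stand-in objects; the second muse_v3 entry is invented for illustration.
found = {
    "00:80:E1:26:D4:8A": (SimpleNamespace(name="muse_v3"), SimpleNamespace(rssi=-58)),
    "00:80:E1:26:D4:8B": (SimpleNamespace(name="muse_v3"), SimpleNamespace(rssi=-71)),
    "1D:5C:62:F6:79:45": (SimpleNamespace(name=None), SimpleNamespace(rssi=-40)),
}
best = pick_strongest(found, "muse_v3")
print(best[1].rssi)  # -58
```

With real scan results, you would pass the dictionary returned by the scan directly and connect to the first element of the returned pair.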
2.2 Device Connection
The BleakClient class lets you connect to a selected BLE device by specifying its address. Once connected, you can also list all the characteristics exposed by the device, together with the service each one belongs to, as shown below:
from bleak import BleakScanner, BleakClient
import asyncio

async def main():
    # Device name to be searched for
    my_device_name = 'muse_v3'

    # Device Enumeration
    devices = await BleakScanner.discover()
    myDevice = None
    for d in devices:
        if d.name == my_device_name:
            myDevice = d

    if myDevice is not None:
        # Device Connection
        async with BleakClient(str(myDevice.address)) as client:
            print('List of all characteristics: ')
            for characteristic in client.services.characteristics:
                x = client.services.characteristics.get(characteristic)
                print(client.services.get_service(x.service_handle).description,
                      x.service_uuid, x.description, x.uuid, x.properties)

asyncio.run(main())
List of all characteristics:
Generic Attribute Profile 00001801-0000-1000-8000-00805f9b34fb Service Changed 00002a05-0000-1000-8000-00805f9b34fb ['indicate']
Generic Access Profile 00001800-0000-1000-8000-00805f9b34fb Device Name 00002a00-0000-1000-8000-00805f9b34fb ['read', 'write']
Generic Access Profile 00001800-0000-1000-8000-00805f9b34fb Appearance 00002a01-0000-1000-8000-00805f9b34fb ['read']
Generic Access Profile 00001800-0000-1000-8000-00805f9b34fb Peripheral Preferred Connection Parameters 00002a04-0000-1000-8000-00805f9b34fb ['read']
Unknown c8c0a708-e361-4b5e-a365-98fa6b0a836f Unknown d5913036-2d8a-41ee-85b9-4e361aa5c8a7 ['read', 'write', 'notify']
Unknown c8c0a708-e361-4b5e-a365-98fa6b0a836f Unknown 09bf2c52-d1d9-c0b7-4145-475964544307 ['read', 'write-without-response', 'notify']
Device Information 0000180a-0000-1000-8000-00805f9b34fb Manufacturer Name String 00002a29-0000-1000-8000-00805f9b34fb ['read']
Device Information 0000180a-0000-1000-8000-00805f9b34fb Firmware Revision String 00002a26-0000-1000-8000-00805f9b34fb ['read']
Device Information 0000180a-0000-1000-8000-00805f9b34fb Hardware Revision String 00002a27-0000-1000-8000-00805f9b34fb ['read']
Device Information 0000180a-0000-1000-8000-00805f9b34fb Serial Number String 00002a25-0000-1000-8000-00805f9b34fb ['read']
Device Information 0000180a-0000-1000-8000-00805f9b34fb System ID 00002a23-0000-1000-8000-00805f9b34fb ['read']
If the properties of a characteristic include read or write, you can read its value using read_gatt_char or write its value using write_gatt_char.
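The check described above can be written as a small helper that maps a property list to the client calls it permits. This is only a sketch: allowed_operations is a hypothetical name, and the property lists are copied from the characteristic listing printed in this section.

```python
def allowed_operations(properties):
    """Map a characteristic's property list to the client calls it supports."""
    props = set(properties)
    return {
        "read_gatt_char": "read" in props,
        "write_gatt_char": bool(props & {"write", "write-without-response"}),
        # Both notifications and indications are enabled through start_notify.
        "start_notify": bool(props & {"notify", "indicate"}),
    }


# Property lists copied from the listing above.
command_props = ["read", "write", "notify"]   # d5913036-... characteristic
appearance_props = ["read"]                    # Appearance characteristic

print(allowed_operations(command_props))
print(allowed_operations(appearance_props))
```

In a connected session, you would pass x.properties for each characteristic instead of the hard-coded lists.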
2.3 Write and Read from command characteristic
The write_gatt_char and read_gatt_char functions let you write commands to a characteristic and read the responses from it. The following example, which runs inside the async with BleakClient(...) block shown in section 2.2, sends the GET STATE command and reads the corresponding response.
# Command characteristic UUID
CMD_UUID = "d5913036-2d8a-41ee-85b9-4e361aa5c8a7"

# Send the GET STATE command
await client.write_gatt_char(char_specifier=CMD_UUID, data=bytearray([0x82,0x00]), response=True)

# Read the response from the characteristic and print to the console
response = await client.read_gatt_char(CMD_UUID)
print(f"command response: {response}")
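The command frames used in this document (the GET STATE frame and the two streaming frames in section 2.4) all appear to follow the same pattern: one command byte, one byte giving the payload length, then the payload. The sketch below builds frames on that assumption. The layout is inferred from the examples only, build_command is a hypothetical helper, and the Muse communication protocol remains the authoritative reference.

```python
def build_command(code, payload=b""):
    """Return [code, len(payload), *payload] as a bytearray (assumed layout)."""
    if not 0 <= code <= 0xFF:
        raise ValueError("command code must fit in one byte")
    if len(payload) > 0xFF:
        raise ValueError("payload too long for a one-byte length field")
    return bytearray([code, len(payload)]) + bytearray(payload)


# Reproduces the GET STATE frame sent in the example above.
get_state = build_command(0x82)
print(get_state.hex(" "))  # 82 00
```

The result can be passed as the data argument of write_gatt_char in place of the hand-written bytearray.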
If the characteristic supports notify, you can also register a callback function that is called each time the peripheral device sends a new value. A generic callback function and its usage are shown below:
from bleak import BleakScanner, BleakClient
import asyncio

# Command characteristic
CMD_UUID = "d5913036-2d8a-41ee-85b9-4e361aa5c8a7"

def command_characteristic_callback(sender, data):
    print(f"response using callback function: {data}")

async def main():
    # Device name to be searched for
    my_device_name = 'muse_v3'

    # Device Enumeration
    devices = await BleakScanner.discover()
    myDevice = None
    for d in devices:
        if d.name == my_device_name:
            myDevice = d

    if myDevice is not None:
        # Device Connection
        async with BleakClient(str(myDevice.address)) as client:
            # Send the GET STATE command and read the response directly
            await client.write_gatt_char(CMD_UUID, bytearray([0x82,0x00]), response=True)
            response = await client.read_gatt_char(CMD_UUID)
            print(f"command response: {response}")

            # Subscribe to command characteristic and send GET STATE command
            await client.start_notify(CMD_UUID, command_characteristic_callback)
            await client.write_gatt_char(CMD_UUID, bytearray([0x82,0x00]), response=True)

            # Give the notification time to arrive before disconnecting
            await asyncio.sleep(1.0)

asyncio.run(main())
Both methods return the same response:
command response: bytearray(b'\x00\x03\x82\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
response using callback function: bytearray(b'\x00\x03\x82\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00')
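To work with the response rather than just print it, you can split it into fields. The sketch below assumes a layout inferred from the sample above: a first header byte, a second byte that counts the meaningful bytes that follow, the echoed command code, and then the remaining bytes, with zero padding at the end. This layout and the names CommandResponse and split_response are assumptions made for illustration; refer to the Muse communication protocol for the actual meaning of each byte.

```python
from typing import NamedTuple


class CommandResponse(NamedTuple):
    header: int    # first byte of the frame (0x00 in the sample)
    command: int   # echoed command code (assumed)
    body: bytes    # remaining meaningful bytes; meaning defined by the protocol


def split_response(raw):
    """Split a frame, assuming [header, length, command, body..., padding]."""
    if len(raw) < 2:
        raise ValueError("response too short")
    length = raw[1]
    content = bytes(raw[2:2 + length])
    if length < 1 or len(content) != length:
        raise ValueError("length byte does not match the frame")
    return CommandResponse(header=raw[0], command=content[0], body=content[1:])


# First bytes of the sample response, followed by zero padding.
sample = bytearray(b"\x00\x03\x82\x00\x02" + bytes(15))
parsed = split_response(sample)
print(hex(parsed.command), parsed.body.hex(" "))  # 0x82 00 02
```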
2.4 Read from data characteristic
The data characteristic lets you read data during streaming acquisitions. To do so, subscribe to the data characteristic and specify a callback function, which can also decode the data.
DATA_UUID = "09bf2c52-d1d9-c0b7-4145-475964544307"

def data_characteristic_callback(sender, data):
    print(f"data: {data}")
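Printing inside the callback is convenient for a first test, but for longer acquisitions it is often easier to store the packets and process them later. The following sketch shows one possible approach using asyncio.Queue; make_queue_callback is a hypothetical helper, and it assumes that the callback runs on the same event loop that owns the queue. The callback is invoked by hand here so that the example runs without a device.

```python
import asyncio


def make_queue_callback(queue):
    """Return a notification callback that stores each payload in `queue`."""
    def callback(sender, data):
        # Copy to immutable bytes so later processing cannot alter the packet.
        queue.put_nowait(bytes(data))
    return callback


async def demo():
    queue = asyncio.Queue()
    callback = make_queue_callback(queue)
    # With a connected client this would be:
    #     await client.start_notify(DATA_UUID, callback)
    # Here two notifications are simulated by calling the callback directly.
    callback(None, bytearray(b"\x01\x02"))
    callback(None, bytearray(b"\x03\x04"))
    return [await queue.get() for _ in range(queue.qsize())]


print(asyncio.run(demo()))  # [b'\x01\x02', b'\x03\x04']
```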
The following example starts and stops the streaming of accelerometer data (for further details, refer to the Muse communication protocol). It is followed by a typical response as printed by the callback function:
# Subscribe to Data characteristic
await client.start_notify(DATA_UUID, data_characteristic_callback)

# Start buffered streaming of accelerometer data and read data
await client.write_gatt_char(CMD_UUID, bytearray([0x02,0x05,0x06,0x02,0x00,0x00,0x01]), response=True)

# Make the streaming last for 5 seconds
await asyncio.sleep(5)

# Stop the streaming
await client.write_gatt_char(CMD_UUID, bytearray([0x02,0x01,0x02]), response=True)
data: bytearray(b'\xb8`\x1f\xd0\x1b\x0f\x00\x00$\x00\x81\x00\xe7\x074\x00s\x00\xf9\x07A\x00m\x00\xfa\x07-\x00u\x00\x02\x08@\x00r\x00\xfa\x07D\x00y\x00\xf3\x076\x00l\x00\xfa\x078\x00_\x00\x0c\x08*\x00e\x00\xfb\x07;\x00j\x00\xf5\x077\x00i\x00\xf8\x07)\x00{\x00\x00\x08?\x00\x81\x00\xf8\x07=\x00j\x00\x04\x084\x00e\x00\xf8\x076\x00o\x00\xfb\x072\x00p\x00\x03\x085\x00j\x00\xfe\x07;\x00y\x00\xf9\x07.\x00m\x00\xfa\x07')
data: bytearray(b'\xd8c\x1f\xd0\x1b\x0f\x00\x008\x00x\x00\xfe\x07<\x00p\x00\xf9\x07.\x00i\x00\xfd\x07E\x00i\x00\xfe\x07C\x00s\x00\xfd\x07;\x00q\x00\x05\x08J\x00t\x00\xf3\x078\x00^\x00\xea\x075\x00d\x00\x1b\x08=\x00n\x00\x06\x080\x00j\x00\xf6\x07G\x00i\x00\x07\x08K\x00e\x00\x06\x08H\x00u\x00\xf6\x07C\x00o\x00\n\x081\x00m\x00\x01\x08B\x00}\x00\xf9\x07D\x00}\x00\xf9\x072\x00i\x00\xf5\x07-\x00Y\x00\x01\x08')
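The callback can decode each packet instead of printing the raw bytes. The sketch below assumes a layout inferred from the sample packets above: an 8-byte header followed by samples of three little-endian signed 16-bit integers (x, y, z). The header size, the sample layout, and the name decode_samples are assumptions, and the values are left as raw counts because the conversion to physical units depends on the device configuration described in the Muse communication protocol.

```python
import struct

HEADER_SIZE = 8   # assumed: bytes preceding the first sample
SAMPLE_SIZE = 6   # assumed: three little-endian int16 values (x, y, z)


def decode_samples(packet):
    """Return raw (x, y, z) integer triplets from one data packet."""
    body = bytes(packet[HEADER_SIZE:])
    # Ignore a trailing partial sample, if any.
    usable = len(body) - len(body) % SAMPLE_SIZE
    return list(struct.iter_unpack("<3h", body[:usable]))


# Header and first two samples of the first packet printed above.
packet = bytearray(b"\xb8`\x1f\xd0\x1b\x0f\x00\x00$\x00\x81\x00\xe7\x074\x00s\x00\xf9\x07")
print(decode_samples(packet))  # [(36, 129, 2023), (52, 115, 2041)]
```

To use it during streaming, call decode_samples(data) inside data_characteristic_callback.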
2.5 Unsubscribe
The start_notify function enables notifications on a characteristic. When you finish working with the characteristic, disable notifications using stop_notify.
await client.stop_notify(CMD_UUID)
await client.stop_notify(DATA_UUID)
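If an error occurs between start_notify and stop_notify, the unsubscribe call is skipped. One way to guard against this is to wrap the pair in an asynchronous context manager, as sketched below. The notifications helper and the FakeClient stand-in are hypothetical and exist only so that the example runs without a device; with real hardware you would pass a connected BleakClient instead.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def notifications(client, char_uuid, callback):
    """Enable notifications on entry and always disable them on exit."""
    await client.start_notify(char_uuid, callback)
    try:
        yield
    finally:
        await client.stop_notify(char_uuid)


class FakeClient:
    """Stand-in that records calls in place of a connected BleakClient."""
    def __init__(self):
        self.calls = []

    async def start_notify(self, char_uuid, callback):
        self.calls.append(("start", char_uuid))

    async def stop_notify(self, char_uuid):
        self.calls.append(("stop", char_uuid))


async def demo():
    client = FakeClient()
    try:
        async with notifications(client, "data-uuid", print):
            raise RuntimeError("simulated failure while streaming")
    except RuntimeError:
        pass
    return client.calls


print(asyncio.run(demo()))  # [('start', 'data-uuid'), ('stop', 'data-uuid')]
```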