1. Introduction
This document describes how to collect data from your Muse device using the Bluetooth Low Energy (BLE) functionalities provided by Python. This example is based on the standard Muse communication protocol [A] and Python packages:
- Muse API: https://gitlab.com/221e-Repositories/muse/python/muse_api
- Bleak package (version 0.21.1): https://bleak.readthedocs.io/en/latest/
- Frozendict package (version 2.3.4): https://pypi.org/project/frozendict/2.3.4/
- Matplotlib package (version 3.7.1): https://matplotlib.org/3.7.1/
- Numpy package (version 1.24.1): https://pypi.org/project/numpy/
- Struct module: https://docs.python.org/3/library/struct.html
2. Collect Data from your Muse
This example shows how to collect and plot accelerometer data from your Muse device using the BLE communication support provided by Python functions.
First, check that the BLE device is powered on and support connections by finding it using BleakScanner.discover function, as described in the previous tutorial (Connection And Control ). Then connect to the Muse using BleakClient.
# Device Enumeration my_device_name = 'muse_v3' devices = await BleakScanner.discover() myDevice = None for d in devices: print(d) if d.name == my_device_name: myDevice = d if(myDevice != None): # Device Connection async with BleakClient(str(myDevice.address)) as client:
Check the device status. If the Muse is in IDLE state, then you can start a streaming acquisition.
# Check device status await client.write_gatt_char(char_specifier=CMD_UUID, data=bytearray([0x82,0x00]), response=True) response = await client.read_gatt_char(CMD_UUID) if (response[2] == 0x82 and response[3] == 0x00 and response[4] == 0x02): print('System in IDLE state')
Subscribe to command and data characteristics, using the corresponding UUIDs.
# Command and Data characteristics CMD_UUID = "d5913036-2d8a-41ee-85b9-4e361aa5c8a7" DATA_UUID = "09bf2c52-d1d9-c0b7-4145-475964544307" # Subscribe to Data characteristic await client.start_notify(CMD_UUID, cmd_notification_handler) await client.start_notify(DATA_UUID, data_notification_handler)
Start a buffered streaming acquisition of accelerometer data, as described in the Muse communication protocol. In this example, the sampling frequency is set to 25 Hz. The acquisition length is set to 30 seconds and then automatically stopped.
# Start BUFFERED streaming of accelerometer data and read data # streaming type: 0x06 (buffered streaming) # acquisition mode: 0x02 (accelerometer only) # acquisition frequency: 0x01 (25 Hz) await client.write_gatt_char(CMD_UUID, data=bytearray([0x02,0x05,0x06,0x02,0x00,0x00,0x01]), True) # Make the streaming last for 30 seconds await asyncio.sleep(30) # Stop the streaming await client.write_gatt_char(CMD_UUID, data=bytearray([0x02,0x01,0x02]), True)
You can retrieve the Muse configuration by decoding the response to the start streaming command. This is performed by the DecodeMEMSFullScales function:
Gyroscope_resolution = ({0x00: 0.00875, 0x01: 0.0175, 0x02: 0.035, 0x03: 0.070}) Accelerometer_resolution = ({0x00: 0.122, 0x08: 0.244, 0x0C: 0.488, 0x04: 0.976}) HDR_resolution = ({0x00: 49.0, 0x10: 98.0, 0x30: 195.0,}) Magnetometer_resolution = ({0x00: 1000.0/6842.0, 0x40: 1000.0/3421.0, 0x80: 1000.0/2281.0, 0xC0: 1000.0/1711.0}) def DecodeMEMSFullScales(code: int): #Apply bitwise mask to get sensor full scale code (i.e., gyr, axl, hdr, mag LSB-order) gyrCode = (code & 0x03) axlCode = (code & 0x0C) hdrCode = (code & 0x30) magCode = (code & 0xC0) gyrRes = Gyroscope_resolution[gyrCode] # Gyroscope axlRes = Accelerometer_resolution[axlCode] # Accelerometer hdrRes = HDR_resolution[hdrCode] # HDR Accelerometer magRes = Magnetometer_resolution[magCode] # Magnetometer return gyrRes, axlRes, magRes, hdrRes
The function is applied to the data decoded from the response to start streaming command:
tmp = bytearray(4) tmp = data[4:7] code = struct.unpack("<I", tmp + b"\x00")[0] # padding necessary (function requires at least 4 bytes) gyrResolution, axlResolution, magResolution, hdrResolution = DecodeMEMSFullScales(code)
Setup a plot environment for real time visualization, just before starting the streaming acquisition.
PLOT_NUM_OF_SAMPLES = 500 # maximum number of samples to be displayed in the plot DELTA_TIME = 0.04 # time difference in [sec] between consecutive samples (@25 Hz) fig, ax = plt.subplots() canvas = np.zeros((480,640)) x = np.linspace(1, PLOT_NUM_OF_SAMPLES, PLOT_NUM_OF_SAMPLES) x *= DELTA_TIME # Time in [sec] acceleration_x = np.zeros(PLOT_NUM_OF_SAMPLES) * np.nan acceleration_y = np.zeros(PLOT_NUM_OF_SAMPLES) * np.nan acceleration_z = np.zeros(PLOT_NUM_OF_SAMPLES) * np.nan ax.set_xlim(x[0],x[(PLOT_NUM_OF_SAMPLES-1)]) ax.plot(x,acceleration_x,'b.',linestyle='dotted') ax.plot(x,acceleration_y,'r.',linestyle='dotted') ax.plot(x,acceleration_z,'y.',linestyle='dotted') ax.set_xlabel('Time [sec]') ax.set_ylabel('Acceleration [mg]') ax.set_title('Muse Streaming') fig.canvas.draw()
The data acquisition is managed through a dedicated callback function (data_notification_handler), where the received data are decoded. It is worth noting that each notification has a 8-bytes header containing the timestamp information. Each notification has a dimension of 128 bytes; therefore, excluding the header, 120 bytes will contain a certain integer number of samples containing the sensors’ data. In this particular case, where the only accelerometer is chosen, each sample will be contained into 6 bytes; therefore, in a 128 bytes notification there will be (128-8)/6=20 samples of accelerometer data.
DATA_SIZE_AXL = 6; # dimension of AXL packet AXL_BUFFER_SIZE = int((128 - 8) / DATA_SIZE_AXL) # number of packets for each 128-bytes notification notification_counter = 0 axl_buffer = np.zeros((AXL_BUFFER_SIZE,3)) # buffer to store accelerometer data def data_notification_handler(sender, data): """Decode data""" global notification_counter, axlResolution, acceleration_x, acceleration_y, acceleration_z, x, ax, fig, canvas # The first 8 bytes of the data buffer correspond to the Timestamp: we ignore them in this example start_idx = 8 # Define acceleration buffer to store decoded data axl_buffer = np.zeros((AXL_BUFFER_SIZE,3)) for k in range(AXL_BUFFER_SIZE): #Decode Accelerometer reading current_packet = bytearray(6) current_packet[:] = data[start_idx : start_idx + DATA_SIZE_AXL + 1] axl = [0.0] * 3 for i in range(3): # Extract channel raw value (i.e., 2-bytes each) raw_value = current_packet[2*i:2*(i+1)] # Convert to Int16 and apply sensitivity scaling axl[i] = int.from_bytes(raw_value, byteorder='little', signed=True) * axlResolution axl_buffer[k][0] = axl[0] axl_buffer[k][1] = axl[1] axl_buffer[k][2] = axl[2] start_idx += DATA_SIZE_AXL notification_counter += 1 """Update plot""" if (notification_counter > PLOT_NUM_OF_SAMPLES): # The data points filled all the plot area, so perform shift of old data acceleration_x = np.roll(acceleration_x,-AXL_BUFFER_SIZE) acceleration_y = np.roll(acceleration_y,-AXL_BUFFER_SIZE) acceleration_z = np.roll(acceleration_z,-AXL_BUFFER_SIZE) x = np.roll(x,-AXL_BUFFER_SIZE) for i in range(AXL_BUFFER_SIZE): acceleration_x[(PLOT_NUM_OF_SAMPLES-1)-AXL_BUFFER_SIZE+1+i] = axl_buffer[i][0] acceleration_y[(PLOT_NUM_OF_SAMPLES-1)-AXL_BUFFER_SIZE+1+i] = axl_buffer[i][1] acceleration_z[(PLOT_NUM_OF_SAMPLES-1)-AXL_BUFFER_SIZE+1+i] = axl_buffer[i][2] x[(PLOT_NUM_OF_SAMPLES-1)-AXL_BUFFER_SIZE+1+i] = x[(PLOT_NUM_OF_SAMPLES-1)-AXL_BUFFER_SIZE+i] + DELTA_TIME else: for i in range(AXL_BUFFER_SIZE): acceleration_x[notification_counter-AXL_BUFFER_SIZE+i] = axl_buffer[i][0] acceleration_y[notification_counter-AXL_BUFFER_SIZE+i] = axl_buffer[i][1] acceleration_z[notification_counter-AXL_BUFFER_SIZE+i] = axl_buffer[i][2] # update maximum and minimum limits max_yp = np.nanmax([np.nanmax(acceleration_x), np.nanmax(acceleration_y), np.nanmax(acceleration_z)]) + 50 min_yp = np.nanmin([np.nanmin(acceleration_x), np.nanmin(acceleration_y), np.nanmin(acceleration_z)]) - 50 ax.plot(x,acceleration_x,'b.',linestyle='dotted') ax.plot(x,acceleration_y,'r.',linestyle='dotted') ax.plot(x,acceleration_z,'y.',linestyle='dotted') ax.set_xlim(x[0],x[(PLOT_NUM_OF_SAMPLES-1)]) ax.set_ylim(min_yp,max_yp) plt.tight_layout() fig.canvas.draw() plt.pause(0.0001) return
The result should be domething like those represented in FIGURE 1.
FIGURE 1: PLOT OF ACCELEROMETER DATA EXAMPLE.