1. Introduction
This document describes how to control an external model with the orientation of your Muse device, using Bluetooth Low Energy (BLE) from Python. The example is based on the standard Muse communication protocol [A] and the following Python packages:
- Muse API: https://gitlab.com/221e-Repositories/muse/python/muse_api
- Bleak package (version 0.21.1): https://bleak.readthedocs.io/en/latest/
- Struct module: https://docs.python.org/3/library/struct.html
- Numpy package (version 1.24.1): https://pypi.org/project/numpy/
- Matplotlib package (version 3.8.2): https://matplotlib.org/3.8.2/
2. Collect Data from your Muse
First, check that the BLE device is powered on and supports connections by finding it with the BleakScanner.discover function, as described in the tutorial Connection And Control. Then connect to the Muse using BleakClient. Set the my_device_name variable to the name of your device.
    # Device name to be searched for
    my_device_name = 'muse_v3'

    # Device Enumeration
    devices = await BleakScanner.discover()
    myDevice = None
    for d in devices:
        print(d)
        if d.name == my_device_name:
            myDevice = d
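The loop above leaves myDevice set to None when no matching device is advertised, and d.name can itself be None for devices that do not broadcast a name. A minimal sketch of a guard is given here, assuming a hypothetical helper called select_device (it is not part of Bleak or the Muse API). It relies only on each discovered object exposing a name attribute, so it can be exercised without hardware.

```python
from types import SimpleNamespace


def select_device(devices, name):
    """Return the first discovered device whose advertised name equals `name`.

    Hypothetical helper (not part of Bleak or the Muse API). It relies only
    on each element exposing a `name` attribute.
    """
    for d in devices:
        # Devices without an advertised name report None here
        if d.name is not None and d.name == name:
            return d
    raise LookupError(f"No BLE device named {name!r} was found")


# Stand-in objects used only to exercise the helper without hardware
fake_devices = [SimpleNamespace(name=None), SimpleNamespace(name="muse_v3")]
found = select_device(fake_devices, "muse_v3")
print(found.name)  # prints: muse_v3
```

With real hardware, the list returned by BleakScanner.discover() would be passed in place of fake_devices, and the result handed to BleakClient, for example with `async with BleakClient(myDevice) as client:`, which connects on entry and disconnects on exit.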
Check the device status and, if the Muse is in the IDLE state, start the streaming acquisition.
    # Request the current system state
    await client.write_gatt_char(char_specifier=CMD_UUID, data=bytearray([0x82,0x00]), response=True)
    response = await client.read_gatt_char(CMD_UUID)

    # Check the response bytes that identify the IDLE state
    if (response[0] == 0x00 and response[2] == 0x82 and response[3] == 0x00 and response[4] == 0x02):
        print('System in IDLE state')
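The state check above can be wrapped in a small pure function so that it can be tested without a device. The sketch below uses a hypothetical helper named is_idle. The byte positions and values are copied from the check in this tutorial; the meaning attached to each byte in the comments is an assumption and should be verified against the communication protocol [A].

```python
def is_idle(response):
    """Return True if a state response matches the IDLE pattern.

    Byte positions and values are copied from the check in this tutorial;
    the interpretation given in the comments is an assumption.
    """
    if len(response) < 5:
        return False  # too short to contain the fields checked below
    return (
        response[0] == 0x00      # assumed: acknowledge marker
        and response[2] == 0x82  # matches the command code that was sent
        and response[3] == 0x00  # assumed: no error reported
        and response[4] == 0x02  # state code treated as IDLE in this tutorial
    )


# Hand-built examples; index 1 is not checked by the tutorial
print(is_idle(bytes([0x00, 0x03, 0x82, 0x00, 0x02])))  # prints: True
print(is_idle(bytes([0x00, 0x03, 0x82, 0x00, 0x08])))  # prints: False
```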
Subscribe to command and data characteristics, using the corresponding UUIDs.
    # Command and Data characteristics
    CMD_UUID = "d5913036-2d8a-41ee-85b9-4e361aa5c8a7"
    DATA_UUID = "09bf2c52-d1d9-c0b7-4145-475964544307"

    # Subscribe to Command and Data characteristic
    await client.start_notify(CMD_UUID, cmd_notification_handler)
    await client.start_notify(DATA_UUID, data_notification_handler)
Start a streaming acquisition of quaternion data, as described in the Muse communication protocol. In this example, the sampling frequency is set to 25 Hz. The acquisition runs for 30 seconds and is then stopped automatically.
    # Start direct streaming of orientation (quaternion) data and read data
    # streaming type: 0x08 (direct streaming)
    # acquisition mode: 0x10 (orientation only)
    # acquisition frequency: 0x01 (25 Hz)
    print("Start Streaming ...")
    await client.write_gatt_char(CMD_UUID, data=bytearray([0x02,0x05,0x08,0x10,0x00,0x00,0x01]), response=True)

    # Make the streaming last for 30 seconds
    await asyncio.sleep(30)

    # Stop the streaming
    await client.write_gatt_char(CMD_UUID, data=bytearray([0x02,0x01,0x02]), response=True)
    print("End Streaming ...")
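The start and stop commands above appear to share one layout: a command code, a payload length, then the payload. This is inferred from the bytes used in this tutorial and is not a statement of the protocol [A]. Under that assumption, and assuming further that the acquisition mode occupies three little-endian bytes, the commands could be assembled as sketched below. The function names are hypothetical.

```python
def build_start_stream_command(stream_type=0x08, mode=0x10, frequency=0x01):
    """Assemble the start-streaming command used in this tutorial.

    Assumed layout (inferred, not taken from the protocol document):
    [command, payload length, stream type, mode (3 bytes, little-endian),
    frequency].
    """
    payload = bytes([stream_type]) + mode.to_bytes(3, "little") + bytes([frequency])
    return bytearray([0x02, len(payload)]) + payload


def build_stop_command():
    """Assemble the stop command used in this tutorial (payload 0x02)."""
    return bytearray([0x02, 0x01, 0x02])


start_cmd = build_start_stream_command()
print(start_cmd.hex())             # prints: 02050810000001
print(build_stop_command().hex())  # prints: 020102
```

With the default arguments, the result is byte-for-byte identical to the literal bytearray written in the tutorial, so it can be passed to client.write_gatt_char in the same way.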
The data acquisition is managed through a dedicated function called data_notification_handler. See the Insights section for further information.
The setup of the plot environment for real time visualization is described below.
    # Set up the figure for real-time plot
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')

    # Fix the axes scaling
    ax.set_box_aspect([1, 1, 1])
    ax.view_init(40,20)

    # Define arrow starting points
    x_start = [0, 0, 0]
    y_start = [0, 0, 0]
    z_start = [0, 0, 0]

    # Define arrow directions and lengths
    arrow_length = 20
    arrow_directions = np.array([[arrow_length, 0, 0], [0, arrow_length, 0], [0, 0, arrow_length]])

    # Plot arrows
    quiv = ax.quiver(x_start, y_start, z_start, arrow_directions[0,:], arrow_directions[1,:], arrow_directions[2,:], color=['b', 'r', 'y'], arrow_length_ratio=0.1)

    # Set axis limits
    ax.set_xlim([-25, 25])
    ax.set_ylim([-25, 25])
    ax.set_zlim([-25, 25])

    # Set axis labels
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')

    fig.canvas.draw()
    plt.pause(3.0)
The result is a reference frame drawn as a set of three arrows. This representation of the reference frame rotates according to the device orientation.
3. Insights
As for message decoding, the following code snippets show how to decode the orientation quaternion received from the device. Each notification has an 8-byte header containing the timestamp information, so those bytes are skipped. To apply the transformation to the model, we need:
- quaternion_product function, which computes the product of two quaternions (here, the current quaternion and the conjugate of the previous one).
- quaternion_conjugate function, which computes the conjugate of a quaternion.
- quat2_rotation_matrix function, which computes from a quaternion the rotation matrix that will be applied to the model.
For full implementation details, see the provided script.
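For readers without the script at hand, the three helpers can be sketched from the standard quaternion formulas. The version below is an illustration, not the implementation from the provided script. It assumes scalar-first ordering (w, i, j, k), as used for q_curr in this tutorial, the Hamilton product convention, and unit quaternions. It is written with plain tuples and lists to stay dependency-free; np.matmul accepts the nested list returned by quat2_rotation_matrix.

```python
import math


def quaternion_conjugate(q):
    """Return the conjugate of q = (w, x, y, z): the vector part is negated."""
    w, x, y, z = q
    return (w, -x, -y, -z)


def quaternion_product(q1, q2):
    """Return the Hamilton product q1 * q2 for scalar-first quaternions."""
    w1, x1, y1, z1 = q1
    w2, x2, y2, z2 = q2
    return (
        w1 * w2 - x1 * x2 - y1 * y2 - z1 * z2,
        w1 * x2 + x1 * w2 + y1 * z2 - z1 * y2,
        w1 * y2 - x1 * z2 + y1 * w2 + z1 * x2,
        w1 * z2 + x1 * y2 - y1 * x2 + z1 * w2,
    )


def quat2_rotation_matrix(q):
    """Return the 3x3 rotation matrix of a unit quaternion as nested lists."""
    w, x, y, z = q
    return [
        [1 - 2 * (y * y + z * z), 2 * (x * y - w * z), 2 * (x * z + w * y)],
        [2 * (x * y + w * z), 1 - 2 * (x * x + z * z), 2 * (y * z - w * x)],
        [2 * (x * z - w * y), 2 * (y * z + w * x), 1 - 2 * (x * x + y * y)],
    ]


# Example: a 90 degree rotation about Z maps the X axis onto the Y axis
s = math.sqrt(0.5)
q_z90 = (s, 0.0, 0.0, s)
R = quat2_rotation_matrix(q_z90)
x_axis_rotated = [row[0] for row in R]  # first column = image of (1, 0, 0)
print([round(v, 6) for v in x_axis_rotated])

# A unit quaternion times its conjugate gives the identity rotation
q_identity = quaternion_product(q_z90, quaternion_conjugate(q_z90))
print([round(v, 6) for v in q_identity])
```

The second example shows why the handler multiplies the current quaternion by the conjugate of the previous one: when the orientation has not changed between two samples, the product is the identity and the model is left untouched.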
    def data_notification_handler(sender, data):
        """Decode data"""
        global q_curr, q_prev, q_curr_list
        global ax, arrow_directions, quiv, i, fig

        # Skip the 8-byte header and extract the three imaginary components
        qi_bytes = data[8:10]
        qj_bytes = data[10:12]
        qk_bytes = data[12:14]

        # Converting bytes to int16
        qi_val = struct.unpack('<h', bytes(qi_bytes))[0]
        qj_val = struct.unpack('<h', bytes(qj_bytes))[0]
        qk_val = struct.unpack('<h', bytes(qk_bytes))[0]

        # Converting int16 to single and normalizing by 32767
        qi_val = qi_val / 32767.0
        qj_val = qj_val / 32767.0
        qk_val = qk_val / 32767.0

        # Compute (unit) quaternion real component
        qw_val = 1.0
        if (1 - (qi_val**2 + qj_val**2 + qk_val**2)) > 0:
            qw_val = math.sqrt(1 - (qi_val**2 + qj_val**2 + qk_val**2))

        # Update orientation quaternion variables
        q_curr = np.array([qw_val, qi_val, qj_val, qk_val])

        # Compute angular difference from quaternions
        qdelta = quaternion_product(q_curr, quaternion_conjugate(q_prev))
        Rquat = quat2_rotation_matrix(qdelta)
        q_prev = q_curr

        # Apply transformation to the model
        arrow_directions = np.transpose(np.matmul(Rquat, arrow_directions.T))
        quiv.remove()  # remove previous quiver
        quiv = ax.quiver([0, 0, 0], [0, 0, 0], [0, 0, 0], arrow_directions[0,:], arrow_directions[1,:], arrow_directions[2,:], color=['b', 'r', 'y'], arrow_length_ratio=0.1)
        fig.canvas.draw()
        plt.pause(0.0001)
        return
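The decoding part of the handler does not depend on the plot or on the BLE connection, so it can be isolated and checked with a hand-built packet. The sketch below uses a hypothetical function named decode_quaternion that follows the same steps as the handler: skip the 8-byte header, read three little-endian int16 values, scale by 32767, and recover the real component from the unit-norm constraint. It reads the three values in a single struct.unpack_from call, which is a stylistic choice and not the method of the provided script.

```python
import math
import struct


def decode_quaternion(data):
    """Decode a unit quaternion (w, i, j, k) from one data notification."""
    # Skip the 8-byte header, then read three little-endian int16 values
    qi_raw, qj_raw, qk_raw = struct.unpack_from('<3h', data, 8)
    qi, qj, qk = (v / 32767.0 for v in (qi_raw, qj_raw, qk_raw))

    # Recover the real part from the unit-norm constraint; like the handler,
    # fall back to 1.0 when the remainder is not positive
    remainder = 1.0 - (qi**2 + qj**2 + qk**2)
    qw = math.sqrt(remainder) if remainder > 0 else 1.0
    return (qw, qi, qj, qk)


# Hand-built packet: 8 header bytes followed by i = j = k = 0
packet = bytes(8) + struct.pack('<3h', 0, 0, 0)
print(decode_quaternion(packet))  # prints: (1.0, 0.0, 0.0, 0.0)
```

Because only the three imaginary components are transmitted, the sign of the real component is always taken as non-negative in this scheme.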
With the handler running, the result should look like the figures below.