To communicate with the ICOtronic system use the Network class:
Basic class to communicate with STU and sensor devices
We recommend you use the context manager to open and close the connection (to the STU):
>>> from asyncio import run
>>> from mytoolit.can import Network
>>> async def create_and_shutdown_network():
...     async with Network() as network:
...         pass # ← Your code goes here
>>> run(create_and_shutdown_network())
To connect to an STH use the coroutine Network.connect_sensor_device()
Connect to a sensor device (e.g. SHA, SMH or STH)
The
MAC address (EUI),
name (str), or
device number (int)
of the sensor device we want to connect to
>>> from asyncio import run
Connect to the sensor device with device number 0
>>> async def connect_sensor_device():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         return await network.is_connected()
>>> run(connect_sensor_device())
True
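Since connect_sensor_device() accepts a MAC address, a name, or a device number, a script that receives the identifier as text (for example from the command line) has to decide which of the three it was given. The following is a minimal sketch of one way to do that; the helper classify_identifier() and its regular expression are hypothetical and not part of the library. Note that a device name consisting only of digits would be treated as a device number here.

```python
import re

# Hypothetical helper, not part of the library: six hexadecimal byte
# values separated by ":" or "-"
MAC_PATTERN = re.compile(r"[0-9A-Fa-f]{2}(?:[:-][0-9A-Fa-f]{2}){5}")


def classify_identifier(text):
    """Guess which kind of sensor device identifier a text represents"""
    if text.isdecimal():
        return "device number"
    if MAC_PATTERN.fullmatch(text):
        return "MAC address"
    # Everything else is treated as a (Bluetooth advertisement) name
    return "name"


for identifier in ("0", "08-6B-D7-01-DE-81", "Test-STH"):
    print(f"{identifier}: {classify_identifier(identifier)}")
```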
After you are connected to the sensor device you can read its (advertisement) name using the coroutine Network.get_name().
Retrieve the name of a Bluetooth device
You can use this method to retrieve the name of both
disconnected and
connected
devices.
For disconnected devices (STHs) you will usually use the STU (e.g. STU 1) and the device number at the STU (in the range 0 up to the number of devices - 1) to retrieve the name.
For connected devices you will use the device name and the special “self addressing” device number (0xff) to ask a device about its own name. Note: A connected STH will return its own name, regardless of the value of the device number.
The node which has access to the Bluetooth device
The number of the Bluetooth device (0 up to the number of available devices - 1; 0xff for self addressing).
The (Bluetooth broadcast) name of the device
>>> from asyncio import run, sleep
>>> from platform import system
Get Bluetooth advertisement name of device “0” from STU 1
>>> if system() == 'Linux':
...     async def reset():
...         async with Network() as network:
...             await network.reset_node('STU 1')
...     run(reset())
>>> async def get_bluetooth_device_name():
...     async with Network() as network:
...         await network.activate_bluetooth('STU 1')
...         # Wait for device scan in node STU 1 to take place
...         await sleep(2)
...         # We assume that at least one STH is available
...         return await network.get_name('STU 1', device_number=0)
>>> sth_name = run(get_bluetooth_device_name())
>>> isinstance(sth_name, str)
True
>>> 0 <= len(sth_name) <= 8
True
If you want to read the name of the sensor device, then the parameter node should have the value "STH 1". If you want to read the name of the STU, use the default value "STU 1".
>>> from asyncio import run
>>> from mytoolit.can import Network
>>> async def read_sensor_name(name):
...     async with Network() as network:
...         await network.connect_sensor_device(name)
...         sensor_name = await network.get_name("STH 1")
...         return sensor_name
>>> sensor_name = "Test-STH"
>>> run(read_sensor_name(sensor_name))
'Test-STH'
After you have connected to the sensor device, use the coroutine Network.open_data_stream() to open the data stream:
Open measurement data stream
Specifies which measurement channels should be enabled
The number of seconds between two consecutive messages, before a TimeoutError will be raised
A context manager object for managing stream data
>>> from asyncio import run
>>> from mytoolit.can.streaming import StreamingConfiguration
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         channels = StreamingConfiguration(first=True, third=True)
...         async with network.open_data_stream(channels) as stream:
...             first = []
...             third = []
...             messages = 0
...             async for data, _ in stream:
...                 first.append(data.values[0])
...                 third.append(data.values[1])
...                 messages += 1
...                 if messages >= 3:
...                     break
...             return first, third
>>> first, third = run(read_streaming_data())
>>> len(first)
3
>>> len(third)
3
After you have opened the stream, use an async for statement to iterate over the received streaming data. For example, the code below:
>>> from mytoolit.can.streaming import StreamingConfiguration
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device("Test-STH")
...         channels = StreamingConfiguration(first=True)
...         async with network.open_data_stream(channels) as stream:
...             async for data, lost_messages in stream:
...                 print(data)
...                 break
# Example Output: [32579, 32637, 32575]@1724251001.976368
>>> run(read_streaming_data())
[...]@... #...
connects to a device called Test-STH,
opens a data stream for the first measurement channel,
receives a single streaming message and prints its representation.
The data returned by the async for (stream) is an object of the class StreamingData:
Support for storing data of a streaming message
This object has the following attributes:
StreamingData.values: a list containing either two or three values,
StreamingData.timestamp: the timestamp when the data was collected (actually when it was received by the CAN controller)
StreamingData.counter: a cyclic message counter (0 – 255) that can be used to detect data loss
Note
The number of values stored in StreamingData.values depends on the enabled streaming channels. For the recommended number of one or three enabled channels the list contains three values. For
one enabled channel all three values belong to the same channel, while
for the three enabled channels
the first value belongs to the first channel,
the second to the second channel,
and the third to the third channel.
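The mapping described in the note can be sketched as a small helper that sorts the values of one message into per-channel lists. The function split_by_channel() is hypothetical and not part of the library. For two enabled channels it assumes that the values follow channel order, as in the example above where values[0] belongs to the first and values[1] to the third channel.

```python
def split_by_channel(values, first=False, second=False, third=False):
    """Sort the values of a single streaming message into channel lists

    Hypothetical helper for illustration, not part of the library
    """
    flags = (("first", first), ("second", second), ("third", third))
    enabled = [name for name, active in flags if active]
    if not enabled:
        raise ValueError("At least one channel has to be enabled")
    if len(enabled) == 1:
        # One enabled channel: all values belong to this channel
        return {enabled[0]: list(values)}
    if len(values) != len(enabled):
        raise ValueError("Number of values does not match enabled channels")
    # Multiple enabled channels: one value per channel, in channel order
    return {name: [value] for name, value in zip(enabled, values)}


print(split_by_channel([1, 2, 3], third=True))
print(split_by_channel([4, 5, 6], first=True, second=True, third=True))
```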
If you want to store streaming data for later use, you can use the Storage class to open a context manager that lets you store data as an HDF5 file via the method add_streaming_data() of the class StorageData:
Add streaming data to the storage object
The streaming data that should be added to the storage
>>> from mytoolit.can.streaming import StreamingConfigBits
Store streaming data for single channel
>>> channel3 = StreamingConfiguration(first=False, second=False,
...                                   third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22, timestamp=2)
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath, channel3) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     # Normally the class itself decides when to write data back
...     # to the disk. We do a manual flush here to check the
...     # number of stored items.
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
6
>>> filepath.unlink()
Store streaming data for three channels
>>> # Avoid the name “all”, since it would shadow the built-in function
>>> all_channels = StreamingConfiguration(first=True, second=True,
...                                       third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22, timestamp=2)
>>> with Storage(filepath, all_channels) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
2
>>> filepath.unlink()
For a more complete example, please take a look at the HDF5 example code.
Sometimes the
connection to your sensor device might be bad or
code might run too slow to retrieve/process streaming data.
In both cases there will be some form of data loss. The ICOc library currently takes multiple measures to detect data loss.
The iterator for streaming data AsyncStreamBuffer will raise a StreamingTimeoutError, if there is no streaming data for a certain amount of time (default: 5 seconds):
Raised if no streaming data was received for a certain amount of time
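To illustrate the idea behind such a per-message timeout, here is a minimal, library-independent sketch based on asyncio.wait_for(). The async generator messages_with_timeout() and the queue standing in for the stream are assumptions made for this example; this is not how AsyncStreamBuffer is implemented. The timeout restarts for every message, so the error only occurs after the data source has been silent for the whole timeout period.

```python
import asyncio


async def messages_with_timeout(queue, timeout):
    """Yield queue items; fail if no item arrives within `timeout` seconds"""
    while True:
        # The timer starts from scratch for every message
        yield await asyncio.wait_for(queue.get(), timeout)


async def demo():
    queue = asyncio.Queue()
    for message in (1, 2, 3):
        queue.put_nowait(message)

    received = []
    try:
        async for message in messages_with_timeout(queue, timeout=0.05):
            received.append(message)
    except asyncio.TimeoutError:
        # No more data: the “stream” was silent for 0.05 seconds
        return received, True
    return received, False


print(asyncio.run(demo()))
```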
AsyncStreamBuffer also provides access to statistics that can be used to determine the amount of lost data. For example, if you iterate through the streaming messages with async for, then in addition to the streaming data the iterator will also return the number of messages lost since the last successfully received message (lost_messages in the example below):
async with network.open_data_stream(channels) as stream:
    async for data, lost_messages in stream:
        if lost_messages > 0:
            print(f"Lost {lost_messages} messages!")
To access the overall data quality since the start of streaming, you can use the method AsyncStreamBuffer.dataloss():
Calculate the overall amount of data loss
The overall amount of data loss as a number between 0 (no data loss) and 1 (all data lost).
The example code below shows how to use this method:
>>> from asyncio import run
>>> from time import monotonic
>>> from mytoolit.can import Network
>>> async def determine_data_loss(identifier):
...     async with Network() as network:
...         await network.connect_sensor_device(identifier)
...
...         end = monotonic() + 1 # Read data for roughly one second
...         channels = StreamingConfiguration(first=True)
...         async with network.open_data_stream(channels) as stream:
...             async for data, lost_messages in stream:
...                 if monotonic() > end:
...                     break
...
...             return stream.dataloss()
>>> data_loss = run(determine_data_loss(identifier="Test-STH"))
>>> data_loss < 0.1 # We assume that the data loss was less than 10 %
True
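To make the meaning of a value between 0 and 1 more concrete, the following sketch derives such a ratio from a sequence of message counter values: the number of missing messages divided by the number of messages that should have arrived. The function loss_fraction() is an assumption made for illustration; the source does not describe how dataloss() calculates its result internally.

```python
def loss_fraction(counters, modulus=256):
    """Fraction of lost messages for a sequence of cyclic counter values

    Hypothetical helper for illustration, not part of the library
    """
    received = 0
    lost = 0
    previous = None
    for counter in counters:
        if previous is not None:
            # Gap between consecutive counters, including wrap-around
            lost += (counter - previous - 1) % modulus
        received += 1
        previous = counter
    total = received + lost
    return lost / total if total else 0.0


print(loss_fraction([254, 255, 0, 1]))  # No gaps
print(loss_fraction([0, 1, 4, 5]))      # Counters 2 and 3 are missing
```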
If you want to calculate the amount of data loss for a specific time-span you can use the method AsyncStreamBuffer.reset_stats() to reset the message statistics at the start of the time-span. In the following example we stream data for (roughly) 2.1 seconds and return a list with the amount of data loss over periods of 0.5 seconds:
>>> from asyncio import run
>>> from time import monotonic
>>> from mytoolit.can import Network
>>> async def determine_data_loss(identifier):
...     async with Network() as network:
...         await network.connect_sensor_device(identifier)
...
...         start = monotonic()
...         end = monotonic() + 2.1
...         last_reset = start
...         data_lost = []
...         channels = StreamingConfiguration(first=True)
...         async with network.open_data_stream(channels) as stream:
...             async for data, lost_messages in stream:
...                 current = monotonic()
...                 if current >= last_reset + 0.5:
...                     data_lost.append(stream.dataloss())
...                     stream.reset_stats()
...                     last_reset = current
...                 if current > end:
...                     break
...
...             return data_lost
>>> data_lost = run(determine_data_loss(identifier="Test-STH"))
>>> len(data_lost)
4
>>> all(map(lambda loss: loss < 0.1, data_lost))
True
Note
We used an overall runtime of 2.1 seconds, since with a runtime of exactly 2 seconds the code above might return either three or four data loss values, depending on the specific timing.
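The timing effect described in this note can be reproduced without hardware. The sketch below replays the loop logic of the example with simulated message arrival times in milliseconds; the function count_windows() and the chosen message intervals are assumptions made for illustration only.

```python
def count_windows(message_interval_ms, runtime_ms, window_ms=500):
    """Count how many data loss values the loop logic would collect

    Simulation for illustration: messages arrive at a fixed interval
    """
    last_reset = 0
    windows = 0
    current = 0
    while True:
        current += message_interval_ms  # Arrival time of next message
        if current >= last_reset + window_ms:
            windows += 1
            last_reset = current
        if current > runtime_ms:
            break
    return windows


# With a runtime of 2 seconds the result depends on the message timing
print(count_windows(message_interval_ms=10, runtime_ms=2000))
print(count_windows(message_interval_ms=7, runtime_ms=2000))
# With a runtime of 2.1 seconds both timings produce the same count
print(count_windows(message_interval_ms=10, runtime_ms=2100))
print(count_windows(message_interval_ms=7, runtime_ms=2100))
```

In this simulation the 7 ms interval shifts each reset slightly past the 0.5 second boundary, so the fourth window would only close after the 2 second mark.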
The buffer of the CAN controller is only able to store a certain amount of streaming messages before it has to drop them to make room for new ones. For this reason the ICOc library will raise a StreamingBufferError, if the buffer for streaming messages exceeds a certain threshold (default: 10 000 messages):
Raised if there are too many streaming messages in the buffer
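The general idea of failing loudly once a buffer grows beyond a threshold, instead of silently dropping data, can be sketched as follows. The classes LimitedBuffer and BufferLimitExceeded are hypothetical names used for this example; they do not show how the library implements StreamingBufferError.

```python
from collections import deque


class BufferLimitExceeded(Exception):
    """Hypothetical error for a buffer that contains too many items"""


class LimitedBuffer:
    """Queue that raises an error instead of growing without bound"""

    def __init__(self, max_size):
        self.max_size = max_size
        self._items = deque()

    def __len__(self):
        return len(self._items)

    def put(self, item):
        if len(self._items) >= self.max_size:
            raise BufferLimitExceeded(
                f"Buffer already contains {len(self._items)} items"
            )
        self._items.append(item)

    def get(self):
        # Reading items fast enough keeps the buffer below the limit
        return self._items.popleft()


buffer = LimitedBuffer(max_size=2)
buffer.put("message 1")
buffer.put("message 2")
try:
    buffer.put("message 3")
except BufferLimitExceeded as error:
    print(f"Consumer too slow: {error}")
```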