Device Connection

Connecting to STU

To communicate with the ICOtronic system use the Network class:

class mytoolit.can.Network

Basic class to communicate with STU and sensor devices

We recommend you use the context manager to open and close the connection (to the STU):

>>> from asyncio import run
>>> from mytoolit.can import Network

>>> async def create_and_shutdown_network():
...     async with Network() as network:
...         pass # ← Your code goes here

>>> run(create_and_shutdown_network())
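The benefit of the context manager is that the connection is closed even if the code inside the block raises an exception. The following minimal sketch illustrates this behavior with a hypothetical stand-in class: FakeNetwork is not part of mytoolit and does not talk to any hardware, it only records when it was opened and closed.

```python
from asyncio import run


class FakeNetwork:
    """Hypothetical stand-in for Network (no hardware access)"""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("opened")
        return self

    async def __aexit__(self, exception_type, exception, traceback):
        # Runs on normal exit and also when the body raised an exception
        self.events.append("closed")
        return False  # Do not swallow the exception


async def use_network():
    network = FakeNetwork()
    try:
        async with network:
            raise RuntimeError("Something went wrong")
    except RuntimeError:
        pass
    return network.events


events = run(use_network())
print(events)
```

Even though the body of the async with statement fails, the list of events ends with "closed".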

Connecting to Sensor Device

To connect to a sensor device (e.g. an STH) use the coroutine Network.connect_sensor_device():

async Network.connect_sensor_device(identifier: int | str | EUI) → None

Connect to a sensor device (e.g. SHA, SMH or STH)

Parameters

identifier:

The

  • MAC address (EUI),

  • name (str), or

  • device number (int)

of the sensor device we want to connect to

Example

>>> from asyncio import run

Connect to the sensor device with device number 0

>>> async def connect_sensor_device():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         return await network.is_connected()
>>> run(connect_sensor_device())
True

Auxiliary Functionality

Reading Names

After you are connected to the sensor device you can read its (advertisement) name using the coroutine Network.get_name().

async Network.get_name(node: str | Node = 'STU 1', device_number: int = 255) → str

Retrieve the name of a Bluetooth device

You can use this method to retrieve the name of both

  1. disconnected and

  2. connected

devices.

  1. For disconnected devices (STHs) you will usually use the STU (e.g. STU 1) and the device number at the STU (in the range 0 up to the number of devices - 1) to retrieve the name.

  2. For connected devices you will use the node name of the device (e.g. STH 1) and the special “self addressing” device number (0xff) to ask a device about its own name. Note: A connected STH will return its own name, regardless of the value of the device number.

Parameters

node:

The node which has access to the Bluetooth device

device_number:

The number of the Bluetooth device (0 up to the number of available devices - 1; 0xff for self addressing).

Returns

The (Bluetooth broadcast) name of the device

Example

>>> from asyncio import run, sleep
>>> from platform import system

Get Bluetooth advertisement name of device “0” from STU 1

>>> if system() == 'Linux':
...    async def reset():
...        async with Network() as network:
...            await network.reset_node('STU 1')
...    run(reset())
>>> async def get_bluetooth_device_name():
...     async with Network() as network:
...         await network.activate_bluetooth('STU 1')
...         # Wait for device scan in node STU 1 to take place
...         await sleep(2)
...         # We assume that at least one STH is available
...         return await network.get_name('STU 1', device_number=0)
>>> sth_name = run(get_bluetooth_device_name())
>>> isinstance(sth_name, str)
True
>>> 0 <= len(sth_name) <= 8
True

If you want to read the name of the sensor device, set the parameter node to "STH 1". If you want to read the name of the STU, use the default value "STU 1".

>>> from asyncio import run
>>> from mytoolit.can import Network

>>> async def read_sensor_name(name):
...     async with Network() as network:
...         await network.connect_sensor_device(name)
...         sensor_name = await network.get_name("STH 1")
...         return sensor_name

>>> sensor_name = "Test-STH"
>>> run(read_sensor_name(sensor_name))
'Test-STH'

Streaming

Reading Data

After you have connected to the sensor device, use the method Network.open_data_stream() to open the data stream:

Network.open_data_stream(channels: StreamingConfiguration, timeout: float = 5) → DataStreamContextManager

Open measurement data stream

Parameters

channels:

Specifies which measurement channels should be enabled

timeout:

The maximum number of seconds that may pass between two consecutive messages before a TimeoutError is raised

Returns

A context manager object for managing stream data

Examples

>>> from asyncio import run
>>> from mytoolit.can import Network
>>> from mytoolit.can.streaming import StreamingConfiguration
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         channels = StreamingConfiguration(first=True, third=True)
...         async with network.open_data_stream(channels) as stream:
...             first = []
...             third = []
...             messages = 0
...             async for data, _ in stream:
...                 first.append(data.values[0])
...                 third.append(data.values[1])
...                 messages += 1
...                 if messages >= 3:
...                     break
...             return first, third
>>> first, third = run(read_streaming_data())
>>> len(first)
3
>>> len(third)
3

After you have opened the stream, use an async for statement to iterate over the received streaming data. For example, the code below:

>>> from mytoolit.can.streaming import StreamingConfiguration

>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device("Test-STH")
...         channels = StreamingConfiguration(first=True)
...         async with network.open_data_stream(channels) as stream:
...             async for data, lost_messages in stream:
...                 print(data)
...                 break

# Example output: [32579, 32637, 32575]@1724251001.976368
>>> run(read_streaming_data())

  • connects to a device called Test-STH,

  • opens a data stream for the first measurement channel,

  • receives a single streaming message and prints its representation.

The data returned by the async for (stream) is an object of the class StreamingData:

class mytoolit.can.streaming.StreamingData(counter: int, timestamp: float, values: Sequence[float])

Support for storing data of a streaming message

This object has the following attributes:

  • StreamingData.values: a list containing either two or three values,

  • StreamingData.timestamp: the timestamp when the data was collected (more precisely, when the message was received by the CAN controller),

  • StreamingData.counter: a cyclic message counter (0 – 255) that can be used to detect data loss
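The cyclic counter makes it possible to estimate how many messages went missing between two consecutively received messages. The sketch below shows the arithmetic; lost_between is a hypothetical helper and not part of the library, and whether the library calculates the value exactly this way is an assumption.

```python
def lost_between(previous_counter: int, current_counter: int) -> int:
    """Number of messages missing between two consecutively received ones"""
    # The counter wraps around from 255 to 0, hence the modulo operation
    return (current_counter - previous_counter - 1) % 256


print(lost_between(21, 22))  # Consecutive counters
print(lost_between(21, 25))  # Counters 22, 23 and 24 are missing
print(lost_between(254, 1))  # Counters 255 and 0 are missing (wrap-around)
```

Because the counter repeats after 256 messages, a loss of exactly 256 messages (or a multiple of it) can not be detected this way.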

Note

The number of values stored in StreamingData.values depends on the enabled streaming channels. For the recommended number of one or three enabled channels the list contains three values. For

  • one enabled channel all three values belong to the same channel, while

  • for the three enabled channels

    • the first value belongs to the first channel,

    • the second to the second channel,

    • and the third to the third channel.
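The layout described in the note can be sketched as a small helper that maps the values of one message to the enabled channels. The function values_by_channel is hypothetical (not part of the library) and deliberately covers only the two recommended configurations.

```python
def values_by_channel(values, first=False, second=False, third=False):
    """Map the values of one streaming message to the enabled channels"""
    enabled = [
        name
        for name, active in (
            ("first", first),
            ("second", second),
            ("third", third),
        )
        if active
    ]
    if len(enabled) == 1:
        # All three values belong to the single enabled channel
        return {enabled[0]: list(values)}
    if len(enabled) == 3:
        # One value per channel, in channel order
        return {name: [value] for name, value in zip(enabled, values)}
    raise ValueError("This sketch only covers one or three enabled channels")


single = values_by_channel([1, 2, 3], third=True)
triple = values_by_channel([1, 2, 3], first=True, second=True, third=True)
print(single)
print(triple)
```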

Storing Data

If you want to store streaming data for later use, you can use the Storage class to open a context manager that lets you store the data in an HDF5 file via the method add_streaming_data() of the class StorageData:

StorageData.add_streaming_data(streaming_data: StreamingData) → None

Add streaming data to the storage object

Parameters

streaming_data:

The streaming data that should be added to the storage

Examples

>>> from pathlib import Path
>>> from mytoolit.can.streaming import StreamingConfiguration, StreamingData

Store streaming data for single channel

>>> channel3 = StreamingConfiguration(first=False, second=False,
...                                   third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22, timestamp=2)
>>> filepath = Path("test.hdf5")
>>> with Storage(filepath, channel3) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     # Normally the class itself decides when to write data back
...     # to the disk. We do a manual flush here to check the
...     # number of stored items.
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
6
>>> filepath.unlink()

Store streaming data for three channels

>>> all_channels = StreamingConfiguration(first=True, second=True, third=True)
>>> data1 = StreamingData(values=[1, 2, 3], counter=21, timestamp=1)
>>> data2 = StreamingData(values=[4, 5, 6], counter=22, timestamp=2)
>>> with Storage(filepath, all_channels) as storage:
...     storage.add_streaming_data(data1)
...     storage.add_streaming_data(data2)
...     storage.acceleration.flush()
...     print(storage.acceleration.nrows)
2
>>> filepath.unlink()
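The row counts printed in the two examples above (6 and 2) are consistent with the layout of StreamingData.values: with a single enabled channel each message contributes three values of the same channel, while with three enabled channels each message contributes one value per channel. A small sketch of this arithmetic follows; expected_rows is a hypothetical helper, not part of the library.

```python
def expected_rows(number_messages: int, enabled_channels: int) -> int:
    """Estimate the number of stored rows for the recommended setups"""
    values_per_message = 3
    if enabled_channels == 1:
        # Every value of a message is a separate sample of the same channel
        return number_messages * values_per_message
    if enabled_channels == 3:
        # Every message holds exactly one sample for each channel
        return number_messages
    raise ValueError("This sketch only covers one or three enabled channels")


print(expected_rows(2, enabled_channels=1))
print(expected_rows(2, enabled_channels=3))
```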

For a more complete example, please take a look at the HDF5 example code.

Determining Data Loss

Sometimes the

  • connection to your sensor device might be bad or

  • code might run too slowly to retrieve/process streaming data.

In both cases there will be some form of data loss. The ICOc library currently takes multiple measures to detect data loss.

Bad Connection

The iterator for streaming data, AsyncStreamBuffer, will raise a StreamingTimeoutError if there is no streaming data for a certain amount of time (default: 5 seconds):

exception mytoolit.can.streaming.StreamingTimeoutError

Raised if no streaming data was received for a certain amount of time
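To illustrate the general pattern without hardware, the sketch below imitates such a timeout using only the standard library. The names FakeStreamingTimeoutError and messages are hypothetical stand-ins. How the library implements its timeout internally is not shown here, so treat the use of asyncio.wait_for() as an assumption.

```python
import asyncio


class FakeStreamingTimeoutError(Exception):
    """Hypothetical stand-in for StreamingTimeoutError"""


async def messages(queue, timeout):
    """Yield queued messages; fail if none arrives within timeout seconds"""
    while True:
        try:
            yield await asyncio.wait_for(queue.get(), timeout)
        except asyncio.TimeoutError as error:
            raise FakeStreamingTimeoutError(
                f"No data for {timeout} seconds"
            ) from error


async def main():
    queue = asyncio.Queue()
    for value in (1, 2, 3):
        queue.put_nowait(value)

    received = []
    timed_out = False
    try:
        async for message in messages(queue, timeout=0.05):
            received.append(message)
    except FakeStreamingTimeoutError:
        # No more data arrived in time: handle the bad connection here
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(main())
print(received, timed_out)
```

In real code you would wrap the async for loop over the data stream in a similar try/except block and catch StreamingTimeoutError instead.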

AsyncStreamBuffer also provides access to statistics that can be used to determine the amount of lost data. For example, if you iterate through the streaming messages with async for, then in addition to the streaming data the iterator will also return the number of messages lost since the last successfully received message (lost_messages in the example below):

async with network.open_data_stream(channels) as stream:
    async for data, lost_messages in stream:
        if lost_messages > 0:
            print(f"Lost {lost_messages} messages!")

To assess the overall data quality since the start of streaming, you can use the method AsyncStreamBuffer.dataloss():

AsyncStreamBuffer.dataloss() → float

Calculate the overall amount of data loss

Returns

The overall amount of data loss as number between 0 (no data loss) and 1 (all data lost).

The example code below shows how to use this method:

>>> from asyncio import run
>>> from time import monotonic
>>> from mytoolit.can import Network
>>> from mytoolit.can.streaming import StreamingConfiguration

>>> async def determine_data_loss(identifier):
...       async with Network() as network:
...           await network.connect_sensor_device(identifier)
...
...           end = monotonic() + 1 # Read data for roughly one second
...           channels = StreamingConfiguration(first=True)
...           async with network.open_data_stream(channels) as stream:
...               async for data, lost_messages in stream:
...                   if monotonic() > end:
...                       break
...
...               return stream.dataloss()

>>> data_loss = run(determine_data_loss(identifier="Test-STH"))
>>> data_loss < 0.1 # We assume that the data loss was less than 10 %
True
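One way to picture the value returned by dataloss() is as the ratio of lost messages to all messages that should have arrived. The sketch below uses a hypothetical class LossStatistics (not part of the library). The formula lost / (lost + received) is an assumption that merely matches the documented range from 0 to 1.

```python
class LossStatistics:
    """Hypothetical bookkeeping for received and lost messages"""

    def __init__(self):
        self.received = 0
        self.lost = 0

    def add(self, lost_messages: int) -> None:
        """Register one received message and the messages lost before it"""
        self.received += 1
        self.lost += lost_messages

    def dataloss(self) -> float:
        """Return the loss ratio between 0 (nothing lost) and 1 (all lost)"""
        total = self.received + self.lost
        return self.lost / total if total else 0.0

    def reset_stats(self) -> None:
        """Start a new measurement period"""
        self.received = 0
        self.lost = 0


statistics = LossStatistics()
for lost_messages in (0, 0, 0, 1):  # Four received messages, one lost
    statistics.add(lost_messages)
loss = statistics.dataloss()
print(loss)

statistics.reset_stats()
loss_after_reset = statistics.dataloss()
print(loss_after_reset)
```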

If you want to calculate the amount of data loss for a specific time-span, you can use the method AsyncStreamBuffer.reset_stats() to reset the message statistics at the start of the time-span. In the following example we stream data for (roughly) 2.1 seconds and return a list with the amount of data loss over periods of 0.5 seconds:

>>> from asyncio import run
>>> from time import monotonic
>>> from mytoolit.can import Network
>>> from mytoolit.can.streaming import StreamingConfiguration

>>> async def determine_data_loss(identifier):
...       async with Network() as network:
...           await network.connect_sensor_device(identifier)
...
...           start = monotonic()
...           end = monotonic() + 2.1
...           last_reset = start
...           data_lost = []
...           channels = StreamingConfiguration(first=True)
...           async with network.open_data_stream(channels) as stream:
...               async for data, lost_messages in stream:
...                   current = monotonic()
...                   if current >= last_reset + 0.5:
...                       data_lost.append(stream.dataloss())
...                       stream.reset_stats()
...                       last_reset = current
...                   if current > end:
...                       break
...
...               return data_lost

>>> data_lost = run(determine_data_loss(identifier="Test-STH"))
>>> len(data_lost)
4
>>> all(map(lambda loss: loss < 0.1, data_lost))
True

Note

We used an overall runtime of 2.1 seconds, since with a runtime of exactly 2 seconds the code above might return either three or four data loss values, depending on the specific timing.
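The effect described in this note can be reproduced without hardware by replaying the loop logic of the example on made-up message arrival times. In the sketch below count_windows is a hypothetical helper, and the message spacings (10 ms and 30 ms) are chosen only for illustration. Times are given in integer milliseconds to avoid floating point rounding issues.

```python
def count_windows(message_times_ms, window_ms, end_ms):
    """Count how many data loss values the loop above would collect"""
    last_reset = 0
    windows = 0
    for current in message_times_ms:
        if current >= last_reset + window_ms:
            windows += 1
            # The next window starts at the arrival time of this message,
            # so delays accumulate from window to window
            last_reset = current
        if current > end_ms:
            break
    return windows


every_10_ms = range(0, 3000, 10)
every_30_ms = range(0, 3000, 30)

print(count_windows(every_10_ms, window_ms=500, end_ms=2000))
print(count_windows(every_30_ms, window_ms=500, end_ms=2000))
print(count_windows(every_10_ms, window_ms=500, end_ms=2100))
print(count_windows(every_30_ms, window_ms=500, end_ms=2100))
```

With an end time of 2000 ms the result depends on the message spacing (four values for 10 ms, three values for 30 ms), while an end time of 2100 ms produces four values in both cases.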

Slow Processing of Data

The buffer of the CAN controller can only store a certain number of streaming messages before it has to drop them to make room for new ones. For this reason the ICOc library will raise a StreamingBufferError if the number of buffered streaming messages exceeds a certain threshold (default: 10 000 messages):

exception mytoolit.can.streaming.StreamingBufferError

Raised if there are too many streaming messages in the buffer
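The idea behind this check can be sketched with a simple bounded buffer. The names FakeStreamingBufferError and BoundedBuffer are hypothetical and only imitate the documented behavior. The actual buffer implementation of the library may look different.

```python
from collections import deque


class FakeStreamingBufferError(Exception):
    """Hypothetical stand-in for StreamingBufferError"""


class BoundedBuffer:
    """Message buffer that complains when the consumer falls behind"""

    def __init__(self, max_messages: int = 10_000):
        self.messages = deque()
        self.max_messages = max_messages

    def put(self, message) -> None:
        """Store a new message or fail if the buffer is already full"""
        if len(self.messages) >= self.max_messages:
            raise FakeStreamingBufferError(
                f"More than {self.max_messages} messages in buffer"
            )
        self.messages.append(message)

    def get(self):
        """Remove and return the oldest message"""
        return self.messages.popleft()


buffer = BoundedBuffer(max_messages=3)
for message in ("a", "b", "c"):
    buffer.put(message)

try:
    buffer.put("d")  # The consumer did not retrieve any message in time
    overflow = False
except FakeStreamingBufferError:
    overflow = True

oldest = buffer.get()
print(overflow, oldest)
```

If you see this kind of error in practice, the usual remedy is to do less work inside the async for loop, for example by only collecting the data there and processing it after streaming has finished.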