API

To communicate with the ICOtronic system, use the Network class:

class mytoolit.can.Network(sender: str | Node = 'SPU 1')

Basic class to communicate with STU and sensor devices

Note

Please ignore the sender parameter; you will never need to change it. In fact, we plan to remove the parameter in a future version of ICOc.

We recommend you use the context manager to open and close the connection (to the STU):

>>> from asyncio import run
>>> from mytoolit.can import Network

>>> async def create_and_shutdown_network():
...     async with Network() as network:
...         pass # ← Your code goes here

>>> run(create_and_shutdown_network())
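The main reason to prefer the context manager is that the connection is closed even if the code inside the block raises an exception. The following sketch illustrates this behaviour with a hypothetical stand-in class FakeNetwork, which is not part of ICOc and only records the calls; the real Network class communicates with the hardware instead.

```python
from asyncio import run


class FakeNetwork:
    """Hypothetical stand-in for Network (records calls, no hardware)"""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("open")
        return self

    async def __aexit__(self, exception_type, exception, traceback):
        # Runs on normal exit and when the block raises an exception
        self.events.append("close")
        return False  # Do not swallow the exception


async def use_network(network):
    try:
        async with network:
            network.events.append("work")
            raise RuntimeError("Something went wrong")
    except RuntimeError:
        network.events.append("error handled")


network = FakeNetwork()
run(use_network(network))
print(network.events)  # ['open', 'work', 'close', 'error handled']
```

The entry "close" appears before "error handled", which shows that the cleanup code runs before the exception reaches the surrounding handler.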

To read streaming data from a connected STH, use the method Network.open_data_stream():

Network.open_data_stream(first: bool = False, second: bool = False, third: bool = False, timeout: float = 5) → DataStreamContextManager

Open measurement data stream

Parameters

first:

Specifies if the data of the first measurement channel should be streamed or not

second:

Specifies if the data of the second measurement channel should be streamed or not

third:

Specifies if the data of the third measurement channel should be streamed or not

timeout:

The maximum number of seconds allowed between two consecutive messages before a TimeoutError is raised

Returns

A context manager object for managing stream data

Examples

>>> from asyncio import run
>>> from mytoolit.can import Network
>>> from mytoolit.can.streaming import StreamingData
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         async with network.open_data_stream(first=True,
...                                             third=True) as stream:
...             stream_data = StreamingData()
...             messages = 0
...             async for data in stream:
...                 stream_data.extend(data)
...                 messages += 1
...                 if messages >= 3:
...                     break
...             return stream_data
>>> stream_data = run(read_streaming_data())
>>> len(stream_data.first)
3
>>> len(stream_data.second)
0
>>> len(stream_data.third)
3
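Since the stream raises a TimeoutError when no message arrives within timeout seconds, it can make sense to handle that case explicitly. The sketch below uses only the calls shown above; the function name read_first_channel, the device number 0, and the timeout of 2 seconds are illustrative assumptions. It catches both the built-in TimeoutError and asyncio.TimeoutError, because the two are only the same class since Python 3.11 and this page does not state which one ICOc raises. Running the coroutine requires a connected STU and an available STH.

```python
import asyncio


async def read_first_channel(number_messages=10, timeout=2):
    """Collect data of the first channel; return None on timeout"""

    # Imported here, since these names require an installed ICOc package
    from mytoolit.can import Network
    from mytoolit.can.streaming import StreamingData

    collected = StreamingData()
    try:
        async with Network() as network:
            await network.connect_sensor_device(0)
            async with network.open_data_stream(
                first=True, timeout=timeout
            ) as stream:
                messages = 0
                async for data in stream:
                    collected.extend(data)
                    messages += 1
                    if messages >= number_messages:
                        break
    except (TimeoutError, asyncio.TimeoutError):
        # No message arrived within `timeout` seconds
        return None
    return collected


# Requires ICOtronic hardware:
# stream_data = asyncio.run(read_first_channel())
```

Because the except clause wraps both context managers, the data stream and the network connection are already closed when the handler runs.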

After you have opened the stream, use an async for statement to iterate over the received streaming data. For example, the code below:

>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device("Test-STH")
...
...         async with network.open_data_stream(first=True) as stream:
...             async for data in stream:
...                 print(data)
...                 break

>>> run(read_streaming_data()) 
1: [...]
2: []
3: []

  • connects to a device called Test-STH,

  • opens a data stream for the first measurement channel,

  • receives a single streaming message and prints its representation.

Each item that the async for loop yields from the stream is an object of the class StreamingData:

class mytoolit.can.streaming.StreamingData(first: List[TimestampedValue] | None = None, second: List[TimestampedValue] | None = None, third: List[TimestampedValue] | None = None)

Auxiliary class to store streaming data

This object stores three lists containing TimestampedValue objects, which you can access using the attributes:

  • StreamingData.first,

  • StreamingData.second, and

  • StreamingData.third.

You can append the values of another StreamingData object to an existing one using the method StreamingData.extend():

StreamingData.extend(data: StreamingData) → None

Add additional streaming data

Parameters

data:

The streaming data that should be added to this streaming data object

Examples

>>> from mytoolit.can.streaming import StreamingData, TimestampedValue
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=1)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=2)
>>> value13 = TimestampedValue(timestamp=13, value=13, counter=3)
>>> value14 = TimestampedValue(timestamp=14, value=14, counter=4)
>>> value21 = TimestampedValue(timestamp=21, value=21, counter=5)
>>> value22 = TimestampedValue(timestamp=22, value=22, counter=6)
>>> value31 = TimestampedValue(timestamp=31, value=31, counter=7)
>>> value32 = TimestampedValue(timestamp=32, value=32, counter=8)
>>> value33 = TimestampedValue(timestamp=33, value=33, counter=9)
>>> value34 = TimestampedValue(timestamp=34, value=34, counter=10)
>>> data = StreamingData([value11, value12], [], [value31, value32])
>>> other = StreamingData([value13, value14], [value21, value22],
...                       [value33, value34])
>>> data.extend(other)
>>> data
1: [11@11 (1), 12@12 (2), 13@13 (3), 14@14 (4)]
2: [21@21 (5), 22@22 (6)]
3: [31@31 (7), 32@32 (8), 33@33 (9), 34@34 (10)]
>>> other
1: [13@13 (3), 14@14 (4)]
2: [21@21 (5), 22@22 (6)]
3: [33@33 (9), 34@34 (10)]

Another useful method is StreamingData.apply(), which changes the values stored in the streaming data (for example, by converting the 16-bit ADC value into multiples of \(g\)):

StreamingData.apply(function: Callable[[float | Quantity], float | Quantity], first: bool = True, second: bool = True, third: bool = True) → None

Apply a certain function to the streaming data

Parameters

function:

The function that should be applied to the streaming data

first:

Specifies if the function should be applied to the first measurement channel or not

second:

Specifies if the function should be applied to the second measurement channel or not

third:

Specifies if the function should be applied to the third measurement channel or not

Examples

>>> from mytoolit.can.streaming import StreamingData, TimestampedValue
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=5)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=6)
>>> value13 = TimestampedValue(timestamp=13, value=13, counter=7)
>>> value14 = TimestampedValue(timestamp=14, value=14, counter=8)
>>> data = StreamingData([value11, value12], [value13], [value14])
>>> data
1: [11@11 (5), 12@12 (6)]
2: [13@13 (7)]
3: [14@14 (8)]
>>> data.apply(lambda value: value + 10)
>>> data
1: [21@11 (5), 22@12 (6)]
2: [23@13 (7)]
3: [24@14 (8)]
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=11)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=12)
>>> data = StreamingData([value11, value12])
>>> data
1: [11@11 (11), 12@12 (12)]
2: []
3: []
>>> data.apply(lambda value: value*2)
>>> data
1: [22@11 (11), 24@12 (12)]
2: []
3: []
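As a sketch of the kind of conversion function you might pass to this method, the code below maps a 16-bit ADC value to multiples of g. The function name adc_to_g, the linear mapping, and the default range of ±50 g (max_value=100) are assumptions for illustration; the correct range depends on the sensor, so please check its data sheet before relying on these numbers.

```python
from functools import partial


def adc_to_g(value, max_value=100):
    """Convert a 16-bit ADC value into multiples of g

    Assumption: the ADC range 0 to 0xFFFF maps linearly to the
    interval from -max_value/2 to +max_value/2
    """

    return (value / 0xFFFF - 0.5) * max_value


print(adc_to_g(0))       # -50.0
print(adc_to_g(0xFFFF))  # 50.0

# Hypothetical sensor with a range of ±20 g
adc_to_g_40 = partial(adc_to_g, max_value=40)
print(adc_to_g_40(0xFFFF))  # 20.0
```

A function like this could then be passed to the method, for example stream_data.apply(adc_to_g), or stream_data.apply(adc_to_g, second=False, third=False) to convert only the first measurement channel.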

A TimestampedValue:

class mytoolit.can.streaming.TimestampedValue(timestamp: float, value: float | Quantity, counter: int)

Store a single (streaming) value and its timestamp

contains:

  • a value (TimestampedValue.value),

  • a timestamp (TimestampedValue.timestamp), and

  • a message counter (TimestampedValue.counter).

The attribute TimestampedValue.value will usually store a 16-bit ADC value (int) unless you convert the data (for example, using StreamingData.apply()).
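To illustrate how these attributes can be used, the sketch below calculates the mean value and the time span of a list of values. The helper summarize is hypothetical and not part of ICOc; it relies only on the attributes value and timestamp. The example uses SimpleNamespace objects as stand-ins for TimestampedValue, so it runs without ICOc installed.

```python
from statistics import mean
from types import SimpleNamespace


def summarize(values):
    """Return mean value and time span (last - first timestamp)"""

    if not values:
        return None
    mean_value = mean(item.value for item in values)
    time_span = values[-1].timestamp - values[0].timestamp
    return mean_value, time_span


# Stand-ins with the same attributes as TimestampedValue
channel = [
    SimpleNamespace(timestamp=11, value=10, counter=1),
    SimpleNamespace(timestamp=12, value=20, counter=2),
    SimpleNamespace(timestamp=14, value=30, counter=3),
]
print(summarize(channel))  # (20, 3)
```

With real data you would pass one of the channel lists instead, for example summarize(stream_data.first).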

Examples

For code examples, please check out the examples directory:

  • Read STH Name: Read the name of the "first" available STH (device ID 0).

  • Read Data Points: Read five acceleration messages (5 · 3 = 15 values) and print their string representation (value, timestamp, and message counter).