To communicate with the ICOtronic system use the Network class:
Basic class to communicate with STU and sensor devices
Note
Please ignore the sender parameter, you will never need to change it. In fact, we are planning to get rid of the parameter in a future version of ICOc.
We recommend you use the context manager to open and close the connection (to the STU):
>>> from asyncio import run
>>> from mytoolit.can import Network
>>> async def create_and_shutdown_network():
...     async with Network() as network:
...         pass # ← Your code goes here
>>> run(create_and_shutdown_network())
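The recommendation above rests on how `async with` behaves: the exit handler runs even when the body raises an exception. The following minimal sketch illustrates this with a hypothetical stand-in class (`FakeNetwork` is an assumption made for illustration, it is not `mytoolit.can.Network` and does not talk to any hardware):

```python
import asyncio


class FakeNetwork:
    """Hypothetical stand-in that only records open/close events."""

    def __init__(self):
        self.events = []

    async def __aenter__(self):
        self.events.append("open")
        return self

    async def __aexit__(self, exc_type, exc, traceback):
        # Runs on normal exit and when the body raised an exception
        self.events.append("close")
        return False  # Do not suppress the exception


async def demo():
    network = FakeNetwork()
    try:
        async with network:
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    return network.events


events = asyncio.run(demo())
print(events)
```

In this sketch the `close` event is recorded although the body failed, which is the property that makes the context manager preferable to opening and closing a connection by hand.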
To connect to an STH to read streaming data you can use the coroutine Network.open_data_stream():
Open measurement data stream
Specifies if the data of the first measurement channel should be streamed or not
Specifies if the data of the second measurement channel should be streamed or not
Specifies if the data of the third measurement channel should be streamed or not
The maximum number of seconds allowed between two consecutive messages before a TimeoutError is raised
A context manager object for managing stream data
>>> from asyncio import run
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device(0)
...         async with network.open_data_stream(first=True,
...                                             third=True) as stream:
...             stream_data = StreamingData()
...             messages = 0
...             async for data in stream:
...                 stream_data.extend(data)
...                 messages += 1
...                 if messages >= 3:
...                     break
...             return stream_data
>>> stream_data = run(read_streaming_data())
>>> len(stream_data.first)
3
>>> len(stream_data.second)
0
>>> len(stream_data.third)
3
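The timeout described above applies to the gap between two consecutive messages, not to the stream as a whole. The sketch below shows one way such a per-message timeout can behave. It uses only `asyncio`; the helper `read_with_timeout` and the queue-based message source are assumptions made for illustration and say nothing about how the library implements its stream:

```python
import asyncio


async def read_with_timeout(queue, timeout):
    """Yield queued messages; fail if the next message takes too long."""
    while True:
        # wait_for raises asyncio.TimeoutError if no message arrives in time
        yield await asyncio.wait_for(queue.get(), timeout)


async def demo():
    queue = asyncio.Queue()
    for message in ("message 1", "message 2"):
        queue.put_nowait(message)

    received = []
    timed_out = False
    try:
        async for message in read_with_timeout(queue, timeout=0.1):
            received.append(message)
    except asyncio.TimeoutError:
        # No third message arrived within 0.1 seconds
        timed_out = True
    return received, timed_out


received, timed_out = asyncio.run(demo())
print(received, timed_out)
```

Code that iterates over a real data stream can wrap the `async for` loop in a similar `try`/`except` block if it should keep running after the sensor device stops sending.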
After you have opened the stream, use an async for statement to iterate over the received streaming data. For example, the code below:
>>> async def read_streaming_data():
...     async with Network() as network:
...         await network.connect_sensor_device("Test-STH")
...
...         async with network.open_data_stream(first=True) as stream:
...             async for data in stream:
...                 print(data)
...                 break
>>> run(read_streaming_data())
1: [...]
2: []
3: []
connects to a device called Test-STH,
opens a data stream for the first measurement channel,
receives a single streaming message and prints its representation.
The data returned by the async for (stream) is an object of the class StreamingData:
Auxiliary class to store streaming data
This object stores three lists containing TimestampedValue objects, which you can access using the attributes:
StreamingData.first,
StreamingData.second, and
StreamingData.third.
You can append the TimestampedValue objects of another StreamingData object using the method StreamingData.extend():
Add additional streaming data
The streaming data that should be added to this streaming data object
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=1)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=2)
>>> value13 = TimestampedValue(timestamp=13, value=13, counter=3)
>>> value14 = TimestampedValue(timestamp=14, value=14, counter=4)
>>> value21 = TimestampedValue(timestamp=21, value=21, counter=5)
>>> value22 = TimestampedValue(timestamp=22, value=22, counter=6)
>>> value31 = TimestampedValue(timestamp=31, value=31, counter=7)
>>> value32 = TimestampedValue(timestamp=32, value=32, counter=8)
>>> value33 = TimestampedValue(timestamp=33, value=33, counter=9)
>>> value34 = TimestampedValue(timestamp=34, value=34, counter=10)
>>> data = StreamingData([value11, value12], [], [value31, value32])
>>> other = StreamingData([value13, value14], [value21, value22],
...                       [value33, value34])
>>> data.extend(other)
>>> data
1: [11@11 (1), 12@12 (2), 13@13 (3), 14@14 (4)]
2: [21@21 (5), 22@22 (6)]
3: [31@31 (7), 32@32 (8), 33@33 (9), 34@34 (10)]
>>> other
1: [13@13 (3), 14@14 (4)]
2: [21@21 (5), 22@22 (6)]
3: [33@33 (9), 34@34 (10)]
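The output above shows that `extend()` appends to the object it is called on, channel by channel, and leaves the argument unchanged. To experiment with this behaviour without the library installed, a minimal stand-in can help. The classes `FakeValue` and `FakeStreamingData` below are hypothetical and only mimic the documented attributes; they are not the library's implementation:

```python
from dataclasses import dataclass, field


@dataclass
class FakeValue:
    """Hypothetical stand-in for a timestamped value."""

    timestamp: float
    value: float
    counter: int


@dataclass
class FakeStreamingData:
    """Hypothetical stand-in with one list per measurement channel."""

    first: list = field(default_factory=list)
    second: list = field(default_factory=list)
    third: list = field(default_factory=list)

    def extend(self, other):
        """Append the values of `other` channel by channel."""
        self.first.extend(other.first)
        self.second.extend(other.second)
        self.third.extend(other.third)


data = FakeStreamingData(first=[FakeValue(11, 11, 1)])
other = FakeStreamingData(
    first=[FakeValue(12, 12, 2)], second=[FakeValue(21, 21, 3)]
)
data.extend(other)

# `data` grew, `other` kept its original length
print(len(data.first), len(data.second), len(data.third))
print(len(other.first), len(other.second), len(other.third))
```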
Another useful method is StreamingData.apply(), which can be used to change the values stored in the streaming data (e.g. by converting the 16-bit ADC value into multiples of \(g\)):
Apply a certain function to the streaming data
The function that should be applied to the streaming data
Specifies if the function should be applied to the first measurement channel or not
Specifies if the function should be applied to the second measurement channel or not
Specifies if the function should be applied to the third measurement channel or not
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=5)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=6)
>>> value13 = TimestampedValue(timestamp=13, value=13, counter=7)
>>> value14 = TimestampedValue(timestamp=14, value=14, counter=8)
>>> data = StreamingData([value11, value12], [value13], [value14])
>>> data
1: [11@11 (5), 12@12 (6)]
2: [13@13 (7)]
3: [14@14 (8)]
>>> data.apply(lambda value: value + 10)
>>> data
1: [21@11 (5), 22@12 (6)]
2: [23@13 (7)]
3: [24@14 (8)]
>>> value11 = TimestampedValue(timestamp=11, value=11, counter=11)
>>> value12 = TimestampedValue(timestamp=12, value=12, counter=12)
>>> data = StreamingData([value11, value12])
>>> data
1: [11@11 (11), 12@12 (12)]
2: []
3: []
>>> data.apply(lambda value: value*2)
>>> data
1: [22@11 (11), 24@12 (12)]
2: []
3: []
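As a sketch of the conversion mentioned above, the function below maps a raw 16-bit ADC value linearly to multiples of \(g\). The linear mapping and the sensor range of ±100 g are assumptions made for illustration; the correct range and formula depend on the sensor in use, and the names `ADC_MAX`, `SENSOR_RANGE_G` and `adc_to_g` are hypothetical:

```python
ADC_MAX = 0xFFFF  # Largest value of a 16-bit ADC
SENSOR_RANGE_G = 200  # Assumed total range: -100 g to +100 g


def adc_to_g(adc_value, sensor_range=SENSOR_RANGE_G):
    """Map 0...ADC_MAX linearly to -sensor_range/2...+sensor_range/2."""
    return (adc_value / ADC_MAX - 0.5) * sensor_range


# Lowest, roughly middle, and highest raw ADC value
samples = [0, 0x8000, 0xFFFF]
converted = [adc_to_g(sample) for sample in samples]
print(converted)
```

Under these assumptions, a function like this could be passed to `apply()` in the same way as the lambda functions in the examples above, for example `data.apply(adc_to_g)`.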
Store a single (streaming) value and its timestamp
Each TimestampedValue object contains:
a value (TimestampedValue.value),
a timestamp (TimestampedValue.timestamp), and
a message counter (TimestampedValue.counter).
The attribute TimestampedValue.value will usually store a 16-bit ADC value (int) unless you convert the data (for example, using StreamingData.apply()).
For code examples, please check out the examples directory:
Read STH Name: Read the name of the "first" available STH (device id 0).
Read Data Points: Read five acceleration messages (5 · 3 = 15 values) and print their string representation (value, timestamp and message counter).