
Setting up custom sensors with pipe pump

On Linux-based systems, the recommended way to integrate data streams from sensors is to use a pipe pump. This guide describes how to write such an integration.

Pipe pump description

A pipe pump will start a separate subprocess and read the subprocess' standard out into SA Engine one row at a time. This means that each row printed to standard out by the subprocess becomes one element emitted by the subscription inside SA Engine.

The subprocess runs a data generator program that polls the sensor and sends the sensor data to standard out. The data generator is implemented for each sensor. It can be written in a language recommended by the sensor provider, for example C, Python or a shell script. The only requirement is that the process can be started by a command on the edge device.
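
To illustrate the core idea (ignoring subscription handling, which is described later in this guide), a minimal data generator could look like the following sketch, where mysignal and read_my_sensor() are hypothetical placeholders for a real signal name and sensor API:

#!/usr/bin/python3
# Minimal sketch of a data generator: each printed row becomes one stream element.
import sys
import time

def read_my_sensor():
    return 42.0  # placeholder for the real sensor reading

while True:
    # One comma-separated row per reading: <signal_name>,<time>,<value>
    print("mysignal,%f,%f" % (time.time(), read_my_sensor()))
    sys.stdout.flush()  # make sure SA Engine sees the row immediately
    time.sleep(1)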

In SA Engine, subscriptions to the signals are created and wrapped in a data stream function. The data stream wrapper becomes the interface used to read data from the signals and to create models that work on them.

Overview of a pipe pump integration. The data generator reads the sensors and sends data to SA Engine.

OSQL setup in SA Engine

To implement the OSQL side of the pipe pump, the built-in pipe pump functions are used. The pipe pump is first registered with a command that will be used to spawn the data generator process. Then subscriptions are added for the signals in the data generator that we are interested in.

The data generator is implemented to send sensor data on these subscriptions, and the pipe pump represents them as data flows. A signal interface should be added to manage the subscriptions and to parse and represent the sensor data.

Pipe pump integration from SA Engine point of view. A data generator is spawned and subscriptions are sent to it. The sensor data that we subscribe to will then be available in data flows. A signal interface represents the flows to model implementors.

Registering and deregistering a pipe pump

When registering a pipe pump in SA Engine you bind a command to run to a flow namespace using the function _pp:register_flow(Charstring ns, Charstring command, Charstring mode).

The command should be a string representation of the command that starts the data generator subprocess. For example, to start a Python process it could be "python3 myprogram.py".

The namespace ns is the name that will prefix all subscriptions.

The mode can be either "basic" or "restart".

For example, to register a pipe pump named mypipe which uses a shell script as data generator:

_pp:register_flow("mypipe","my_datagenerator.sh -arg1 -arg2", "basic");

Subscribing to and unsubscribing from signals

In SA Engine, subscription events are sent with the subscribe("<namespace>:<signal_name>") command.

For example, subscribing to a signal named mysignal on a pipe pump with namespace mypipe:

subscribe("mypipe:mysignal");

This sends the string "+mysignal\n" to standard input of the subprocess bound to the pipe pump mypipe. The first subscription is sent as an argument when spawning the data generator process.

In SA Engine, unsubscription events are sent automatically when the continuous query is stopped.

Adding the signal interface

SA Engine contains an OSQL type Signal as a general representation of a sensor data stream. This is called the Sensor ontology.

To enable standard interaction with signal streams it is recommended to create a Data Stream Wrapper for each sensor. To do this, first a new type is created under the Signal type. To add a new Signal type called mysensor:signal use:

create type mysensor:signal under Signal;

An instance of mysensor:signal is created for each signal in the sensor by calling create instances. All standard properties listed in the Data stream wrapper guide can be set when instantiating, but the minimum is to set the name. For example, to create signals a and b in mysensor:signal:

create mysensor:signal (name)
instances
("a"),
("b");

An example with name and doc:

create mysensor:signal (name, doc)
instances
("a", "my description of signal a"),
("b", "my description of signal b");

Each signal also needs an implementation of ts_signal_stream(s). For pipe pump signals, this is based on the subscription to the signal flow. First define a function that takes a signal instance and creates a subscription for it:

create function mypipe_signal_stream(mysensor:signal signal) -> Stream of Charstring
as select Stream of row
from Charstring row
where row in subscribe("mypipe:" + name(signal));

The rows sent by the data generator are lists of comma-separated values, starting with the signal name. Assuming the signals contain time values and value readings as follows:

a,<time_val1>,<a_val1>
a,<time_val2>,<a_val2>

Improve mypipe_signal_stream with parsing like this:

create function ts_mypipe_signal_stream(mysensor:signal signal) -> Stream of Timeval of Vector of Number
as select Stream of ts(timeval(time), value)
from Charstring row, Vector data, Number time, Vector of Number value
where row in subscribe("mypipe:" + name(signal))
and data = csv:from_string(row)
and value = skip(data,2)
and time = data[2];

Now that the function is defined, we set it as the ts_signal_stream for all instances of mysensor:signal:

for each mysensor:signal s {
set ts_signal_stream(s) = ts_mypipe_signal_stream(s);
};

The sensor ontology also contains the function signal_stream(Signal s) which outputs a stream without timestamps. This should not be redefined in the wrapper; it is already implemented to output the values from ts_signal_stream.

Using the signal interface

The OSQL function signals() lists all instances of type Signal. After creating the instances of mysensor:signal they will be included in this list.

To get the stream of sensor data from a given signal use the functions signal_stream(Signal s) or ts_signal_stream(Signal s). These can also be used with the signal name as signal_stream(Charstring s) or ts_signal_stream(Charstring s). To get the signal with a certain name, use signal_named(Charstring name).

Writing the data generator

The data generator can be written in any language supported by the edge platform. The example implementation below uses Python; other common options are C or shell scripts.

The data generator needs a subscription thread which reads subscription requests on standard in and maintains a list of subscriptions. Subscription requests can also be appended to the spawning command, so the data generator should parse the arguments looking for signals.

On the main thread it should poll the sensors that have active subscriptions and send the results to standard out.

Pipe pump integration from the data generator point of view. For each subscription the sensors are polled and the results are written to standard out.

The subscription thread

The subprocess producing the data should listen for new subscription and unsubscription events on its standard input. A subscription is activated by receiving the string "+<signal_name>\n". When the subprocess receives a message like this it should begin to produce values from the signal with name <signal_name>.

A subscription is deactivated (unsubscribed) by receiving the string "-<signal_name>\n". This will inversely tell the subprocess to stop producing values for the given signal.
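
A minimal sketch of such a subscription thread in Python (the complete examples later in this guide follow the same pattern) could look like this:

import sys
import threading

signals = []  # names of currently subscribed signals

def subscription_thread():
    # Each line on standard in is "+<signal_name>" or "-<signal_name>"
    for line in sys.stdin:
        name = line[1:].strip()
        if line.startswith("+") and name not in signals:
            signals.append(name)
        elif line.startswith("-") and name in signals:
            signals.remove(name)

# Run the listener in the background so the main thread can poll the sensors
threading.Thread(target=subscription_thread, daemon=True).start()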

The main thread

The main thread creates the list of subscriptions and checks whether there are any initial subscriptions in the process arguments, before starting the subscription thread.

It is up to the data generator to decide the polling frequency for the signals. For each iteration it should check the list of subscriptions, poll the sensors of interest, and write the result to standard out.

The output should be one row for each signal value. Each row produced by the subprocess is expected to start with "<signal_name>,". The rest of the line should be a list of comma-separated values.

For example, if the subprocess has a signal temp with time and temperature values it should print rows to standard out that have the following format:

temp,<time_val1>,<temp_val1>
temp,<time_val2>,<temp_val2>
temp,<time_val3>,<temp_val3>
...
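
As a sketch, a main loop producing rows in this format could look like the following, where read_sensor(name) is a hypothetical placeholder for the actual sensor access and the signals list is maintained by the subscription thread described above:

import sys
import time

# Initial subscriptions can be passed as command line arguments
signals = [s for s in sys.argv[1:]]

def read_sensor(name):
    return 0.0  # placeholder for the real sensor reading

while True:
    for name in list(signals):
        # One row per signal value: <signal_name>,<time>,<value>
        print("%s,%f,%f" % (name, time.time(), read_sensor(name)))
    sys.stdout.flush()
    time.sleep(1)  # polling frequency chosen by the data generator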

Protocol overview

The following sequence diagram describes the protocol:

Pipe pump protocol

NOTE: If you are going to use the pipe pump from a local terminal it is imperative that you ignore SIGINT in the data generator.
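
In a Python data generator this can be done, for example, with the standard signal module (the examples below instead catch KeyboardInterrupt in the main loop):

import signal

# Ignore Ctrl-C (SIGINT) forwarded to the process group by the terminal
signal.signal(signal.SIGINT, signal.SIG_IGN)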

Example 1: BME280 sensor on Raspberry Pi

This is an example showing how you can set up a BME280 humidity sensor that provides temperature, pressure, and humidity signals on a Raspberry Pi.

This example can also be downloaded as a model in Pipe pump example: BME280

BME280 on a Raspberry Pi Zero

You can try this out yourself if you have a Raspberry Pi and a BME280 breakout. The code below also requires the BME280 Python library to be installed.

The easiest way to set up and deploy the data generator and OSQL code to the device is to:

  1. Create a new model (the SA Studio User Guide describes how to create and deploy models).
  2. Add the data generator script bme.py to the model folder.
  3. Add the OSQL code for setting up the sensors and pipe pump to master.osql.
  4. Deploy the model onto the device (see SA Studio User Guide).

The data generator

Here we have a data generator script bme.py which is a Python program that follows the specification of the pipe pump protocol.

The code spawns a thread running thread_function that listens to standard in and adds a signal to the global signals list whenever a string "+<signal_name>\n" is written on standard in. It also removes the signal whenever the string "-<signal_name>\n" is received.

The main loop then checks if there are any known signals in the global signals list. If a known signal is present ("t" for temperature, "h" for humidity, or "p" for pressure) then it reads the value from the sensor and prints it to standard out in the format "<signal_name>,<time>,<value>".

#!/usr/bin/python3
import sys
import time
import select
import threading
import signal

# Imports for the BME280
from smbus import SMBus
from bme280 import BME280

# Initialise the BME280
bus = SMBus(1)
bme280 = BME280(i2c_dev=bus)

# Void function to swallow unwanted output
def _void_f(*args,**kwargs):
    pass

# Global container that keeps track of active signals
signals = []

# Check if activate signal from start
if len(sys.argv) > 1:
    signals = [s for s in sys.argv[1:]]

running = True

# Thread listening to standard in for adding/removing signals
def thread_function(name):
    global running
    for line in sys.stdin:
        print("line: "+line, file=sys.stderr)
        if line:
            if line[0] == "+" and line[1:len(line)-1:1] not in signals:
                signals.append(line[1:len(line)-1:1])
            elif line[0] == "-":
                signals.remove(line[1:len(line)-1:1])
    running = False
    quit()

# Start listening on standard in
x = threading.Thread(target=thread_function, args=(1,))
x.start()

dt = 0.02 # How often inner loop is executed
tick = 0
while running:
    try:
        t_start = time.time()
        if len(signals) == 0:
            time.sleep(1)
        if tick % 5 == 0 and "t" in signals:
            temp = bme280.get_temperature()
            print("%s,%f,%s" % ("t",time.time(), temp))
        if tick % 10 == 0 and "h" in signals:
            humidity = bme280.get_humidity()
            print("%s,%f,%s" % ("h",time.time(), humidity))
        if tick % 10 == 0 and "p" in signals:
            press = bme280.get_pressure()
            print("%s,%f,%s" % ("p",time.time(), press))
        t_end = time.time()
        t_left = dt - (t_end - t_start)
        tick = (tick+1) % 21
        if tick % 4 == 0: # Flush every 4th loop
            sys.stdout.flush()
        if t_left > 0:
            time.sleep(t_left)
    except KeyboardInterrupt:
        pass
    except Exception:
        sys.stdout.write = _void_f
        sys.stdout.flush = _void_f
        sys.stderr.write = _void_f
        sys.stderr.flush = _void_f
        quit()

quit()

The OSQL code

Below is the OSQL code for setting up the signals and interaction with the data generator through a pipe pump.

It starts with registering the pipe pump with _pp:register_flow.

info

The file bme.py, which is the data generator, needs to be located in the directory where this OSQL code is executed. If bme.py is located elsewhere, the call to file:current_folder() needs to be replaced with the path to bme.py.

It then defines a new signal type pi0bme:signal which inherits from the Signal type.

After that comes the function that receives the data for a specific signal from the pipe pump bme by subscribing to the flow with subscribe, and emits the values as a stream of timestamped vectors.

Below the streaming function we have the signal definitions. They instantiate three different signals, "temperature", "humidity", and "pressure", each with the corresponding pipe pump signal name "t", "h", or "p" that the data generator in bme.py listens for.

Finally we specify that calling ts_signal_stream, which is SA Engine's built-in interface to signal streams, for any of our new signals will use bme_stream.

_pp:register_flow("bme","python3 "+file:current_folder()+"bme.py", "basic");

create type pi0bme:signal under Signal;

create function bme_stream(pi0bme:signal s) -> Stream of Timeval of Vector of Number
as select Stream of ts(timeval(ts),value)
from Charstring row, Vector data, Number ts,
Charstring flow, vector of Number value
where flow = options(s)["flow"]
and row in subscribe("bme:"+flow)
and data = csv:from_string(row)
and value = skip(data,2)
and ts = data[2];

create pi0bme:signal (name, options)
instances
("temperature", {"flow": "t"}),
("humidity", {"flow": "h"}),
("pressure", {"flow": "p"});

for each pi0bme:signal s {
set ts_signal_stream(s) = bme_stream(s);
};

Testing the pipe pump

Once the data generator script bme.py and the OSQL code have been deployed to the device (preferably as a model by using the steps described earlier) you can use the regular functions for reading data from signals.

info

All commands in this section should be executed on the device. See the SA Studio User Guide if you are unsure how to run queries on an edge device.

Check that we have all three signals set up by calling signals() on the device:

signals();

The output should be:

"temperature"
"humidity"
"pressure"

Get the data stream from the temperature sensor by calling signal_stream on the device:

signal_stream("temperature");

The output should be a stream of temperature values:

[23.5265147300495]
[22.5084841936507]
[22.5084841936507]
[22.5084841936507]
[22.5084841936507]
...

Get the timestamped data stream from the temperature sensor by calling ts_signal_stream on the device:

ts_signal_stream("temperature");

The output should be a stream of timestamped temperature values:

ts(|2022-10-26T09:38:09.175Z|, [22.5354412375495])
ts(|2022-10-26T09:40:04.427Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.528Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.549Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.650Z|, [22.7606121603798])
...

Example 2: A sensor emulator

Sometimes a sensor is not available during model development, but sensor data is available in a file. In that case it is easy to read the file directly from OSQL as described in Recorded data. But if it is known that the real sensor will be integrated using a pipe pump, creating a file-reading data generator will enable swapping between real and recorded data by changing just one line of code.

Pipe pump without sensors where the data generator reads data from a file. It should be used to temporarily replace a data generator that reads the real sensors.

The sensor emulator can be tested on any Linux machine. Create a model and add the data generator, the OSQL master file, and the test data to it.

The test data file

The test data file testdata.csv contains random timestamped values belonging to either signal a or signal b.

"2024-06-26T06:02:15.978Z","b",-1.32799459447763
"2024-06-26T06:02:16.085Z","a",3.48642470186872
"2024-06-26T06:02:16.186Z","a",4.87943238399468
"2024-06-26T06:02:16.279Z","a",0.0798203714940103
"2024-06-26T06:02:16.386Z","b",-3.16175128771392
"2024-06-26T06:02:16.493Z","b",-0.136918056458919
...

The data generator

The data generator script dg-ab.py opens a CSV file and sends the rows for subscribed signals to standard out. When it reaches the end of the file, the script starts over by closing and reopening the file.

#!/usr/bin/python3
import sys
import time
import threading
from typing import List


# Void function to swallow unwanted output
def _void_f(*args, **kwargs):
    pass

# Ensure that the path points to the testdata file
csv_file: str = '/SA/models/sa-pipepump/testdata.csv'

# Global container that keeps track of active signals
signals: List[str] = []
running: bool = True


# Thread listening to standard in for adding/removing signal subscriptions
def subscription_thread_function(name):
    global signals
    global running
    for line in sys.stdin:
        print("line: "+line, file=sys.stderr)
        if line:
            if line[0] == "+" and line[1:len(line)-1:1] not in signals:
                signals.append(line[1:len(line)-1:1])
            elif line[0] == "-":
                signals.remove(line[1:len(line)-1:1])
    running = False
    quit()


def main():
    global csv_file
    global signals
    global running

    # Check if activate signal from start
    # This is how the first subscription is sent
    if len(sys.argv) > 1:
        signals = [s for s in sys.argv[1:]]

    running = True

    # Start subscription thread listening on standard in
    x = threading.Thread(target=subscription_thread_function, args=(1,))
    x.start()

    f = open(csv_file, 'r', encoding="utf-8")

    dt = 0.02 # How often inner loop is executed
    tick = 0
    while running:
        try:
            t_start = time.time()

            # Read next line in csv test data file
            line: str = f.readline()
            print("csv line: "+line, file=sys.stderr)
            # loop file reading after reaching end of file
            if line == '':
                f.close()
                f = open(csv_file, 'r', encoding="utf-8")
                line = f.readline()

            # parse file
            sig_t, sig_name, sig_val = line.replace("\n", "").split(",")
            sig_name = sig_name.replace("\"", "")

            # check for subscriptions
            if len(signals) == 0:
                time.sleep(1)
            if sig_name in signals:
                # write to standard out
                print("%s,%s,%s" % (sig_name, sig_t, sig_val))

            t_end = time.time()
            t_left = dt - (t_end - t_start)
            tick = (tick+1) % 21
            if tick % 4 == 0: # Flush every 4th loop
                sys.stdout.flush()
            if t_left > 0:
                time.sleep(t_left)
        except KeyboardInterrupt:
            pass
        except Exception:
            sys.stdout.write = _void_f
            sys.stdout.flush = _void_f
            sys.stderr.write = _void_f
            sys.stderr.flush = _void_f
            f.close()
            quit()
    f.close()


if __name__ == '__main__':
    main()
    quit()

The OSQL code

The OSQL file master.osql registers a flow that starts the data generator and defines a minimal data stream wrapper for signals a and b.

// Register the pipe pump
_pp:register_flow("ab_sensor","python3 "+models:folder("sa-pipepump")+"dg-ab.py", "restart");

// Create the signal interface
create type ab:signal under Signal;

create ab:signal(name, doc)
instances ("a", "The a readings"), ("b", "The readings from b");

// Function to subscribe to signal flows and parse values and timestamp
// assuming signal format: sig_name,sig_t, sig_val
create function ts_subscribe_stream(ab:signal signal) -> Stream of Timeval
as select Stream of ts(parse_iso_timestamp(time), value)
from Charstring row, charstring name, charstring time, charstring value
where row in subscribe("ab_sensor:"+name(signal))
and [name, time, value] = csv:from_string(row);

// Map function to the signal interface
for each ab:signal s {
set ts_signal_stream(s) = ts_subscribe_stream(s);
};

Testing the pipe pump

Assuming the files above have been added to an OSQL model named sa-pipepump, the pipe pump can be deployed to a Linux edge using:

models:deploy(["linux-edge"], "sa-pipepump");

Test the interface by listing the signals:

//peer: linux-edge
signals();

Subscribe to any of the listed signals with:

//peer: linux-edge
ts_signal_stream("a");