Skip to main content

Setting up custom sensors

SA Engine allows for many different and versatile ways of injesting data streams from sensors. The most straightforward way read an external data stream is to use a pipe pump.

Pipe pump description

A pipe pump will start a separate subprocess and read the subprocess' standard out into SA Engine one row at a time. This means that each row printed to the standard out from the subprocess becomes one element emitted by the subscribe inside SA Engine.

Registering and deregistering a pipe pump

When registering a pipe pump in SA Engine you bind a command to run to a flow namespace using the function _pp:register(Charstring ns, Charstring command).

For example, to register a pipe pump named mypipe which uses a shell script as data generator:

_pp:register("mypipe"," -arg1 -arg2");

This will register a pipe pump with namespace mypipe.

A pipe pump can be deregistered using _pp:deregister(Charstring name).

So, to deregister the pipe pump mypipe:


Subscribing to and unsubscribing from signals

The subprocess producing the data should listen for new subscription and unsubscription events on its standard input. A subscription is activated by receiving the string "+<signal_name>\n". When the suprocess receives a message like this it should begin to produce values from the signal with name <signal_name>.

In SA Engine, subscription events are sent with the subscribe("<namespace>:<signal_name>") command.

For example, subscribing to a signal named mysignal on mypipe:


This sends the string "+mysignal\n" to standard input of the subprocess bound to the pipe pump mypipe.

A subscription is deactivated (unsubscribed) by receiving the string "-<signal_name>\n". This will inversely tell the subprocess to stop procuding values for the given signal.

In SA Engine, unsubscription events are sent automatically when the continuous query is stopped.

Signal output format

Each row procuded by the subprocess is expected to start with "<signal_name>,". The rest of the line should be a list of comma-separated values.

For example, if the subprocess has a signal temp with time and temperature values it should print rows to standard out that has the following format:


Protocol overview

The following sequence diagram describes the protocol:

Pipe pump protocol

NOTE If you are going to use the pipe pump from a local terminal it is imperative that you ignore SIGINT.

Example: BME280 sensor on Raspberry Pi

This is an example showing how you can set up a BME280 humidity sensor that provides temperature, pressure, and humidity signals on a Raspberry Pi.

BME280 on a Raspberry Pi Zero

You can try this out yourself if you have a Raspberry Pi and a BME280 breakout. The code below also requires the BME280 python library to be installed.

The easiest way to setup and deploy the data generator and OSQL code to the device is to:

  1. Create a new model (the SA Studio User Guide describes how to create and deploy models).
  2. Add the data generator script to the model folder.
  3. Add the OSQL code for setting up the sensors and pipe pump to master.osql.
  4. Deploy the model onto the device (see SA Studio User Guide).

The data generator

Here we have a data generator script which is a Python program that follows the specification of the pipe pump protocol.

The code spawns a thread thread_function that listens to standard in and adds a signal to the global signals list whenever a new string "+<signal_name>\n" is written on standard in. It also removes any signal whenever a new string "-<signal_name>\n" is received.

The main loop then checks if there are any known signals in the global signals list. If a known signal is present ("t" for temperature, "h" for humidity, or "p" for pressure) then it reads the value from the sensor and prints it to standard out on the format "<signal_name>,<time>,<value>".

import sys
import time
import select
import threading
import signal

# Imports for the BME280
from smbus import SMBus
from bme280 import BME280

# Initialise the BME280
bus = SMBus(1)
bme280 = BME280(i2c_dev=bus)

# Void function to swallow unwanted output
def _void_f(*args,**kwargs):

# Global container that keeps track of active signals
signals = []

# Check if activate signal from start
if len(sys.argv) > 1:
signals = [s for s in sys.argv[1:]]

running = True

# Thread listening to standard in for adding/removing signals
def thread_function(name):
for line in sys.stdin:
print("line: "+line, file=sys.stderr)
if line:
if(line[0] == "+" and line[1:len(line):1] not in signals):
elif(line[0] == "-"):
running = False

# Start listening on standard in
x = threading.Thread(target=thread_function, args=(1,))

dt = 0.02 # How often inner loop is executed
tick = 0
while running:
t_start = time.time()
if len(signals) == 0:
if tick % 5 == 0 and "t" in signals:
temp = bme280.get_temperature()
print("%s,%f,%s" % ("t",time.time(), temp))
if tick % 10 == 0 and "h" in signals:
humidity = bme280.get_humidity()
print("%s,%f,%s" % ("h",time.time(), humidity))
if tick % 10 == 0 and "p" in signals:
press = bme280.get_pressure()
print("%s,%f,%s" % ("p",time.time(), press))
t_end = time.time()
t_left = dt - (t_end - t_start)
tick = (tick+1) % 21
if tick % 4 == 0: # Flush every 4th loop
if t_left > 0:
except KeyboardInterrupt:
except Exception:
sys.stdout.write = _void_f
sys.stdout.flush = _void_f
sys.stderr.write = _void_f
sys.stderr.flush = _void_f


The OSQL code

Below is the OSQL code for setting up the signals and interaction with the data generator through a pipe pump.

It starts with registering the pipe pump with _pp:register_flow.


The file, which is the data generator, needs to be located in the directory where this OSQL code is executed. If is located elsewhere the file:current_folder() needs to be replaced with the path to

It then defines a new signal type pi0bme:signal which inherits from the Signal type.

After that we have the function that receives the data for a specific signal from the pipe pump bme by subscribing to the stream with subscribe and emits the values as a stream of timestamped vectors.

Below the streaming function we have the signal definitions. It instantiates three different signals: "temperature", "humidity", and "pressure". Each with the corresponding pipe pump signal names "t", "h", and "p", which the data generator listens for in

Finally we specify that calling ts_signal_stream, which is SA Engine's built-in interface to signal streams, for any of our new signals will use bme_stream.

_pp:register_flow("bme","python3 "+file:current_folder()+"", "basic");

create type pi0bme:signal under Signal;

create function bme_stream(pi0bme:signal s) -> Stream of Timeval of Vector of Number
as select Stream of ts(timeval(ts),value)
from Charstring row, Vector data, Number ts,
Charstring flow, vector of Number value
where flow = options(s)["flow"]
and row in subscribe("bme:"+flow)
and data = csv:from_string(row)
and value = skip(data,2)
and ts = data[2];

create pi0bme:signal (name, options)
("temperature", {"flow": "t"}),
("humidity", {"flow": "h"}),
("pressure", {"flow": "p"});

for each pi0bme:signal s {
set ts_signal_stream(s) = bme_stream(s);

Testing the pipe pump

Once the data generator script and the OSQL code have been deployed to the device (preferrably as a model by using the steps described earlier) you can use the regular functions for reading data from signals.


All commands in this section should be executed on the device. See the SA Studio User Guide if you are unsure how to run queries on an edge device.

Check that we have all three signals set up by calling signals on the device:


The output should be:


Get the data stream from the temperature sensor by calling signal_stream on the device:


The output should be a stream of temperature values:


Get the timestamped data stream from the temperature sensor by calling ts_signal_stream on the device:


The output should be a stream of timestamped temperature values:

ts(|2022-10-26T09:38:09.175Z|, [22.5354412375495])
ts(|2022-10-26T09:40:04.427Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.528Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.549Z|, [22.7606121603798])
ts(|2022-10-26T09:40:04.650Z|, [22.7606121603798])