
Piping streams with dataflows

Guide specification
Guide type: Wasm code
Requirements: None
Recommended reading: None

Introduction

Streams and other data can be passed between processes by publishing them on flows. A flow is a named queue in the SA Engine internal database that queries can publish data to and subscribe to.

Flows are useful, for example:

  • if you have several sensors with different integration strategies and want to collect data from all of them into one model
  • if edges in a federation work independently but you want to listen in from a client from time to time
  • if you want to create plots that combine several streams
  • if you want to do several things with the same data

Publish and subscribe to a stream

To listen to a flow, use the subscribe function.

subscribe("my_flow");

Start the subscription query and leave it running. Then, to publish to the flow, use the publish function with the same flow name.

publish(timeout(simstream(0.5), 4), "my_flow");

If the published values should only be used by the subscriber, wrap the publishing query in sink, which discards the result after it has been published.

sink(publish(timeout(simstream(0.5), 4), "my_flow"));

A subscription to a flow receives the values that are published after the subscription has started. Values published to a flow that has no subscriber are not received anywhere. However, the latest value is always saved, so a new subscriber to a flow that has received values in the past immediately gets that value as its first result.

Exercise

Experiment with starting and stopping the subscriber and publisher above in different orders.
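One ordering to try is to let the publisher finish before the subscriber starts. The sketch below reuses only the queries shown above, with a new flow name, late_flow, chosen for illustration. Based on the behavior described above, the subscriber should get the most recent value published on late_flow as its first result, even though it was started after the publisher.

```osql
// 1. Publish on a new flow for about four seconds, with no subscriber running
sink(publish(timeout(simstream(0.5), 4), "late_flow"));

// 2. After the publisher has finished, start a subscriber on the same flow
subscribe("late_flow");
```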

Multiple listeners

It is possible to have several subscribers to the same flow. For example, let's define some listener functions that manipulate the values on "r_flow" in different ways.

create function listener_1() -> Real
as select r * 3.0 + 10.0
from Real r
where r in subscribe("r_flow");

create function listener_2() -> Real
as select y
from Real r, Real y
where r in subscribe("r_flow")
and y = r*r - sin(r);
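Any number of listeners can be defined in the same way. As a hypothetical third example, listener_3 passes on only the values on "r_flow" that are greater than 0.5; both the function name and the threshold are arbitrary and chosen only for illustration.

```osql
// Hypothetical third listener: forwards only values above an arbitrary threshold
create function listener_3() -> Real
as select r
from Real r
where r in subscribe("r_flow")
and r > 0.5;
```

It can be started and plotted like the other two listeners.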

Now, start the listeners and plot the results.

//plot: Line plot
listener_1();
//plot: Line plot
listener_2();

When values are published to "r_flow", for example by the query below, both listener plots start showing results.

create function publish_my_stream() -> Stream
as publish(timeout(simstream(0.5),10),"r_flow");

sink(publish_my_stream());

Notice that the listeners stop automatically when the timeout function closes the publishing stream.

Publishing on several flows

Emit on flow

The function emit_on_flow can be used inside stream queries to publish intermediate results. It can also be used to send values that are not streams, or to publish on several flows from the same query.

create function mystreams() -> stream of boolean
as select stream of True
from Integer i, Integer f1, Real f2
where i in heartbeat(1)
and f1 = emit_on_flow(i,"f1")
and f2 = emit_on_flow(1/(i+1),"f2")
limit 9;
//plot: Line plot
subscribe("f1");
//plot: Line plot
subscribe("f2");
sink(mystreams());

Notice that even though mystreams() stops after 9 values, the subscribers do not stop, since emit_on_flow does not close the flows it publishes on.
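The first use of emit_on_flow mentioned above, publishing intermediate results, can be sketched as follows. In this hypothetical query the result is each simulated value doubled, while the raw input values are emitted on a flow named "raw". The function name doubled_with_raw and the flow name are only for illustration.

```osql
// Hypothetical query: returns doubled values and emits the raw inputs on "raw"
create function doubled_with_raw() -> stream of Real
as select stream of d
from Real r, Real d, Real raw
where r in timeout(simstream(0.5), 10)
and d = r * 2.0
and raw = emit_on_flow(r, "raw");
//plot: Line plot
subscribe("raw");
//plot: Line plot
doubled_with_raw();
```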

Merge streams

When subscribing to several flows, it is possible to merge them into a single stream of values.

create function mystreams() -> stream of boolean
as select stream of True
from Integer i, Charstring f1, Real f2, Integer f3
where i in heartbeat(1)
and f1 = emit_on_flow(stringify(i),"f1")
and f2 = emit_on_flow(1/(i+1),"f2")
and f3 = emit_on_flow(i*10 - i,"f3")
limit 9;
subscribe:merge(["f1","f2","f3"]);
sink(mystreams());

With subscribe:merge_tagged you get a "bus stream": each value is put into a vector together with its flow name and a timestamp. The timestamp is in rnow() format, that is, seconds since the epoch (see Temporal functions).

subscribe:merge_tagged(["f1","f2","f3"]);
sink(mystreams());

Subscribing to flows on edges

When data is published on an edge, you can also subscribe to it by passing the edge name as an argument to subscribe. This query must be executed on the peer that manages the edge, that is, the nameserver or an edge manager, depending on the federation setup.

subscribe('my-edge', 'my-flow');

This is similar to running the subscription on the edge itself, as below, but since the query runs on the server, it can keep running even if the connection to the edge is temporarily lost.

//peer: my-edge
subscribe('my-flow');
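Since a subscription to an edge flow is an ordinary query, it can presumably be wrapped in a function like the listeners earlier in this guide. The sketch below assumes that my-flow carries Real values and that the function is created on the peer that manages my-edge; the name edge_listener is hypothetical.

```osql
// Hypothetical listener on my-flow on my-edge (assumes the flow carries Real values).
// Create it on the peer that manages the edge, as required for subscribe with an edge name.
create function edge_listener() -> Real
as select r * 3.0 + 10.0
from Real r
where r in subscribe('my-edge', 'my-flow');
```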

The subscribe:merge and subscribe:pivot functions can also subscribe to flows on edges using the flow@edge notation. For example, the following query subscribes to the flow my-flow on my-edge. Queries using the @edge notation can be executed locally in the client.

subscribe:merge(["my-flow@my-edge"]);

It's possible to combine flows from several edges, as in the following query.

subscribe:merge(["f1@my-edge", "f2@my-other-edge"]);

Further reading

Reference documentation of dataflow functions: Dataflow functions

Using dataflows to create multiplots of several streams: Advanced visualization