Piping streams with data flows

Guide specification
Guide type: Wasm code
Requirements: None
Recommended reading: None

Introduction

To pass streams and data between different processes, you can publish them on flows. A flow is a named queue in the SA Engine internal database where we can publish and subscribe to data.

This is useful, for example:

  • if you want to fork a data stream to enable multiple processing pipelines
  • if you want to send out debug information from somewhere inside a processing pipeline
  • if you have several sensors with different integration strategies and want to collect data from all of them into a model
  • if you have edges in a federation working independently but want to be able to listen in from a client from time to time
  • if you want to create plots that combine several streams

Publish and subscribe to a stream

You can publish a data stream to a flow with the publish(<stream>, <flow-name>) function, and you can receive a data stream from a flow with the subscribe(<flow-name>) function.

For example, start listening to the flow "my_flow" with the subscribe function and keep it running.

subscribe("my_flow");

Now, publish a data stream to the flow with the publish function.

sink(publish(timeout(simstream(0.5), 4), "my_flow"));

We see that the data stream is transmitted on the flow and received by the subscribe query above.

The figure below illustrates how the data stream in the example above is forked by publishing the stream on the flow "my_flow".

[Figure: Forking data stream with publish().]

A subscription to a flow receives the values that are published after the subscription is started. Values published to a flow with no subscribers are not received anywhere. However, the latest value is always saved, so a subscriber that joins a flow that has emitted values in the past will immediately receive the last value emitted on the flow.
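For example, the following sketch (the flow name "late_flow" is just illustrative) demonstrates the last-value retention. First publish a short stream while no subscriber is running:

sink(publish(timeout(simstream(0.5), 2), "late_flow"));

When the publisher has finished, start a subscriber on the same flow:

subscribe("late_flow");

Since the flow retains its latest value, the new subscriber immediately receives the last value that was published, even though no publisher is running anymore.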

Exercise

Experiment with starting and stopping the subscriber and publisher above in different orders.

Multiple subscribers

It is possible to have several subscriptions to the same flow.

For example, let's define some listener functions that apply different transformations to a flow named "r_flow".

create function listener_1() -> Real
as select r * 3.0 + 10.0
   from Real r
   where r in subscribe("r_flow");

create function listener_2() -> Real
as select y
   from Real r, Real y
   where r in subscribe("r_flow")
     and y = r*r - sin(r);

Now, start the listeners and plot the results, and keep the plots running.

//plot: Line plot
listener_1();
//plot: Line plot
listener_2();

Now, when we publish a stream to "r_flow", both listeners will show the published data stream.

sink(publish(timeout(simstream(0.5), 10), "r_flow"));

The figure below illustrates how the data stream in the example is forked and picked up by two separate subscribers.

[Figure: Forked data stream with multiple subscribers.]

Publish from queries

publish() takes a stream as input and returns a stream, and the returned stream is the same as the one published on the flow.

This means that we can use publish() to fork data streams in queries.

For example, the following query both publishes the data from simstream() on the flow "fs" and outputs the same stream.

select publish(simstream(0.5), "fs") limit 5;

The following query publishes the data from simstream() on the flow "fs" and then does some calculations on the stream.

select ks
from stream of real ks,
     stream of real ss
where ss = publish(simstream(0.5), "fs")
  and ks = ss * 0.5
limit 5;

Publish must be part of query pipeline

Note, however, that publish() must be part of the processing pipeline in the query. That is, the output of publish() must affect the query result. Otherwise the call to publish() will be removed by the query optimizer (since it does not contribute to the query result) and no values will be published on the flow.

For example, start a subscriber for the flow "fs".

subscribe("fs");

Now run the following query where publish() is not part of the processing pipeline, and you will see that the subscribe() above will not output any values.

select ks
from stream of real ks,
     stream of real ss,
     stream of real f
where ss = simstream(0.5)
  and f = publish(ss, "fs")
  and ks = ss * 0.5
limit 5;

This is because the call to publish() is bypassed in the query and is therefore removed by the query optimizer. The figure below illustrates this.

[Figure: No data is published on the flow because publish() is not part of the data pipeline.]

To remedy this, we need to include the result from publish() in the data pipeline.

For example, start a subscriber for the flow "fs".

subscribe("fs");

Now run the following query where publish() this time IS part of the processing pipeline, and you will see that the subscribe() above will output the published values.

select ks
from stream of real ks,
     stream of real ss,
     stream of real f
where ss = simstream(0.5)
  and f = publish(ss, "fs")
  and ks = f * 0.5
limit 5;

The figure below shows the data flow of the new query.

[Figure: The data is now published on the flow because publish() is part of the data pipeline.]

Transforming data on flows

The output data stream from publish() contains the same values as the data published on the flow. This means that any transformation of the published data has to be done on the receiver end; otherwise it will also affect the query result.

For example, let's say we want to multiply each value in the published stream by two and write the following query.

select ks
from stream of real ks,
     stream of real ss,
     stream of real f
where ss = simstream(0.5)
  and f = publish(ss * 2, "fs")
  and ks = f * 0.5
limit 5;

This will publish the values from simstream() multiplied by two on the flow "fs", BUT the output f from publish() will now also be the values multiplied by two, and thus change the query result (compared to the query without * 2 in the previous section).

[Figure: Transforming the data published on the flow also changes the output of publish().]

This can of course be the desired outcome of the query, and in that case this is fine. But if we do not want the transformation to affect the query result, we have to apply it at the subscriber.

For example, for the above case we can start the following subscriber (the cast() simply tells the compiler that the subscribed stream is a stream of Real).

cast(subscribe("fs") as stream of real) * 2;

Now we can publish the stream unaltered and the subscribe query above will handle the transformation of the values.

select ks
from stream of real ks,
     stream of real ss,
     stream of real f
where ss = simstream(0.5)
  and f = publish(ss, "fs")
  and ks = f * 0.5
limit 5;

The figure below shows the data flow for this scenario.

[Figure: Transforming the data published on the flow is done at the receiving end.]

Emit on flow

Unofficial

The function emit_on_flow() is in Unofficial state.

"Internal functions used by the system. No guarantee of when they are removed or added. Should not be used unless you know the risks."

The function emit_on_flow(<value>, <flow-name>) can be used to send values that are not streams, or to publish on multiple flows from the same query.

For example, start the following two subscribers and keep them running.

subscribe("f1");
subscribe("f2");

Then we run the following query which emits two different values on two different flows, "f1" and "f2".

sink((select stream of True
      from Integer i, Integer f1, Integer f2
      where i in heartbeat(1)
        and f1 = emit_on_flow(i, "f1")
        and f2 = emit_on_flow(1/(i+1), "f2")
      limit 9));

Performance compared to publish

It should be noted that using emit_on_flow() comes with a rather large performance penalty compared to publish(). Each call to emit_on_flow() needs to look up the flow publisher and set a number of values for each element, whereas publish() only forks an existing stream. This means that using emit_on_flow() will be orders of magnitude slower than using publish().
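As a rule of thumb, prefer publish() whenever the data already is a stream. The following sketch (the flow names "fast_flow" and "slow_flow" are illustrative) forks a stream with a single publish() call:

sink(publish(simstream(0.5), "fast_flow"));

The per-element alternative below emits comparable data with emit_on_flow() and pays the publisher lookup cost for every element:

sink((select stream of True
      from Integer i, Integer f
      where i in heartbeat(1)
        and f = emit_on_flow(i, "slow_flow")
      limit 9));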

Merge streams

When subscribing to several flows, it is possible to merge them into a single stream of values with subscribe:merge(<flows>).

Warning

The examples in this section use emit_on_flow() to create multiple flows that can be merged. This is just for illustration, since emit_on_flow() makes it easy to create several flows from a single query. In real scenarios you would already have multiple flows from separate sources (either sensor streams or data streams forked with publish()), and you should refrain from using emit_on_flow() in those scenarios for the performance reasons stated above.

For example, start the following subscriber that merges the three flows "f1", "f2", and "f3", and keep it running.

subscribe:merge(["f1","f2","f3"]);

Now you can run the following query that emits different values on each of the flows and you will see that the values are merged into a single stream by the subscribe:merge() above.

sink((select stream of True
      from Integer i, Charstring f1, Real f2, Integer f3
      where i in heartbeat(1)
        and f1 = emit_on_flow(stringify(i), "f1")
        and f2 = emit_on_flow(1/(i+1), "f2")
        and f3 = emit_on_flow(i*10 - i, "f3")
      limit 9));

With subscribe:merge_tagged(<flows>) you get a "bus stream": the values are put into vectors together with the flow name and a timestamp. (The timestamp is in rnow() format, seconds since the epoch. See Temporal functions.)

For example, start the following query and keep it running.

subscribe:merge_tagged(["f1","f2","f3"]);

Now when you run the following query you will see that the values are merged into a "bus stream" by the subscribe:merge_tagged() above.

sink((select stream of True
      from Integer i, Charstring f1, Real f2, Integer f3
      where i in heartbeat(1)
        and f1 = emit_on_flow(stringify(i), "f1")
        and f2 = emit_on_flow(1/(i+1), "f2")
        and f3 = emit_on_flow(i*10 - i, "f3")
      limit 9));

Subscribe to flows on edges

When data is being published on an edge, it is also possible to subscribe to it from a client by using subscribe and adding the edge name with @edge-name. For example, the following will subscribe to the flow "my-flow" on the edge "my-edge".

subscribe('my-flow@my-edge');

This is similar to running the subscription on the edge, like below, but since the query is executed on the server it can continue running even if the connection to the edge is temporarily lost.

//peer: my-edge
subscribe('my-flow');

Subscribe to print output from edge

On each edge there exists a default flow OUTPUT, which contains the output from print(). So by subscribing to this flow we can get the print() output from any edge connected to the server.

subscribe('OUTPUT@my-edge');
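
For example, run the following sketch on the edge (the message string is just illustrative) and the printed value will show up in the subscription above:

//peer: my-edge
print("hello from the edge");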

Merge flows from multiple edges

The subscribe:merge and subscribe:pivot commands can be used to combine flows from multiple edges. For example, the following will subscribe to the flow f1 on my-edge as well as to the flow f2 on my-other-edge.

subscribe:merge(["f1@my-edge", "f2@my-other-edge"]);

While subscribe:merge merges the two flows into one stream, subscribe:pivot creates a stream of vectors, where each vector contains values from all of the streams.
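
For example, the following sketch (reusing the same hypothetical edge and flow names as above) produces such a stream of vectors combining both flows:

subscribe:pivot(["f1@my-edge", "f2@my-other-edge"]);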

Further reading

Reference documentation of dataflow functions: Dataflow functions

Using dataflows to create multiplots of several streams: Advanced visualization