Piping streams with dataflows
Guide specification | |
---|---|
Guide type: | Wasm code |
Requirements: | None |
Recommended reading: | None |
Introduction
Streams and other data can be passed between processes by publishing them on flows. A flow is a named queue in the SA Engine internal database that queries can publish data to and subscribe to.
Flows are useful, for example, if you:
- have several sensors with different integration strategies and want to collect data from all of them into one model
- have edges in a federation that work independently, but want to listen in from a client from time to time
- want to create plots that combine several streams
- want to do several things with the same data
Publish and subscribe to a stream
To listen to a flow, use the subscribe function.
subscribe("my_flow");
Start the subscription query and leave it running. Then, to publish to the flow, use the publish function with the same flow name.
publish(timeout(simstream(0.5), 4), "my_flow");
To use the published values only in the subscriber, wrap the publisher in a sink, which discards the result after it has been published.
sink(publish(timeout(simstream(0.5), 4), "my_flow"));
A subscription to a flow receives the values that are published after the subscription has started. Values published to a flow with no subscriber are not received anywhere. However, the latest value of a flow is always saved, so a subscriber that starts on a flow that has received values in the past immediately gets that latest value as its first value.
Experiment with starting and stopping the subscriber and publisher above in different orders.
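One ordering worth trying is to publish before any subscriber exists. The sketch below uses a hypothetical flow name, late_flow. Based on the behavior described above, the subscriber started afterwards should get the last value published so far as its first value, and after that only values that are published later.

```osql
// Run first, with no subscriber: values are published on the hypothetical
// flow late_flow and discarded locally by sink
sink(publish(timeout(simstream(0.5), 2), "late_flow"));

// Run afterwards: the first value received is the latest value saved
// on late_flow; more values only arrive if something publishes again
subscribe("late_flow");
```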
Multiple listeners
It is possible to have several subscribers to the same flow. For example, let's define two listener functions that transform the values on "r_flow" in different ways.
create function listener_1() -> Real
as select r * 3.0 + 10.0
from Real r
where r in subscribe("r_flow");
create function listener_2() -> Real
as select y
from Real r, Real y
where r in subscribe("r_flow")
and y = r*r - sin(r);
Now, start the listeners and plot the results.
//plot: Line plot
listener_1();
//plot: Line plot
listener_2();
When values are published to "r_flow", both listener plots start showing results.
create function publish_my_stream() -> Stream
as publish(timeout(simstream(0.5),10),"r_flow");
sink(publish_my_stream());
Notice that the listeners automatically stop when the timeout function closes the publishing stream.
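Since a flow can have any number of subscribers, further listeners can be attached to "r_flow" without changing the publisher. As a sketch, the hypothetical listener_3 below keeps only values above an arbitrarily chosen threshold of 0.5. Start it alongside the other listeners, then run sink(publish_my_stream()) again.

```osql
// Hypothetical third listener on the same flow; the threshold 0.5 is arbitrary
create function listener_3() -> Real
  as select r
     from Real r
     where r in subscribe("r_flow")
       and r > 0.5;

//plot: Line plot
listener_3();
```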
Publishing on several flows
Emit on flow
The function emit_on_flow can be used inside stream queries to publish intermediate results.
It can also be used to send values that are not streams, or to publish on multiple flows from the same query.
create function mystreams() -> stream of boolean
as select stream of True
from Integer i, Integer f1, Real f2
where i in heartbeat(1)
and f1 = emit_on_flow(i,"f1")
and f2 = emit_on_flow(1/(i+1),"f2")
limit 9;
//plot: Line plot
subscribe("f1");
//plot: Line plot
subscribe("f2");
sink(mystreams());
Notice that even though mystreams() stops after 9 values, the listeners do not stop, since emit_on_flow does not close the flows it publishes on.
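In the examples in this guide, the variable bound to emit_on_flow has the type of the published value, which suggests that emit_on_flow returns the value it publishes. Assuming that, it can tap an intermediate value without changing what the query itself returns. A minimal sketch with the hypothetical names scaled_stream and debug_x: the query returns y, while the intermediate value x is also published on debug_x.

```osql
// Hypothetical example: y is the query result, while the intermediate
// value x is also published on the flow debug_x for inspection
create function scaled_stream() -> stream of Real
  as select stream of y
     from Real r, Real x, Real y
     where r in timeout(simstream(0.5), 10)
       and x = emit_on_flow(r * 3.0, "debug_x")
       and y = x + 10.0;

//plot: Line plot
subscribe("debug_x");

//plot: Line plot
scaled_stream();
```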
Merge streams
When subscribing to several flows, you can merge them into a single stream of values.
create function mystreams() -> stream of boolean
as select stream of True
from Integer i, Charstring f1, Real f2, Integer f3
where i in heartbeat(1)
and f1 = emit_on_flow(stringify(i),"f1")
and f2 = emit_on_flow(1/(i+1),"f2")
and f3 = emit_on_flow(i*10 - i,"f3")
limit 9;
subscribe:merge(["f1","f2","f3"]);
sink(mystreams());
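The merged flows do not have to be fed by the same query. As a sketch, the two flows used earlier in this guide can be merged while each one is fed by its own publisher. This assumes publish_my_stream() from the Multiple listeners section is still defined. Start the subscriber first, then run the two publishers as separate queries.

```osql
//plot: Line plot
subscribe:merge(["my_flow", "r_flow"]);

// Publisher 1: simulated values on my_flow
sink(publish(timeout(simstream(0.5), 4), "my_flow"));

// Publisher 2: the function defined earlier, publishing on r_flow
sink(publish_my_stream());
```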
With subscribe:merge_tagged you get a "bus stream": each value is put into a vector together with the flow name and a timestamp.
The timestamp is in rnow() format, that is, seconds since the epoch (see Temporal functions).
subscribe:merge_tagged(["f1","f2","f3"]);
sink(mystreams());
Subscribing to flows on edges
When data is published on an edge, a client can also subscribe to it with the subscribe function by appending @ and the edge name to the flow name.
For example, the following will subscribe to the flow my-flow on my-edge.
subscribe('my-flow@my-edge');
This is similar to running the subscription on the edge itself, as below. However, with flow@edge the query is executed on the server, so it can keep running even if the connection to the edge is temporarily lost.
//peer: my-edge
subscribe('my-flow');
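A client subscription only receives values if something publishes on my-flow on the edge. Below is a sketch of such a publisher. It reuses the publisher pattern from earlier and assumes that an edge named my-edge is connected. The //peer: line sends the query to that edge, in the same way as the example above.

```osql
//peer: my-edge
sink(publish(timeout(simstream(0.5), 10), "my-flow"));
```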
The subscribe:merge and subscribe:pivot functions can be used to combine flows from several edges.
For example, the following subscribes to the flow f1 on my-edge as well as to the flow f2 on my-other-edge.
subscribe:merge(["f1@my-edge", "f2@my-other-edge"]);
While subscribe:merge merges the two flows into one stream, subscribe:pivot creates a stream of vectors, where each vector contains values from all the flows.
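Below is a sketch of the pivot variant for the same two edge flows. It assumes that subscribe:pivot takes the same vector of flow names as subscribe:merge. Each result is a vector rather than a single value. Without edges, the same call can be tried locally on the flows f1, f2 and f3 fed by mystreams() from the Merge streams section.

```osql
// Each result is a vector with values from f1@my-edge and f2@my-other-edge
subscribe:pivot(["f1@my-edge", "f2@my-other-edge"]);

// Local variant: start the subscriber, then run the publisher
subscribe:pivot(["f1", "f2", "f3"]);
sink(mystreams());
```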
Further reading
Reference documentation of dataflow functions: Dataflow functions
Using dataflows to create multiplots of several streams: Advanced visualization