Publish and subscribe data flows

Guide specification
Guide type: Wasm code
Requirements: None
Recommended reading: None

Introduction

Data streams in SA Engine can be assigned unique names, called flows. Using flows, streamed data can be passed between continuous queries in the local database, between SA Engine components, and between SA Engine peers. External systems can be interfaced using a publish-subscribe messaging pattern. This pattern is used to interface SA Engine with distributed systems through Kafka and Azure Event Hub, with distributed devices through MQTT, with embedded systems through CAN bus, with Unix pipes, and so on.

Examples of uses:

  • if you want to debug running continuous queries

  • if you want to retrieve data in real-time from SA Engine peers and edges in a federation

  • if you want to create graphical web visualizations

  • if you want to split a data stream to enable parallel processing

  • if you want to access real-time data streams from various kinds of sensors

  • if you want to access various kinds of distributed systems

Split and redirect streams

You can publish stream elements to a flow with the function publish(Stream stream, Charstring flow)->Stream. Analogously, you can receive stream elements from a flow with the function subscribe(Charstring flow)->Stream.

As an example, let's start listening to the flow named flow0 by calling the subscribe function. It will not receive any stream elements until something is published on flow0.

subscribe("flow0");

Now, publish a synthetic data stream siota(1,4) to flow0 with the publish function.

publish(siota(1,4), "flow0")

In this case the stream is split into two equivalent streams: the one returned by the call to publish and the one received by the subscriber.

If you want to redirect the published stream to a flow without splitting it, you can call the function sink(Stream s), which absorbs the elements of s without sending them further.
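The splitting behaviour can be pictured with a toy Python model. This is only a sketch of the idea, not SA Engine code: the `subscribers` registry and the Python `subscribe` and `publish` functions are hypothetical stand-ins, and `range(1, 5)` plays the role of siota(1,4).

```python
from collections import defaultdict

# Hypothetical in-memory registry: flow name -> list of subscriber buffers.
subscribers = defaultdict(list)

def subscribe(flow):
    """Register a new subscriber buffer on `flow` and return it."""
    buffer = []
    subscribers[flow].append(buffer)
    return buffer

def publish(stream, flow):
    """Pass every element through unchanged and copy it to each subscriber."""
    for element in stream:
        for buffer in subscribers[flow]:
            buffer.append(element)
        yield element

received = subscribe("flow0")
returned = list(publish(range(1, 5), "flow0"))  # stand-in for siota(1,4)

print(returned)  # elements returned to the publishing caller
print(received)  # the same elements as seen by the subscriber
```

Both lists hold the same elements, which is the "split" described above: one copy goes back to the caller of publish, the other to the subscriber.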

For example, let's start another flow named my_flow:

subscribe("my_flow");
sink(publish(siota(1,4), "my_flow"));

The figure below illustrates how the stream in the example above is redirected by publishing it on the flow my_flow in a call to sink().

Figure: Redirecting a data stream into a flow with publish().

Exercise

Experiment with starting and stopping the subscriber and publisher above in different orders. Use simstream(pace) to generate simulated streams.

Elements published to a flow with no subscriber will not be received anywhere.

note

A new subscriber to an active flow on which elements have already been published will currently receive some of the latest elements that linger in the flow. This is a known bug. In the future the flow will be empty when the subscription is started.

Multiple subscribers

It is possible to have several subscriptions to the same flow.

For example, let's define some listener functions that return different stream transformations on a flow named r_flow.

create function listener_1() -> Stream of Real
as select Stream of r * 3.0 + 10.0
from Real r
where r in subscribe("r_flow");

create function listener_2() -> Stream of Real
as select Stream of y
from Real r, Real y
where r in subscribe("r_flow")
and y = r*r - sin(r);

Now, start the listeners and plot the results, and keep the plots running.

//plot: Line plot
listener_1();
//plot: Line plot
listener_2();

Now, when we redirect a stream to r_flow, both listeners will show their own transformation of the published data stream.

sink(publish(timeout(simstream(0.5),10),"r_flow"));

The figure below illustrates how the data stream in the example is redirected and split into two separate transformed subscriber streams.

Figure: Redirected, split and transformed data stream.
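As a rough illustration of several subscribers on one flow, the following Python sketch fans each published element out to two listeners that apply the same formulas as listener_1 and listener_2. The Python `publish` and `sink` functions and the sample input values are assumptions made for this sketch; they are not the SA Engine implementation.

```python
import math

def listener_1(r):
    # Same formula as the OSQL listener_1: r * 3.0 + 10.0
    return r * 3.0 + 10.0

def listener_2(r):
    # Same formula as the OSQL listener_2: r*r - sin(r)
    return r * r - math.sin(r)

listeners = [listener_1, listener_2]
outputs = [[] for _ in listeners]

def publish(stream):
    """Deliver each element to every listener, then pass it on."""
    for r in stream:
        for transform, out in zip(listeners, outputs):
            out.append(transform(r))
        yield r

def sink(stream):
    """Consume the stream without forwarding its elements."""
    for _ in stream:
        pass
    return True

sink(publish([0.0, 1.0, 2.0]))  # hypothetical sample values
print(outputs[0])
print(outputs[1])
```

Each listener receives every published element and produces its own transformed stream, while sink() keeps the publishing call itself from returning any elements.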

Exercise

What happens if you remove the call to sink()?

Publish from queries

publish(Stream s, Charstring flow) -> Stream takes a stream s as input and returns the same stream s, which is also published on the flow. The function can be used in queries.

For example, let's subscribe to flow fs and publish on fs computations over elements of a siota() stream:

subscribe('fs');

select res
  from Stream of Integer res,
       Stream of Integer base
 where base = siota(1,4)
   and res = publish(base * 2, 'fs');

Here, the same elements 2,4,6,8 are both published on flow fs and returned as the result of the publishing CQ.

Let's slightly change the publishing CQ:

select res
from Stream of Integer res,
Stream of Integer base
where base = publish(siota(1,4), 'fs')
and res = base * 2

In this case the elements 1,2,3,4 were published on flow fs, while 2,4,6,8 were returned from the publishing CQ.
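The difference between the two queries comes down to where publish sits relative to the multiplication. A small Python sketch of the same idea, where the `publish` generator and the two log lists are hypothetical stand-ins for the flow:

```python
def publish(stream, flow_log):
    """Record each element on the (simulated) flow and pass it on."""
    for element in stream:
        flow_log.append(element)
        yield element

base = range(1, 5)  # stand-in for siota(1,4)

# Variant 1: publish the doubled stream.
flow_a = []
result_a = list(publish((x * 2 for x in base), flow_a))

# Variant 2: publish the base stream, then double it.
flow_b = []
result_b = [x * 2 for x in publish(base, flow_b)]

print(flow_a, result_a)
print(flow_b, result_b)
```

Both variants return the doubled values, but only the first one puts the doubled values on the flow.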

Querying subscribed flows

You can use the stream returned from subscribe(Charstring flow)->Stream in queries as any other stream.

Example:

select i * 10
  from Integer i
 where i in subscribe('fs')
   and i > 2;

publish(siota(1,4),'fs');

Forcing publications

It should be noted that the stateful function publish(Stream s, Charstring flow)->Stream only registers the flow name of the stream s in the local database. It does not by itself start publishing elements of s on the flow. The elements are published only when they are extracted from the stream returned by publish(), for example by including the result of (computations over) publish() in the result of the query, as in the previous examples. Otherwise no elements will be published on the flow.

For example, the following CQ will not publish elements on flow fs:

select true
from Stream of Integer base
where base = siota(10,14)
and exists publish(base, 'fs')

The reason for this is that the elements of stream base are not extracted in the query result and therefore publish will not extract and publish elements from siota(10,14).

To enforce publication you can use the function sink(Stream s)->Boolean, which extracts all elements from s and then returns true.

This CQ will do the desired publication:

select true
where sink(publish(siota(10,14), 'fs'))
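Python generators give a convenient analogy for this behaviour: creating a generator does no work until something iterates over it. The sketch below is a toy model under that analogy, not a description of SA Engine internals. The `flow_fs` list and the Python `publish` and `sink` functions are hypothetical.

```python
flow_fs = []  # hypothetical stand-in for what subscribers of 'fs' receive

def publish(stream, flow_log):
    """Lazily record each element on the flow as it is extracted."""
    for element in stream:
        flow_log.append(element)
        yield element

def sink(stream):
    """Extract every element from the stream, then return True."""
    for _ in stream:
        pass
    return True

pending = publish(range(10, 15), flow_fs)  # stand-in for siota(10,14)
before = list(flow_fs)   # nothing extracted yet, so nothing published
done = sink(pending)     # extraction forces the publication
after = list(flow_fs)

print(before, done, after)
```

Merely holding a reference to the published stream, as the `exists` test does in the first CQ, corresponds to `pending` here: the elements reach the flow only once sink() drains the stream.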

Merge flows

When subscribing to several flows, it is possible to merge them into one single stream of elements with subscribe:merge(flows).

For example, start the following subscriber that merges the three flows "f1", "f2", and "f3", and keep it running.

subscribe:merge(["f1","f2","f3"]);

Now, publish different streams on the subscribed flows with different paces using the synthetic stream generator diota():

publish(diota(0.1,1,3),      'f1');
publish(diota(0.21,11,13), 'f2');
publish(diota(0.15,101,103), 'f3');

The problem with this is that the three successive calls to publish() will publish the three streams in sequence. To publish them in parallel use merge(Vector of Stream)->Stream:

merge([publish(diota(0.1,1,3),      'f1'),
publish(diota(0.21,11,13), 'f2'),
publish(diota(0.15,101,103), 'f3')])
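The difference between publishing in sequence and publishing in parallel can be sketched in Python with simulated emission times instead of real waiting. The helper `timed_stream` is hypothetical, and the sketch assumes that the first element of each stream is emitted one pace interval after the start. Paces are given in milliseconds to avoid floating-point ties.

```python
import heapq
from itertools import chain

def timed_stream(pace_ms, start, stop, flow):
    """Hypothetical helper: (emit_time_ms, flow, value) for start..stop."""
    return [(pace_ms * (i + 1), flow, value)
            for i, value in enumerate(range(start, stop + 1))]

f1 = timed_stream(100, 1, 3, "f1")
f2 = timed_stream(210, 11, 13, "f2")
f3 = timed_stream(150, 101, 103, "f3")

# Three successive calls: each stream finishes before the next starts.
sequential = [value for _, _, value in chain(f1, f2, f3)]

# Merged: elements are interleaved in order of emission time.
parallel = [value for _, _, value in heapq.merge(f1, f2, f3)]

print(sequential)
print(parallel)
```

In the sequential case the subscriber sees all of f1 before anything from f2 or f3. In the merged case the elements arrive interleaved according to each stream's pace.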

With subscribe:merge_tagged(flows) you get a "bus stream": the elements are put into timestamped vectors together with the flow name and the current time, expressed as the number of microseconds since the epoch.

For example, start the following query and keep it running.

subscribe:merge_tagged(["f1","f2","f3"]);

Now when you run the following query you will see that the elements are merged into a "bus stream" by the subscribe:merge_tagged() above.

merge([publish(diota(0.1,0,2),      'f1'),
publish(diota(0.21,10,12), 'f2'),
publish(diota(0.15,100,102), 'f3')])
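To make the idea of a bus stream concrete, here is a small Python sketch that wraps each arriving element in a vector with a timestamp and the flow name. The element layout `[timestamp_us, flow, value]` and the arrival order are assumptions made for illustration. The actual layout produced by subscribe:merge_tagged() should be checked in the reference documentation.

```python
import time

def merge_tagged(arrivals):
    """Tag each (flow, value) pair with the current time in microseconds."""
    for flow, value in arrivals:
        timestamp_us = time.time_ns() // 1000  # microseconds since the Unix epoch
        yield [timestamp_us, flow, value]      # assumed layout, for illustration only

# Hypothetical arrival order of elements from the three flows.
arrivals = [("f1", 0), ("f3", 100), ("f1", 1), ("f2", 10)]
bus = list(merge_tagged(arrivals))

for element in bus:
    print(element)
```

Because every element carries its flow name, a consumer of the bus stream can tell the sources apart even though they share one stream.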

Subscribe to flows on edges

When data is being published on an edge, it is also possible to subscribe to it from a client using subscribe and adding the edge name using @edge-name. For example, the following will subscribe to the flow "my-flow" on the edge "my-edge".

subscribe('my-flow@my-edge');

This is similar to running the subscription on the edge, like below, but since the query is executed on the server it can continue running even if the connection to the edge is temporarily lost.

//peer: my-edge
subscribe('my-flow');

Subscribe to print output from edge

On each edge there exists a default flow OUTPUT which contains the output from print(). So by subscribing to the output flow we can get the print() output from any edge connected to the server.

subscribe('OUTPUT@my-edge');

Merge flows from multiple edges

The subscribe:merge and subscribe:pivot commands can be used to combine flows from multiple edges. For example, the following will subscribe to the flow f1 on my-edge as well as to the flow f2 on my-other-edge.

subscribe:merge(["f1@my-edge", "f2@my-other-edge"]);

While subscribe:merge merges the two flows into one stream, subscribe:pivot creates a stream of vectors, where each vector contains elements from all the subscribed flows.
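The contrast between the two can be sketched in Python. The sketch assumes that a pivoted vector holds the latest value seen on each flow and that a new vector is emitted on every arrival. That is one plausible reading of the description above, not a confirmed specification, and the arrival sequence is hypothetical.

```python
def merge(arrivals):
    """One stream of plain elements, in arrival order."""
    for _, value in arrivals:
        yield value

def pivot(arrivals, flows):
    """One vector per arrival, holding the latest value of every flow (assumed)."""
    latest = {flow: None for flow in flows}
    for flow, value in arrivals:
        latest[flow] = value
        yield [latest[f] for f in flows]

flows = ["f1@my-edge", "f2@my-other-edge"]
arrivals = [("f1@my-edge", 1), ("f2@my-other-edge", 10), ("f1@my-edge", 2)]

merged = list(merge(arrivals))
pivoted = list(pivot(arrivals, flows))

print(merged)
print(pivoted)
```

In this model the merged stream loses track of which flow an element came from, whereas each pivoted vector has one position per flow.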

Emit on flow

Unofficial

The function emit_on_flow() is in Unofficial state.

"Internal functions used by the system. No guarantee of when they are removed or added. Should not be used unless you know the risks."

The function emit_on_flow(Object o, Charstring flow) can be used to send a single object o to a flow.

For example, start the following two subscribers and keep them running.

subscribe('f1');
subscribe('f2');

Then we run the following query which emits two different elements on two different flows, "f1" and "f2".

select i
from Integer i
where i in siota(1,10)
and exists emit_on_flow(i,'f1')
and exists emit_on_flow(10*i,'f2')

Performance compared to publish

It should be noted that using emit_on_flow() comes with a rather large performance penalty compared to publish(). Each call to emit_on_flow() needs to look up the flow publisher and set a number of values for each element, whereas publish() only splits an existing stream. This means that emit_on_flow() can be orders of magnitude slower than publish(). However, it is useful for debugging data streams.
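The source of the overhead can be illustrated with a toy Python model that simply counts flow lookups. The `Registry` class and both functions are hypothetical and only mirror the description above. They say nothing about actual timings in SA Engine.

```python
class Registry:
    """Hypothetical flow registry that counts how often a flow is looked up."""
    def __init__(self):
        self.flows = {}
        self.lookups = 0

    def find(self, flow):
        self.lookups += 1
        return self.flows.setdefault(flow, [])

def publish(registry, stream, flow):
    target = registry.find(flow)        # one lookup for the whole stream
    for element in stream:
        target.append(element)
        yield element

def emit_on_flow(registry, obj, flow):
    registry.find(flow).append(obj)     # one lookup for every element
    return obj

with_publish = Registry()
list(publish(with_publish, range(1, 11), "f1"))

with_emit = Registry()
for i in range(1, 11):
    emit_on_flow(with_emit, i, "f1")

print(with_publish.lookups, with_emit.lookups)
```

Both variants deliver the same ten elements, but the per-element variant repeats the lookup for each one, which is the kind of fixed cost that adds up on high-rate streams.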

Further reading

Reference documentation of dataflow functions: Dataflow functions

Using dataflows to create multiplots of several streams: Advanced visualization