
Streams

This page uses Wasm code blocks so you can run the examples directly in the browser.

A stream query is a query that returns a stream as its result. Stream queries produce values continuously in real time, so they are also called continuous queries.

In the simplest case it is an expression that returns a stream.

For example, the following stream query returns a stream of continuously produced numbers. You can stop a continuous query by pushing the stop button.

heartbeat(0.5) 

You can apply computational functions to streams. The function is then applied to each element of the stream.

For example:

sin(heartbeat(0.5)) 
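As a rough illustration of this element-wise behavior outside SA Engine, the Python sketch below models a stream as a generator and applies a function to each element as it is produced. It is not SA Engine code. The `heartbeat_model` and `apply_to_stream` helpers are hypothetical: the model assumes that heartbeat(p) emits the elapsed seconds 0, p, 2p, and so on, and it produces a fixed number of values without any real-time pacing.

```python
import math

def heartbeat_model(period, count):
    """Hypothetical stand-in for heartbeat(): yields 0, period, 2*period, ...

    Assumption: the stream carries elapsed seconds. No real-time delay is
    simulated, and the stream is cut off after `count` elements.
    """
    for k in range(count):
        yield k * period

def apply_to_stream(func, stream):
    """Lazily apply func to every element of the stream, one at a time."""
    for element in stream:
        yield func(element)

# Python model of sin(heartbeat(0.5)), limited to four elements.
sines = list(apply_to_stream(math.sin, heartbeat_model(0.5, 4)))
print(sines)
```

Because both helpers are generators, each value is transformed as soon as it is produced, which mirrors how a function applied to a stream yields a new stream rather than a finished collection.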

The result can be visualized in real time, for example:

//plot: line plot
sin(10*heartbeat(0.01))

The select Stream of syntax provides a very flexible way of defining new streams from other streams, e.g.

select Stream of x
from Real x
where x in heartbeat(0.5)
and mod(x,1) < 0.1
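To make the effect of the where clause concrete, here is a small Python sketch of the same filter. It is a model, not SA Engine code: `select_stream` is a hypothetical helper, and the input list assumes that heartbeat(0.5) emits the elapsed seconds 0.0, 0.5, 1.0, and so on.

```python
def select_stream(stream, predicate):
    """Yield only the elements for which the predicate holds."""
    for x in stream:
        if predicate(x):
            yield x

# Assumed heartbeat(0.5) output: elapsed seconds 0.0, 0.5, 1.0, ...
ticks = [k * 0.5 for k in range(7)]

# Same condition as "mod(x,1) < 0.1": keep values close to a whole second.
whole_seconds = list(select_stream(ticks, lambda x: x % 1 < 0.1))
print(whole_seconds)  # [0.0, 1.0, 2.0, 3.0]
```

Under that assumption, every second heartbeat value passes the condition, so the resulting stream emits one element per second.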

You can combine streams using system functions taking more than one stream as argument.

For example, the function merge(Stream s1,Stream s2)->Stream combines the elements of two streams as they arrive:

select Stream of x
from Real x
where x in merge(heartbeat(0.5), heartbeat(0.6))
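The interleaving produced by merging by arrival time can be sketched in Python as follows. This is only a model under stated assumptions, not how SA Engine implements merge(): each heartbeat is represented as a list of hypothetical (arrival time, value) pairs, arrival times are integers in tenths of a second to avoid floating-point rounding, and elements that arrive at the same instant are taken from the first stream first.

```python
import heapq

def arrivals(period_tenths, count):
    """Model a heartbeat as (arrival_time_in_tenths, value_in_seconds) pairs.

    Assumption: the element with value t seconds arrives t seconds after start.
    """
    return [(k * period_tenths, k * period_tenths / 10) for k in range(count)]

fast = arrivals(5, 4)  # models heartbeat(0.5)
slow = arrivals(6, 4)  # models heartbeat(0.6)

# heapq.merge lazily interleaves already-sorted inputs by the given key.
merged = [value for _, value in heapq.merge(fast, slow, key=lambda pair: pair[0])]
print(merged)  # [0.0, 0.0, 0.5, 0.6, 1.0, 1.2, 1.5, 1.8]
```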

Stream windows

SA Engine has a number of window forming functions, which construct windows over input streams. The windows are represented as vectors, so a window forming function converts an input stream of elements into a stream of vectors of elements. The function winagg() forms count-based windows and the function twinagg() forms time-based windows, as explained next.

Count windows

The window forming function winagg(Stream s,Number size,Number stride)->Stream of Vector creates a count-based stream of windows from an input stream s. You specify the number of stream elements that each window contains (the window size) and how many stream elements the window moves forward before emitting the next window (the stride).

For example, to create non-overlapping windows (tumbling windows), provide the same value for size and stride. Try it by running the following query. Remember that heartbeat(s) generates a stream of seconds emitted at a given pace s, so it takes about 5 s before the query outputs its first result:

winagg(heartbeat(1), 5, 5);

You can read the query as "emit the 5 latest elements every 5th element". The figure below illustrates the behavior of this query:

[Figure winagg_1_5-5_drop.png: windows produced by winagg(heartbeat(1), 5, 5)]
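The tumbling case, where size and stride are equal, amounts to cutting the stream into consecutive chunks. The Python sketch below models that behavior. It is not SA Engine code: `tumbling_windows` is a hypothetical helper, and `range(12)` stands in for the first twelve heartbeat values.

```python
def tumbling_windows(stream, size):
    """Group a stream into consecutive, non-overlapping windows of `size` elements.

    A window is emitted as soon as it is full. An incomplete final window
    is never emitted in this model.
    """
    window = []
    for element in stream:
        window.append(element)
        if len(window) == size:
            yield window
            window = []  # start collecting the next window from scratch

# Models winagg(heartbeat(1), 5, 5) over the first twelve elements.
windows = list(tumbling_windows(range(12), 5))
print(windows)  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
```

Elements 10 and 11 are still waiting in a partially filled window, which is why nothing is output until a full window of five elements has arrived.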

You do not have to keep all the values from the input stream. If you specify a stride that is larger than the window size, the elements that fall between consecutive windows are dropped, so the stream is sampled. Try it by running the following query:

winagg(heartbeat(1), 4, 6);

You can read the query as "emit the 4 latest elements every 6th element". It effectively skips two values between every window, which is illustrated in the figure below:

[Figure winagg_2_4-6_drop.png: windows produced by winagg(heartbeat(1), 4, 6)]
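The arithmetic behind the skipped values can be checked with a few lines of Python. This is a sketch under an assumption that the text does not settle: window k is taken to cover positions k*stride to k*stride + size - 1, counted from the first element. SA Engine may align the first window differently, but the gap of stride - size elements between windows is the same either way. `sampled_windows` is a hypothetical helper.

```python
def sampled_windows(values, size, stride):
    """Cut full windows of `size` elements whose starts are `stride` elements apart.

    Assumption: the first window starts at the first element.
    """
    last_start = len(values) - size
    return [values[s:s + size] for s in range(0, last_start + 1, stride)]

values = list(range(16))
windows = sampled_windows(values, 4, 6)  # models winagg(heartbeat(1), 4, 6)

# Elements up to the end of the last window that appear in no window at all.
kept = {v for window in windows for v in window}
skipped = [v for v in values[:windows[-1][-1] + 1] if v not in kept]

print(windows)  # [[0, 1, 2, 3], [6, 7, 8, 9], [12, 13, 14, 15]]
print(skipped)  # [4, 5, 10, 11]
```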

If you set a stride that is smaller than the window size, you get overlapping windows (sliding windows). Try it by running the following query (it takes 10s for the query to output any result):

winagg(heartbeat(1), 10, 1);

The query produces a 10-element window every time the input stream emits a new element, which is illustrated in the figure below:

[Figure winagg_5_10-1_drop.png: windows produced by winagg(heartbeat(1), 10, 1)]
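A sliding window is commonly modeled with a fixed-length buffer that forgets its oldest element when a new one arrives. The Python sketch below uses `collections.deque` for that purpose. It is not SA Engine code: `sliding_windows` is a hypothetical helper meant for strides no larger than the window size, and it assumes that nothing is emitted until the first window is full.

```python
from collections import deque

def sliding_windows(stream, size, stride=1):
    """Emit the latest `size` elements every `stride` elements once the window is full."""
    window = deque(maxlen=size)  # the oldest element falls out automatically
    seen = 0
    for element in stream:
        window.append(element)
        seen += 1
        if seen >= size and (seen - size) % stride == 0:
            yield list(window)

# Models winagg(heartbeat(1), 10, 1) over the first twelve elements.
windows = list(sliding_windows(range(12), 10, 1))
for window in windows:
    print(window)
```

With twelve input elements the model emits three windows, covering 0 to 9, 1 to 10, and 2 to 11, so consecutive windows share nine of their ten elements.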

Temporal windows

The window forming function twinagg() is a time-based window function that creates a stream of windows from an input stream of timestamped values. It works much like winagg(), but instead of specifying the window size and stride as numbers of elements you specify them in seconds.

We will use a custom stream to illustrate how twinagg() works. The function x() below outputs a stream of timestamped values. Create the function by running the query:

create function x() -> stream of timeval
as select stream of tt
from timeval tt
where tt in [ts(|2022-05-12T00:00:00.0Z|,16),
ts(|2022-05-12T00:00:00.3Z|,4),
ts(|2022-05-12T00:00:00.4Z|,8),
ts(|2022-05-12T00:00:00.5Z|,2),
ts(|2022-05-12T00:00:00.7Z|,1),
ts(|2022-05-12T00:00:00.8Z|,7),
ts(|2022-05-12T00:00:01.0Z|,3),
ts(|2022-05-12T00:00:01.1Z|,6),
ts(|2022-05-12T00:00:01.2Z|,9),
ts(|2022-05-12T00:00:01.3Z|,5),
ts(|2022-05-12T00:00:01.5Z|,12),
ts(|2022-05-12T00:00:01.6Z|,11)];

If we illustrate the value stream on a timeline it looks like this:

[Figure twinagg_stream_drop.png: the values of x() on a timeline]

To create a non-overlapping time window (a tumbling window) you provide the same value for size and stride. Try it by running the following query:

twinagg(x(), 0.3, 0.3);

The figure below illustrates how the tumbling window passes over the stream elements:

[Figure twinagg_1_drop.png: tumbling windows produced by twinagg(x(), 0.3, 0.3)]

Just as with winagg(), you can make twinagg() skip elements by specifying a stride that is larger than the window size, or produce sliding windows by specifying a stride that is smaller than the window size.

Here is an example where the window size is larger than the stride, which gives sliding windows. Try it by running the query:

twinagg(x(), 0.6, 0.3);

The query produces a 0.6 second window every 0.3 seconds, which is illustrated in the figure below:

[Figure twinagg_2_drop.png: sliding windows produced by twinagg(x(), 0.6, 0.3)]
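The two twinagg() queries above can be approximated with a small Python model that groups the values of x() by timestamp. Several details here are assumptions rather than documented SA Engine behavior: windows are half-open intervals aligned to the first timestamp, the final partially covered windows are emitted, and only the values (not their timestamps) are kept. The actual output of twinagg() may therefore differ at the boundaries. `time_windows` is a hypothetical helper, and timestamps are written as integer tenths of a second to avoid floating-point rounding at window edges.

```python
# Values from x() as (tenths of a second after 00:00:00, value).
EVENTS = [(0, 16), (3, 4), (4, 8), (5, 2), (7, 1), (8, 7),
          (10, 3), (11, 6), (12, 9), (13, 5), (15, 12), (16, 11)]

def time_windows(events, size, stride):
    """Group timestamped values into time windows.

    Assumed semantics: window k covers the half-open interval
    [first_ts + k*stride, first_ts + k*stride + size).
    """
    first_ts, last_ts = events[0][0], events[-1][0]
    for start in range(first_ts, last_ts + 1, stride):
        yield [value for ts, value in events if start <= ts < start + size]

tumbling = list(time_windows(EVENTS, 3, 3))  # models twinagg(x(), 0.3, 0.3)
sliding = list(time_windows(EVENTS, 6, 3))   # models twinagg(x(), 0.6, 0.3)

print(tumbling)  # [[16], [4, 8, 2], [1, 7], [3, 6], [9, 5], [12, 11]]
print(sliding[:3])  # [[16, 4, 8, 2], [4, 8, 2, 1, 7], [1, 7, 3, 6]]
```

Unlike count windows, time windows hold a varying number of elements, because the number of values that fall inside a fixed time interval depends on how densely the input is timestamped.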

Functions

Stream functions

Window functions

Dataflow functions

Sensor ontology functions