Streams
A stream is a possibly infinite sequence of elements. It typically grows over time as new elements arrive.
Example: The following query returns an infinite stream of the elapsed time every 0.5 seconds.
heartbeat(0.5)
The query is an example of a continuous query (CQ) since it continuously produces objects. You can stop it explicitly by pressing the stop button.
You can apply computational functions on streams. The function is then applied to each element of the stream.
Example:
sqrt(heartbeat(0.5))
The result can be visualized in real time.
Example:
//plot: line plot
sin(10*heartbeat(0.01))
The select Stream of syntax provides a very flexible way of
defining new streams out of other ones.
Example:
select Stream of x
from Real x
where x in extract(heartbeat(0.5))
and mod(x,1) < 0.1
The syntax e in extract(s) extracts elements e from the stream s
so they can be used in select queries.
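The filtering behavior of the query above can be modelled outside SA Engine with a Python generator. This is only a sketch: the `heartbeat` helper below is a hypothetical stand-in that yields ideal elapsed times 0, 0.5, 1.0, ... immediately instead of waiting, and `near_whole_seconds` is an illustrative name, not an OSQL function.

```python
from itertools import count, islice

def heartbeat(period):
    """Hypothetical stand-in: yields 0, period, 2*period, ... without waiting."""
    return (k * period for k in count())

def near_whole_seconds(stream):
    # Same condition as the where clause: mod(x,1) < 0.1
    return (x for x in stream if x % 1 < 0.1)

# Take the first four elements of the (infinite) filtered stream.
first_four = list(islice(near_whole_seconds(heartbeat(0.5)), 4))
print(first_four)  # [0.0, 1.0, 2.0, 3.0]
```

As in the select query, the result is itself a stream: elements are produced lazily, one at a time, and only those passing the condition are emitted.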
You can stop an infinite CQ after a specific number of elements by
specifying a limit in a select query.
Example:
select heartbeat(0.125) limit 5
You can also stop a CQ after a specific number of seconds by using the
timeout(s,limit) function.
Example:
timeout(heartbeat(0.125),0.9)
You can assign a stream to a variable without running it.
Example:
set :s = heartbeat(0.5)
This runs the stream object bound to session variable :s:
:s
You can combine streams using system functions taking more than one stream as argument.
Example: the function merge(Stream s1,Stream s2)->Stream combines
the elements of two streams as they arrive producing a merged stream:
merge(heartbeat(0.5), heartbeat(0.6))
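To get a feeling for the order in which merged elements arrive, the following Python sketch interleaves two simulated tick streams by arrival time. It assumes that each stream starts at elapsed time 0 and ticks at exact intervals; the `ticks` helper is hypothetical and uses integer milliseconds to avoid floating point rounding. It models arrival order only and is not how merge() is implemented.

```python
import heapq
from itertools import count, islice

def ticks(period_ms):
    """Hypothetical stand-in for heartbeat: elapsed time in milliseconds."""
    return (k * period_ms for k in count())

# heapq.merge lazily interleaves sorted inputs, so it also works on infinite ones.
merged = heapq.merge(ticks(500), ticks(600))
first_six = list(islice(merged, 6))
print(first_six)  # [0, 0, 500, 600, 1000, 1200]
```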
You can produce vectors (pairs) from the stream and plot them as 2D-points in a scatter plot.
Example:
//plot: Scatter plot
select Stream of [x,y]
from Real x, Real y
where x in heartbeat(0.1)
and y = x * sin(5*x)
You can make your own simulated sine stream function of coordinate pairs [x,y].
Example:
create function my_stream() -> Stream of Vector of Real
as select Stream of [x,y]
from Real x, Real y
where x in heartbeat(0.1)
and y = x * sin(5*x)
//plot: Scatter plot
my_stream()
We can also make a colored scatter plot of my_stream.
//plot: Multi plot
{
"sa_plot": "Scatter plot",
"color_axis": 2 -- Plot each different Y with different color
};
my_stream()
Stream windows
Window forming functions continuously construct windows over a
stream. The windows are represented as vectors, so the window forming
functions convert an input stream of elements to a stream of vectors
of elements. The function winagg() is a count-based window function
and the function twinagg() is a time-based window function, as
explained next.
Count windows
The window forming function winagg(Stream s,Number size,Number
stride)->Stream of Vector creates a count-based stream of
windows from an input stream s. You specify the number of stream
elements that each window contains (the window size) and how many
stream elements the window moves forward before emitting the next
window (the stride).
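The interplay between size and stride can be sketched in Python. The generator below is a model, not the winagg() implementation, and `count_windows` is a hypothetical name. It assumes that the first window is emitted as soon as size elements have arrived and that each following window starts stride elements after the previous one.

```python
def count_windows(stream, size, stride):
    """Yield lists of `size` elements, each starting `stride` elements
    after the previous one (assumed semantics, see the text above)."""
    buffer = []
    start = 0  # index of the first element of the next window
    for index, element in enumerate(stream):
        if index >= start:
            buffer.append(element)
        if len(buffer) == size:
            yield list(buffer)
            start += stride
            # Keep only the elements shared with the next window.
            buffer = buffer[stride:]

tumbling = list(count_windows(range(1, 11), 5, 5))
sampling = list(count_windows(range(1, 15), 4, 6))
sliding = list(count_windows(range(1, 6), 3, 1))
print(tumbling)  # [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]
print(sampling)  # [[1, 2, 3, 4], [7, 8, 9, 10]]
print(sliding)   # [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```

With equal size and stride the windows do not overlap, with a larger stride elements between windows are dropped, and with a smaller stride consecutive windows share elements.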
To create a tumbling window (a non-overlapping window) you provide the same value for size and stride.
Example: The following query produces a tumbling window.
winagg(heartbeat(1), 5, 5)
Since heartbeat(1) generates a stream element per second, it takes
5s for the query to output the first result. You can read the query as
"emit the 5 latest elements every 5th element". The figure below
illustrates the behavior of this query:

You are not required to keep all the values from the input stream. If you specify a stride that is larger than the window size, the stream is sampled: the elements between the end of one window and the start of the next are skipped.
Example:
winagg(heartbeat(1), 4, 6)
You can read the query as "emit the 4 latest elements every 6th element". It effectively skips two values between every window, which is illustrated in the figure below:

You get sliding windows having overlapping values by specifying a stride value that is lower than the window size.
Example:
winagg(heartbeat(1), 10, 1)
The query produces a 10-element window every time the input stream emits a new element, which is illustrated in the figure below:

Temporal windows
The window forming function twinagg() is a time-based window
function that creates a stream of windows from an input stream of
time stamped values. It works much like winagg() but instead of
specifying window size and stride in number of elements you specify it
in seconds.
We will use a custom stream to illustrate how twinagg() works. The
function x() returns a finite stream of explicitly time stamped
values.
create function x() -> Stream of Timeval
as select Stream of t
from Timeval t
where t in [ts(|2022-05-12T00:00:00.0Z|,16),
ts(|2022-05-12T00:00:00.3Z|,4),
ts(|2022-05-12T00:00:00.4Z|,8),
ts(|2022-05-12T00:00:00.5Z|,2),
ts(|2022-05-12T00:00:00.7Z|,1),
ts(|2022-05-12T00:00:00.8Z|,7),
ts(|2022-05-12T00:00:01.0Z|,3),
ts(|2022-05-12T00:00:01.1Z|,6),
ts(|2022-05-12T00:00:01.2Z|,9),
ts(|2022-05-12T00:00:01.3Z|,5),
ts(|2022-05-12T00:00:01.5Z|,12),
ts(|2022-05-12T00:00:01.6Z|,11)]
The syntax stream e1, e2,... specifies a constant finite stream.
If we illustrate the value stream on a timeline it looks like this:

To create a non-overlapping time window (a tumbling window) you provide the same value for size and stride. Try it by running the following query:
twinagg(x(), 0.3, 0.3)
The figure below illustrates how the tumbling window passes over the stream elements:

Just like winagg(), twinagg() can skip elements when you specify a
stride that is larger than the window size, or produce sliding
windows when you specify a stride that is smaller than the window
size.
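The effect of size and stride on time-based windows can be explored with a small Python model. It rests on assumptions that twinagg() may not share: the first window starts at the first time stamp, each window covers the half-open interval from its start up to (but not including) start + size, and windows are produced until the start passes the last time stamp. The name `time_windows` is hypothetical, and the events are the values of x() with time stamps written as millisecond offsets from 2022-05-12T00:00:00.0Z.

```python
def time_windows(events, size_ms, stride_ms):
    """Group (timestamp_ms, value) pairs, sorted by time, into windows.

    Assumed semantics: window k covers [t0 + k*stride, t0 + k*stride + size),
    where t0 is the first time stamp.
    """
    if not events:
        return []
    windows = []
    start = events[0][0]
    last = events[-1][0]
    while start <= last:
        end = start + size_ms
        windows.append([value for ts, value in events if start <= ts < end])
        start += stride_ms
    return windows

events = [(0, 16), (300, 4), (400, 8), (500, 2), (700, 1), (800, 7),
          (1000, 3), (1100, 6), (1200, 9), (1300, 5), (1500, 12), (1600, 11)]

tumbling = time_windows(events, 300, 300)  # size equals stride
sliding = time_windows(events, 600, 300)   # size larger than stride
print(tumbling)
print(sliding)
```

Unlike count-based windows, time-based windows can hold different numbers of elements, since the number of elements that fall inside a time interval varies.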
Here is an example where window size is larger than the stride. Try it by running the query:
twinagg(x(), 0.6, 0.3)
The query produces a 0.6 second window every 0.3 seconds, which is illustrated in the figure below:

Streamed data reduction
The polymorphic reduce function can also be applied on streams.
Streams can be seen as continuously extended sequences that can even
be infinite. The semantics of reduce for streams is therefore a
stream where a new element is returned after each call to the
reductor. By contrast, for other collections the reductor accumulates
values and a single result is returned for the accumulation of the
whole collection.
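Python has a close analogy to this distinction, shown here as an illustration rather than as a description of how SA Engine works: functools.reduce returns one final value for a collection, while itertools.accumulate returns every intermediate value, which is the behavior a reduction over a stream needs.

```python
from functools import reduce
from itertools import accumulate

values = [3, 1, 4, 1, 5]

# Collection semantics: one result for the whole collection.
final_max = reduce(max, values)

# Stream semantics: a new result after each call to the reducing function.
running_max = list(accumulate(values, max))

print(final_max)    # 5
print(running_max)  # [3, 3, 4, 4, 5]
```

Since accumulate is lazy, it also works on an infinite input, where a single final result would never be produced.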
Let's define a stream $m$ of running maximum value pairs of another stream $s$. If $s$ has the elements $e_n = [x_n, y_n]$, the running maximum $m$ will have elements $max_n = [x_n, \max(y_1, \ldots, y_n)]$.
First we define a reductor that computes $max_n$ for given $max_{n-1}$ and $e_n$.
create function max_n(Vector max_n1, Vector e_n)
-> Vector of Real
as [e_n[1], -- x_n
max(e_n[2], -- y_n
max_n1[2])] -- max(y_1...y_n-1)
max_n([10,5],[10,6]);
max_n([10,5],[10,4]);
Now we can use the reduce function to define max_stream.
create function max_stream(Stream of Vector s) -> Stream of Vector
as reduce(s, 'max_n')
Let's test it on my_stream.
timeout(my_stream(),1)
timeout(max_stream(my_stream()),1)
Let's combine my_stream and max_stream in the same stream
and scatter plot them together with different colors.
For this we need to define a color stream $c$ from another stream $s$,
where $s$ has elements $e_n = [x_n, y_n]$ and $c$ has elements
$c_n = [x_n, y_n, \max(y_1, \ldots, y_n), color_n]$.
Each new $c_n$ is computed from $c_{n-1}$ and $e_n$ with this function:
create function new_c_n(Vector of Real old_c, Vector of Real e)
-> Vector of Real
as [e[1], -- x_n
e[2], -- y_n
max(old_c[3], e[2]), -- max(y_1...y_n)
case when e[2] > old_c[3]
then 0 -- blue
else 1 -- red
end]
//plot: Multi plot
{
"sa_plot": "Scatter plot",
"size_axis": "none",
"color_axis": 4,
"memory": 200
};
reduce(my_stream(), 'new_c_n', [0,0,0,0])
You can specify the initial $c_0$, here [0,0,0,0], as the third argument of reduce.
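The role of the initial value can be traced in a Python model of the same reduction. The sketch mirrors new_c_n with 0-based indexing and uses itertools.accumulate with its initial parameter. Python emits the initial value as the first output, so the model skips it; whether reduce in SA Engine emits it is not assumed here. The sample points are made up for illustration.

```python
from itertools import accumulate, islice

def new_c_n(old_c, e):
    """Python mirror of the OSQL reductor: [x_n, y_n, running max, color]."""
    x, y = e
    running_max = max(old_c[2], y)
    color = 0 if y > old_c[2] else 1  # 0 = new maximum, 1 = otherwise
    return [x, y, running_max, color]

def colored(stream, initial):
    # accumulate yields `initial` first; drop it and keep the computed elements.
    return islice(accumulate(stream, new_c_n, initial=initial), 1, None)

points = [[0.1, 0.5], [0.2, 0.3], [0.3, 0.9]]  # illustrative [x, y] pairs
result = list(colored(points, [0, 0, 0, 0]))
print(result)
# [[0.1, 0.5, 0.5, 0], [0.2, 0.3, 0.5, 1], [0.3, 0.9, 0.9, 0]]
```

Without an initial value the first call to the reducing function would have no previous element with a third component to read, which is why the four-element start vector is supplied.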
Visit the reference section Streams for more on queries and functions over streams.
