Windowing
SA Engine has two functions that make windows over streams. The first is called winagg() and is a count-based window function. The second is called twinagg() and is a time-based window function.
winagg()
Winagg is a count-based window function that creates windows from an input stream. You specify the number of stream elements that each window contains (the window "size") and how many stream elements the window moves forward before emitting the next window (the "stride").
For example, to create a non-overlapping window (a tumbling window) you provide the same value for size and stride. Try it by running the following query (remember that heartbeat() generates a stream of seconds emitted at a given pace, so it takes 5s for the query to output any result):
winagg(heartbeat(1), 5, 5);
You can read the query as "emit the 5 latest elements every 5th element". The figure below illustrates the behavior of this query:

You do not have to keep all the values from the input stream. If you specify a stride that is larger than the window size, the elements that fall between two consecutive windows are dropped, so the stream is sampled once per stride. Try it by running the following query:
winagg(heartbeat(1), 4, 6);
You can read the query as "emit the 4 latest elements every 6th element". It effectively skips two values between every window, which is illustrated in the figure below:

If you set a stride that is smaller than the window size you get overlapping windows (a sliding window). Try it by running the following query (it takes 10s for the query to output any result):
winagg(heartbeat(1), 10, 1);
The query produces a 10-element window every time the input stream emits a new element, which is illustrated in the figure below:

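The three count-based cases above differ only in how size and stride relate to each other. As a rough illustration, the Python sketch below simulates that behavior outside SA Engine. It is not SA Engine code: the helper name count_windows is hypothetical, the integers 1 to 12 merely stand in for the elements emitted by heartbeat(1), and the sketch assumes that the first window is emitted as soon as size elements have arrived and that a new window follows every stride elements after that.

```python
from collections import deque


def count_windows(stream, size, stride):
    """Yield count-based windows (as lists) over an iterable.

    Assumption: the first window is emitted once `size` elements have
    arrived, and another window follows every `stride` elements after that.
    """
    latest = deque(maxlen=size)  # always holds the `size` latest elements
    for n, element in enumerate(stream, start=1):
        latest.append(element)
        if n >= size and (n - size) % stride == 0:
            yield list(latest)


# Stand-in for the elements emitted by heartbeat(1); the values are arbitrary.
elements = range(1, 13)

# Tumbling window: size == stride.
print(list(count_windows(elements, 5, 5)))
# [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]

# Sampling: stride > size, two elements are skipped between windows.
print(list(count_windows(elements, 4, 6)))
# [[1, 2, 3, 4], [7, 8, 9, 10]]

# Sliding window: stride < size, one window per new element after the first 10.
print(len(list(count_windows(elements, 10, 1))))
# 3
```

With size equal to stride the windows tile the stream. With the larger stride, elements 5 and 6 fall between two windows and are dropped. With stride 1, each new element produces a window that shares nine elements with the previous one.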
twinagg()
Twinagg is a time-based window function that creates windows from an input stream of timestamped values. It works much like winagg, but instead of specifying window size and stride as a number of elements you specify them in seconds.
We will use a custom stream to illustrate how twinagg works. The function below outputs a stream of timestamped values. Create the function by running the query:
create function x() -> stream of timeval
  as select stream of tt
       from timeval tt
      where tt in [ts(|2022-05-12T00:00:00.0Z|,16),
                   ts(|2022-05-12T00:00:00.3Z|,4),
                   ts(|2022-05-12T00:00:00.4Z|,8),
                   ts(|2022-05-12T00:00:00.5Z|,2),
                   ts(|2022-05-12T00:00:00.7Z|,1),
                   ts(|2022-05-12T00:00:00.8Z|,7),
                   ts(|2022-05-12T00:00:01.0Z|,3),
                   ts(|2022-05-12T00:00:01.1Z|,6),
                   ts(|2022-05-12T00:00:01.2Z|,9),
                   ts(|2022-05-12T00:00:01.3Z|,5),
                   ts(|2022-05-12T00:00:01.5Z|,12),
                   ts(|2022-05-12T00:00:01.6Z|,11)];
If we illustrate the value stream on a timeline it looks like this:

To create a non-overlapping time window (a tumbling window) you provide the same value for size and stride. Try it by running the following query:
twinagg(x(), 0.3, 0.3);
The figure below illustrates how the tumbling window passes over the stream elements:

Just like winagg, twinagg can skip elements if you specify a stride that is larger than the window size, or produce sliding windows if you specify a stride that is smaller than the window size.
Here is an example where window size is larger than the stride. Try it by running the query:
twinagg(x(), 0.6, 0.3);
The query produces a 0.6 second window every 0.3 seconds, which is illustrated in the figure below:

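To make the time-based behavior concrete, the Python sketch below groups the values from x() into time windows outside SA Engine. It is not SA Engine code and is not guaranteed to reproduce the exact output of twinagg: the helper name time_windows is hypothetical, timestamps are written as integer milliseconds after 2022-05-12T00:00:00.0Z to avoid floating-point rounding, and the boundary handling (half-open windows that start at the first timestamp) is an assumption.

```python
def time_windows(events, size_ms, stride_ms):
    """Return (window_start_ms, values) pairs for time-based windows.

    `events` is a list of (timestamp_ms, value) pairs sorted by timestamp.
    Assumptions: each window is the half-open interval
    [start, start + size_ms), the first window starts at the first
    timestamp, and windows are produced as long as their start does not
    pass the last timestamp.
    """
    if not events:
        return []
    first, last = events[0][0], events[-1][0]
    windows = []
    start = first
    while start <= last:
        end = start + size_ms
        windows.append((start, [v for t, v in events if start <= t < end]))
        start += stride_ms
    return windows


# The stream from x(), as (milliseconds after 00:00:00.0, value) pairs.
X = [(0, 16), (300, 4), (400, 8), (500, 2), (700, 1), (800, 7),
     (1000, 3), (1100, 6), (1200, 9), (1300, 5), (1500, 12), (1600, 11)]

# Tumbling: size 0.3 s, stride 0.3 s.
print([values for _, values in time_windows(X, 300, 300)])
# [[16], [4, 8, 2], [1, 7], [3, 6], [9, 5], [12, 11]]

# Sliding: size 0.6 s, stride 0.3 s.
print([values for _, values in time_windows(X, 600, 300)])
# [[16, 4, 8, 2], [4, 8, 2, 1, 7], [1, 7, 3, 6], [3, 6, 9, 5], [9, 5, 12, 11], [12, 11]]
```

Unlike count-based windows, the number of elements per window varies, because it depends on how many timestamps fall inside each interval.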
Functions
predwin(Stream s,Integer c,Object p,Function e,Function l,Boolean start_entered)->Stream of Vector
Form predicate windows over stream s by applying the window delimitation functions e and l on sliding change windows cw over s of size c with stride 1. A new window is started when e(cw,p) is true, or at the beginning if start_entered=true, and ended when l(cw,p) is true.
predwin(Stream s,Integer c,Object p,Function e,Function l)->Stream of Vector
Form predicate windows over stream s by applying the window delimitation functions e and l on sliding change windows cw over s of size c with stride 1. A new window is started when e(cw,p) is true and ended when l(cw,p) is true.
remove_null(Stream s)->Stream
Remove the tuples in s that contain a null value.
simwinstream(Real pace,Integer sz)->Stream of Vector of Real
twinagg(Stream of Timeval s,Number size,Number stride)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
stride is the window stride in seconds
twinagg(Stream of Timeval s,Number size,Number stride,Boolean keep_ts)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
stride is the window stride in seconds
keep_ts specifies whether to keep the timestamp of each element in the window
twinagg(Stream of Timeval s,Number size,Boolean keep_ts,Function pred,Vector args)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
pred is a test function returning true if the window should be emitted
args is a vector containing any additional arguments to the function pred
keep_ts specifies whether to keep the timestamp of each element in the window
twinagg(Stream of Timeval s,Number size,Number stride,Timeval start)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
stride is the window stride in seconds
start is the point in time where the windowing should start
twinagg(Stream of Timeval s,Number size,Function pred)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
pred is a test function returning true if the window should be emitted
twinagg(Stream of Timeval s,Number size,Function pred,Vector args)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
pred is a test function returning true if the window should be emitted
args is a vector containing any additional arguments to the function pred
twinagg0(Stream of Timeval s,Number size,Number stride,Timeval start,Boolean keep_ts)->Stream of Timeval of Vector
Stream of time windows over stream s represented as timestamped vectors where:
size is the window size in seconds
stride is the window stride in seconds
start is the point in time where the windowing should start
keep_ts specifies whether to keep the timestamp of each element in the window
winagg(Stream s,Number size,Number stride)->Stream of Vector
Stream of count windows over stream s represented as vectors where:
size is the number of elements in each window
stride is the number of elements in the window stride