
Windowing

SA Engine has two functions for forming windows over streams: winagg(), a count-based window function, and twinagg(), a time-based window function.

winagg()

Winagg is a count-based window function that creates windows from an input stream. You specify the number of stream elements that each window contains (the window "size") and how many stream elements the window moves forward before emitting the next window (the "stride").

For example, to create non-overlapping windows (tumbling windows), give size and stride the same value. Try it by running the following query. Remember that heartbeat() generates a stream of seconds emitted at a given pace, so it takes 5 s before the query outputs its first result:

winagg(heartbeat(1), 5, 5);

You can read the query as "emit the 5 latest elements every 5th element". The figure below illustrates the behavior of this query:

[Figure: winagg(heartbeat(1), 5, 5), tumbling windows of size 5 and stride 5]
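If you take the reading above literally ("emit the size latest elements every stride-th element"), you can write the contents of each window directly. This is a sketch of that reading only. It does not claim to describe how SA Engine aligns its first window. Number the input elements x_1, x_2, ... from 1, and let n be the size and s the stride. The k-th window is emitted when element x_{ks} arrives, provided ks >= n. The first line below is the general form, and the other three lines apply it to the three queries in this section:

$$
\begin{aligned}
W_k &= (x_{ks-n+1}, \dots, x_{ks}), && ks \ge n,\\
n = s = 5:\quad W_k &= (x_{5k-4}, \dots, x_{5k}),\\
n = 4,\ s = 6:\quad W_k &= (x_{6k-3}, \dots, x_{6k}),\\
n = 10,\ s = 1:\quad W_k &= (x_{k-9}, \dots, x_k), && k \ge 10.
\end{aligned}
$$

With this model, consecutive windows leave a gap of s - n elements when s > n. That is 2 elements for the size 4, stride 6 query. When s < n, consecutive windows share n - s elements, which is 9 for the size 10, stride 1 query.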

You do not have to keep every value from the input stream. If you specify a stride that is larger than the window size, winagg() samples the stream: it emits a window every stride elements and drops the elements that fall between windows. Try it by running the following query:

winagg(heartbeat(1), 4, 6);

You can read the query as "emit the 4 latest elements every 6th element". Because the stride exceeds the window size by two, two values are skipped between consecutive windows, as illustrated in the figure below:

[Figure: winagg(heartbeat(1), 4, 6), windows of size 4 and stride 6]

If you set a stride that is smaller than the window size, consecutive windows overlap (a sliding window). Try it by running the following query (it takes 10 s before the query outputs any result):

winagg(heartbeat(1), 10, 1);

The query produces a 10-element window every time the input stream emits a new element, which is illustrated in the figure below:

[Figure: winagg(heartbeat(1), 10, 1), sliding windows of size 10 and stride 1]
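You can try the three winagg() configurations without a running studio instance by using the Python sketch below, which models winagg() over a finite list. It is a hypothetical model: `winagg_model` is not an SA Engine function. The sketch assumes the reading used above, namely that the `size` latest elements are emitted on every `stride`-th element, and only once `size` elements have arrived. SA Engine's actual alignment of the first window may differ.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def winagg_model(stream: Iterable[T], size: int, stride: int) -> Iterator[List[T]]:
    """Hypothetical model of winagg(): emit the `size` latest elements every `stride`-th element."""
    if size < 1 or stride < 1:
        raise ValueError("size and stride must be positive")
    latest = deque(maxlen=size)  # holds only the `size` most recent elements
    for count, x in enumerate(stream, start=1):
        latest.append(x)
        # Emit on every stride-th element, but only once the window is full.
        if count % stride == 0 and len(latest) == size:
            yield list(latest)


tumbling = list(winagg_model(range(12), 5, 5))   # like winagg(s, 5, 5)
sampled = list(winagg_model(range(13), 4, 6))    # like winagg(s, 4, 6)
sliding = list(winagg_model(range(12), 10, 1))   # like winagg(s, 10, 1)
print(tumbling)                  # [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]
print(sampled)                   # [[2, 3, 4, 5], [8, 9, 10, 11]]
print(sliding[0], len(sliding))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 3
```

The bounded `deque` keeps memory proportional to the window size no matter how long the stream runs. This matters for unbounded streams such as heartbeat().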

twinagg()

Twinagg is a time-based window function that creates windows from an input stream of timestamped values. It works much like winagg, but you specify the window size and stride in seconds instead of as a number of elements.

We will use a custom stream to illustrate how twinagg works. The function below outputs a stream of timestamped values. Create the function by running the query:

create function x() -> stream of timeval
as select stream of tt
from timeval tt
where tt in [ts(|2022-05-12T00:00:00.0Z|,16),
ts(|2022-05-12T00:00:00.3Z|,4),
ts(|2022-05-12T00:00:00.4Z|,8),
ts(|2022-05-12T00:00:00.5Z|,2),
ts(|2022-05-12T00:00:00.7Z|,1),
ts(|2022-05-12T00:00:00.8Z|,7),
ts(|2022-05-12T00:00:01.0Z|,3),
ts(|2022-05-12T00:00:01.1Z|,6),
ts(|2022-05-12T00:00:01.2Z|,9),
ts(|2022-05-12T00:00:01.3Z|,5),
ts(|2022-05-12T00:00:01.5Z|,12),
ts(|2022-05-12T00:00:01.6Z|,11)];

If we illustrate the value stream on a timeline it looks like this:

[Figure: the stream produced by x() on a timeline]

To create non-overlapping time windows (tumbling windows), give size and stride the same value. Try it by running the following query:

twinagg(x(), 0.3, 0.3);

The figure below illustrates how the tumbling window passes over the stream elements:

[Figure: twinagg(x(), 0.3, 0.3), tumbling 0.3 s windows over x()]

Just as with winagg, twinagg can skip elements when the stride is larger than the window size, and it forms sliding windows when the stride is smaller than the window size.

Here is an example where the window size is larger than the stride, which gives a sliding window. Try it by running the query:

twinagg(x(), 0.6, 0.3);

The query produces a 0.6 second window every 0.3 seconds, which is illustrated in the figure below:

[Figure: twinagg(x(), 0.6, 0.3), 0.6 s windows with a 0.3 s stride over x()]
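The Python sketch below models time windows over the same values that x() produces. It works on a finite list, and `twinagg_model` is a hypothetical name, not an SA Engine function. Several behaviors in the sketch are assumptions that the text above does not state:

- Each window is the half-open interval [w, w + size).
- The first window starts at the first timestamp unless a start time is given.
- Each window is labeled by its start time.
- Empty windows are skipped.
- Every window that starts at or before the last timestamp is emitted.

Times are integer milliseconds to avoid binary floating-point rounding of 0.3 s steps. The `start_ms` and `keep_ts` parameters mirror the `start` and `keep_ts` arguments listed under Functions, but their behavior here is assumed.

```python
from typing import List, Optional, Tuple

# The stream produced by x() above, as (milliseconds after 2022-05-12T00:00:00Z, value).
X = [(0, 16), (300, 4), (400, 8), (500, 2), (700, 1), (800, 7),
     (1000, 3), (1100, 6), (1200, 9), (1300, 5), (1500, 12), (1600, 11)]


def twinagg_model(stream: List[Tuple[int, int]], size_ms: int, stride_ms: int,
                  start_ms: Optional[int] = None, keep_ts: bool = False) -> list:
    """Hypothetical model of twinagg(): window k covers [start + k*stride, start + k*stride + size)."""
    if size_ms <= 0 or stride_ms <= 0:
        raise ValueError("size and stride must be positive")
    if not stream:
        return []
    # Assumption: windows are aligned to the first timestamp unless a start time is given.
    start = stream[0][0] if start_ms is None else start_ms
    last = stream[-1][0]
    windows = []
    w = start
    while w <= last:
        # Half-open interval: an element exactly at w + size belongs to the next window.
        items = [(t, v) if keep_ts else v for t, v in stream if w <= t < w + size_ms]
        if items:  # assumption: empty windows are not emitted
            windows.append((w, items))  # assumption: a window is labeled by its start time
        w += stride_ms
    return windows


tumbling = twinagg_model(X, 300, 300)  # models twinagg(x(), 0.3, 0.3)
sliding = twinagg_model(X, 600, 300)   # models twinagg(x(), 0.6, 0.3)
print([vals for _, vals in tumbling])
# [[16], [4, 8, 2], [1, 7], [3, 6], [9, 5], [12, 11]]
print([vals for _, vals in sliding])
# [[16, 4, 8, 2], [4, 8, 2, 1, 7], [1, 7, 3, 6], [3, 6, 9, 5], [9, 5, 12, 11], [12, 11]]
```

In the sliding case, every value except the first and last appears in two windows, because each 0.6 s window spans two 0.3 s strides.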

Functions

predwin(Stream s,Integer c,Object p,Function e,Function l,Boolean start_entered)
->Stream of Vector

Form predicate windows over stream s by applying the window delimitation functions e and l to the sliding change windows cw of size c (stride 1) over s. A new window is started when e(cw, p) is true, or at the beginning of the stream if start_entered is true, and is ended when l(cw, p) is true.


predwin(Stream s,Integer c,Object p,Function e,Function l)->Stream of Vector

Form predicate windows over stream s by applying the window delimitation functions e and l to the sliding change windows cw of size c (stride 1) over s. A new window is started when e(cw, p) is true and ended when l(cw, p) is true.
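Predicate windows are easier to follow with a concrete trace, so here is a Python sketch of one plausible reading of predwin(). The function `predwin_model` and the example predicates `rises` and `falls` are hypothetical. The sketch makes these assumptions, none of which the description above confirms:

- e and l are evaluated only once the change window cw holds c elements.
- The element that makes e true is the first element of the new window.
- The element that makes l true is the last element of the window.
- A window that is still open when the stream ends is not emitted.

```python
from collections import deque
from typing import Callable, Iterable, Iterator, List


def predwin_model(stream: Iterable[float], c: int, p: float,
                  e: Callable[[List[float], float], bool],
                  l: Callable[[List[float], float], bool],
                  start_entered: bool = False) -> Iterator[List[float]]:
    """Hypothetical model of predwin(): cw is the sliding change window of the c latest elements."""
    cw = deque(maxlen=c)
    inside = start_entered  # start inside a window if start_entered is true
    current: List[float] = []
    for x in stream:
        cw.append(x)
        full = len(cw) == c  # assumption: e and l only see full change windows
        if inside:
            current.append(x)
            if full and l(list(cw), p):
                yield current  # l ended the window: emit it
                current, inside = [], False
        elif full and e(list(cw), p):
            inside = True  # e started a new window with the current element
            current = [x]


def rises(cw: List[float], p: float) -> bool:
    """Hypothetical start condition: the signal crosses above p."""
    return cw[0] <= p < cw[-1]


def falls(cw: List[float], p: float) -> bool:
    """Hypothetical end condition: the signal drops back to p or below."""
    return cw[-1] <= p < cw[0]


data = [0, 1, 5, 6, 7, 2, 1, 8, 9, 0]
print(list(predwin_model(data, 2, 4, rises, falls)))
# [[5, 6, 7, 2], [8, 9, 0]]
print(list(predwin_model(data, 2, 4, rises, falls, start_entered=True)))
# [[0, 1, 5, 6, 7, 2], [8, 9, 0]]
```

With c = 2, each change window holds the previous and the current value. That is the smallest window in which a threshold crossing can be detected.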


remove_null(Stream s)->Stream

Remove the tuples in s that contain a null value.
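A minimal Python sketch of this filter follows, assuming that null corresponds to Python's `None` and that stream elements are tuples or lists. The name `remove_null_model` is hypothetical, and the sketch passes non-tuple elements through unchanged, which is also an assumption.

```python
from typing import Any, Iterable, Iterator


def remove_null_model(stream: Iterable[Any]) -> Iterator[Any]:
    """Hypothetical model of remove_null(): drop tuples that contain a null (None)."""
    for item in stream:
        if isinstance(item, (tuple, list)) and any(v is None for v in item):
            continue  # the tuple contains a null: drop it
        yield item  # assumption: everything else passes through unchanged


rows = [(1, 2), (None, 3), (4, None), (5, 6)]
print(list(remove_null_model(rows)))  # [(1, 2), (5, 6)]
```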


simwinstream(Real pace,Integer sz)->Stream of Vector of Real

twinagg(Stream of Timeval s,Number size,Number stride)
->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds and stride is the window stride in seconds.


twinagg(Stream of Timeval s,Number size,Number stride,Boolean keep_ts)
->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds, stride is the window stride in seconds, and keep_ts specifies whether to keep the timestamp of each element in the window.


twinagg(Stream of Timeval s,Number size,Boolean keep_ts,Function pred,
Vector args)->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds, pred is a test function that returns true if the window should be emitted, args is a vector of any additional arguments to pred, and keep_ts specifies whether to keep the timestamp of each element in the window.


twinagg(Stream of Timeval s,Number size,Number stride,Timeval start)
->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds, stride is the window stride in seconds, and start is the point in time where the windowing should start.


twinagg(Stream of Timeval s,Number size,Function pred)
->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds and pred is a test function that returns true if the window should be emitted.


twinagg(Stream of Timeval s,Number size,Function pred,Vector args)
->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds, pred is a test function that returns true if the window should be emitted, and args is a vector of any additional arguments to pred.
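The predicate variants of twinagg() take no stride. One plausible reading is that, after each arriving element, the elements from the last size seconds form a candidate window, and pred decides whether that window is emitted. The Python sketch below implements that reading only.

- The names `twinagg_pred_model` and `sum_above` are hypothetical.
- The interval (t - size, t] is an assumption.
- Labeling each window with the timestamp of the element that triggered it is an assumption.
- Passing args as extra positional arguments to pred is an assumption.

Times are in integer milliseconds, and the data repeats the values of x() above.

```python
from collections import deque
from typing import Callable, Iterator, List, Sequence, Tuple

# The stream produced by x() above, as (milliseconds after 2022-05-12T00:00:00Z, value).
X = [(0, 16), (300, 4), (400, 8), (500, 2), (700, 1), (800, 7),
     (1000, 3), (1100, 6), (1200, 9), (1300, 5), (1500, 12), (1600, 11)]


def twinagg_pred_model(stream: List[Tuple[int, int]], size_ms: int,
                       pred: Callable[..., bool],
                       args: Sequence = ()) -> Iterator[Tuple[int, List[int]]]:
    """Hypothetical model: test the elements of the last size_ms ms after each arrival."""
    window: deque = deque()
    for t, v in stream:
        window.append((t, v))
        # Assumption: the window covers (t - size, t]; evict older elements.
        while window and window[0][0] <= t - size_ms:
            window.popleft()
        values = [val for _, val in window]
        if pred(values, *args):  # assumption: args are extra positional arguments
            yield (t, values)


def sum_above(values: List[int], limit: int) -> bool:
    """Hypothetical predicate: emit the window when its sum exceeds limit."""
    return sum(values) > limit


print(list(twinagg_pred_model(X, 300, sum_above, [15])))
# [(0, [16]), (1200, [3, 6, 9]), (1300, [6, 9, 5]), (1500, [5, 12]), (1600, [12, 11])]
```

Evicting from the front of a `deque` keeps each step cheap, because every element is appended and removed at most once.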


twinagg0(Stream of Timeval s,Number size,Number stride,Timeval start,
Boolean keep_ts)->Stream of Timeval of Vector

Stream of time windows over stream s, represented as timestamped vectors, where size is the window size in seconds, stride is the window stride in seconds, start is the point in time where the windowing should start, and keep_ts specifies whether to keep the timestamp of each element in the window.


winagg(Stream s,Number size,Number stride)->Stream of Vector

Stream of count windows over stream s, represented as vectors, where size is the number of elements in each window and stride is the number of elements the window moves forward before the next window is emitted.