conditional-stats-on-streams
For the examples below, we set the variable :s1 to a stream of time stamped numbers (see /docs/reference/osql-ref/time#time-stamped-objects) that mimics some sensor readings:
set :s1 = stream ts(|2022-03-07T23:10:17.9Z|,1),
ts(|2022-03-07T23:10:18.0Z|,2),
ts(|2022-03-07T23:10:18.1Z|,3),
ts(|2022-03-07T23:10:18.2Z|,4),
ts(|2022-03-07T23:10:18.3Z|,5),
ts(|2022-03-07T23:10:18.4Z|,6),
ts(|2022-03-07T23:10:18.5Z|,7),
ts(|2022-03-07T23:10:18.6Z|,8),
ts(|2022-03-07T23:10:18.8Z|,9),
ts(|2022-03-07T23:10:18.9Z|,10)
The variable :s1 now holds a stream of the time stamped numbers 1 through 10, with time stamps about 0.1 seconds apart (and a gap at 23:10:18.7).
Run the stream :s1 to see the time stamped stream elements:
:s1
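To make the spacing of :s1 concrete outside SA Engine, here is a small Python sketch. It is only an illustration and is not part of OSQL. It models :s1 as a list of (time stamp, value) pairs and computes the gaps between successive time stamps. The names RAW_S1, parse_ts and gaps_ms are hypothetical.

```python
from datetime import datetime, timedelta

# Hypothetical Python stand-in for :s1: (ISO 8601 UTC time stamp, value) pairs.
RAW_S1 = [
    ("2022-03-07T23:10:17.9Z", 1),
    ("2022-03-07T23:10:18.0Z", 2),
    ("2022-03-07T23:10:18.1Z", 3),
    ("2022-03-07T23:10:18.2Z", 4),
    ("2022-03-07T23:10:18.3Z", 5),
    ("2022-03-07T23:10:18.4Z", 6),
    ("2022-03-07T23:10:18.5Z", 7),
    ("2022-03-07T23:10:18.6Z", 8),
    ("2022-03-07T23:10:18.8Z", 9),
    ("2022-03-07T23:10:18.9Z", 10),
]


def parse_ts(text: str) -> datetime:
    """Parse a UTC time stamp such as '2022-03-07T23:10:17.9Z'."""
    return datetime.strptime(text, "%Y-%m-%dT%H:%M:%S.%fZ")


s1 = [(parse_ts(stamp), value) for stamp, value in RAW_S1]

# Whole milliseconds between each pair of successive time stamps.
gaps_ms = [
    (later[0] - earlier[0]) // timedelta(milliseconds=1)
    for earlier, later in zip(s1, s1[1:])
]

print(gaps_ms)  # [100, 100, 100, 100, 100, 100, 100, 200, 100]
```

The single 200 ms gap corresponds to the missing element at 23:10:18.7.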
The figure below illustrates how running :s1 returns a stream of the numbers 1 to 10, each time stamped at a specific time point:

Windows of latest elements
The first example shows how to use twinagg() to create a stream of sliding temporal windows. Each window contains the elements from the latest 0.5 seconds up to and including each element in a time stamped stream.
In this case, we want to form a new window for every element in the incoming stream, so the predicate pred should always be true:
create function always_true(Object o) -> Boolean as 1=1
We can now illustrate how the predicate window function twinagg() works by feeding it the elements in :s1 and limiting the time window to 0.5 seconds. This means that only the elements from the last 0.5 seconds will be present in each window. We use the predicate always_true() so that every time stamped element in the stream produces a new window.
The figure below illustrates how twinagg() with the always_true()
predicate behaves at time 23:10:18.3 for element five:

Since the predicate is always true, the elements from the last 0.5 seconds (the window size) are collected in each window.
Let's call twinagg() with :s1, window size 0.5, and
predicate always_true() to verify that we get the expected
result:
twinagg(:s1, .5, #'always_true')
You see that the above call to twinagg() returns a stream of time stamped windows, one for each incoming element. Each window contains the incoming stream elements whose time stamps are less than 0.5 seconds before the current one. Notice that this combines temporal and counting windows: the window size is temporal (0.5 seconds), while the stride is a count of one element.
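The behavior described above can be mimicked in plain Python. The sketch below is a hypothetical model, not the SA Engine implementation. twinagg_model() (a made-up name) emits a window for every element whose predicate holds. Each window holds the values of all elements stamped strictly less than size_s seconds before that element, including the element itself. Treating the 0.5 second boundary as exclusive is an assumption taken from the wording "less than 0.5 seconds before".

```python
from datetime import datetime, timedelta
from typing import Any, Callable, List, Tuple

Element = Tuple[datetime, Any]

# Seconds part of each :s1 time stamp; element n carries the value n.
SECONDS = ["17.9", "18.0", "18.1", "18.2", "18.3",
           "18.4", "18.5", "18.6", "18.8", "18.9"]
s1: List[Element] = [
    (datetime.strptime(f"2022-03-07T23:10:{sec}Z", "%Y-%m-%dT%H:%M:%S.%fZ"), n)
    for n, sec in enumerate(SECONDS, start=1)
]


def twinagg_model(stream: List[Element], size_s: float,
                  pred: Callable[[Element], bool]) -> List[Tuple[datetime, List[Any]]]:
    """Hypothetical model of a predicate window over a time stamped stream."""
    size = timedelta(seconds=size_s)
    recent: List[Element] = []
    windows = []
    for t, value in stream:
        recent.append((t, value))
        # Keep only elements stamped less than size_s seconds before t.
        recent = [(rt, rv) for rt, rv in recent if t - rt < size]
        # The predicate sees the whole time stamped element, as in OSQL.
        if pred((t, value)):
            windows.append((t, [rv for _, rv in recent]))
    return windows


def always_true(element: Element) -> bool:
    # Mirrors the OSQL always_true(): every element produces a window.
    return True


for t, window in twinagg_model(s1, 0.5, always_true):
    print(t.time(), window)
```

In this model, element five (23:10:18.3) gets the window [1, 2, 3, 4, 5]. Element six gets [2, 3, 4, 5, 6], because 23:10:17.9 is exactly 0.5 seconds earlier and is excluded.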
Windows of conditional successive elements
To illustrate twinagg() further, we define a stream of windows where the contents of each window are successive stream elements satisfying the condition of being larger than the constant six.
This predicate is true if the value of the time stamped number tn exceeds the constant threshold six:
create function above_six(Timeval of Number tn) -> Boolean
as select v > 6
from Number v
where v = value(tn)
We now use our new predicate in a call to twinagg() with :s1 and a time window of 0.5 seconds. The figure below illustrates how twinagg() tests whether the value of tn for time stamped number five is larger than six:

The value v of time stamped number five is 5, which is less than six, so no result window is formed yet. However, look at element eight at time 23:10:18.6:

The value of v is now larger than six, and therefore a new window of the elements from the latest 0.5 seconds is formed as a result window.
We call twinagg() with :s1, window size 0.5, and predicate above_six() to verify that we get the expected result:
twinagg(:s1, 0.5, #'above_six')
The query returns a stream of temporal windows. You see that once the value of :s1 goes above 6, twinagg() starts to collect elements into a new time window.
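A hypothetical Python model shows how a constant predicate such as above_six() only changes which elements emit a window. This is an illustration, not SA Engine code; twinagg_model() is a made-up name. The sketch assumes that each emitted window keeps every element stamped strictly less than 0.5 seconds earlier, whatever its value, as the figure descriptions suggest. It has not been checked against the real twinagg().

```python
from datetime import datetime, timedelta
from typing import Any, Callable, List, Tuple

Element = Tuple[datetime, Any]

# Seconds part of each :s1 time stamp; element n carries the value n.
SECONDS = ["17.9", "18.0", "18.1", "18.2", "18.3",
           "18.4", "18.5", "18.6", "18.8", "18.9"]
s1: List[Element] = [
    (datetime.strptime(f"2022-03-07T23:10:{sec}Z", "%Y-%m-%dT%H:%M:%S.%fZ"), n)
    for n, sec in enumerate(SECONDS, start=1)
]


def twinagg_model(stream: List[Element], size_s: float,
                  pred: Callable[[Element], bool]) -> List[Tuple[datetime, List[Any]]]:
    """Hypothetical model: emit the last size_s seconds when pred holds."""
    size = timedelta(seconds=size_s)
    recent: List[Element] = []
    windows = []
    for t, value in stream:
        recent.append((t, value))
        # Time-based eviction only; values are not filtered here.
        recent = [(rt, rv) for rt, rv in recent if t - rt < size]
        if pred((t, value)):
            windows.append((t, [rv for _, rv in recent]))
    return windows


def above_six(element: Element) -> bool:
    # Python counterpart of the OSQL above_six(): value(tn) > 6.
    _, value = element
    return value > 6


for t, window in twinagg_model(s1, 0.5, above_six):
    print(t.time(), window)
```

Under these assumptions, the first window is emitted at element seven (23:10:18.5) and contains [3, 4, 5, 6, 7]. It also holds earlier values below the threshold, because the model evicts elements by time only.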
A time stamped object contains both the time stamp and the object itself. If we are interested only in the objects without their time stamps, we can embed the call to twinagg() in a select statement that extracts only the values of the time stamped objects:
select Stream of value(tsv)
from Timeval of Vector tsv
where tsv in twinagg(:s1, .5, #'above_six')
Parameterized windows of conditional successive elements
So far we have shown predicate windows where the predicate does a constant test on each incoming element, e.g. testing whether its value is greater than the constant six.
One would like to generalize this to predicates that have parameters. In our example, that would allow any threshold, not just the constant six. The second variant of the predicate window function twinagg() can be used for this.
To enable arbitrary threshold values we can create a predicate that
takes a second vector parameter p:
create function threshold_param(Timeval of Number tn, Vector p) -> Boolean
as select v > p[1]
from Number v
where v = value(tn)
The above predicate takes a time stamped number tn and evaluates to
true if the number is greater than the first element in the parameter
vector p.
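In Python terms, binding the parameter vector resembles partial application. The sketch below is an analogy only, not how SA Engine passes parameters. Here functools.partial fixes p, so that one parameterized function yields many one-argument predicates. Note the indexing difference: the OSQL predicate reads the first element as p[1], while Python uses p[0]. The names above_six and above_four are hypothetical.

```python
from datetime import datetime
from functools import partial
from typing import Any, Sequence, Tuple

Element = Tuple[datetime, Any]


def threshold_param(element: Element, p: Sequence[float]) -> bool:
    """Hypothetical Python counterpart of the OSQL threshold_param().

    OSQL's p[1] is the first vector element; in Python that is p[0].
    """
    _, value = element
    return value > p[0]


t = datetime(2022, 3, 7, 23, 10, 18, 300000)  # 23:10:18.3, element five
above_six = partial(threshold_param, p=[6])   # same test as above_six()
above_four = partial(threshold_param, p=[4])  # any other threshold works too

print(above_six((t, 5)))   # False
print(above_four((t, 5)))  # True
```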
We can use our new predicate in a call to the second variant of
twinagg() on :s1 with a time window of 0.5 seconds and the
parameter [6]. The figure below illustrates how twinagg() behaves
for element five at time 23:10:18.3:

Since the value of tn of element five (i.e. 5) is less than the
first and only element in the parameter vector p (i.e. 6),
twinagg() has not formed a full result window. However, if we look
at element eight at time 23:10:18.6 when the value is larger than 6:

The value of tn is now larger than the first element in p, and
therefore the elements from the last 0.5 seconds are returned as a
window.
Run twinagg() on :s1 with window size 0.5, predicate threshold_param(), and parameter vector [6], and verify that you get the expected result:
twinagg(:s1, 0.5, #'threshold_param', [6])
You see that once the value of an element in :s1 goes above the first element of the parameter vector [6], twinagg() returns a full time stamped window and starts to form a new time window.
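Below is a hypothetical Python model of the parameterized variant; it is not SA Engine code. twinagg_model() passes an extra params argument straight through to the predicate, so the same function covers any threshold. As an assumption, each window keeps every element stamped strictly less than size_s seconds before the current one.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, List, Sequence, Tuple

Element = Tuple[datetime, Any]

# Seconds part of each :s1 time stamp; element n carries the value n.
SECONDS = ["17.9", "18.0", "18.1", "18.2", "18.3",
           "18.4", "18.5", "18.6", "18.8", "18.9"]
s1: List[Element] = [
    (datetime.strptime(f"2022-03-07T23:10:{sec}Z", "%Y-%m-%dT%H:%M:%S.%fZ"), n)
    for n, sec in enumerate(SECONDS, start=1)
]


def twinagg_model(stream: List[Element], size_s: float,
                  pred: Callable[[Element, Sequence[Any]], bool],
                  params: Sequence[Any]) -> List[Tuple[datetime, List[Any]]]:
    """Hypothetical model of the parameterized predicate window variant."""
    size = timedelta(seconds=size_s)
    recent: List[Element] = []
    windows = []
    for t, value in stream:
        recent.append((t, value))
        recent = [(rt, rv) for rt, rv in recent if t - rt < size]
        # The parameter vector is handed to the predicate unchanged.
        if pred((t, value), params):
            windows.append((t, [rv for _, rv in recent]))
    return windows


def threshold_param(element: Element, p: Sequence[float]) -> bool:
    # OSQL p[1] (first element) corresponds to Python p[0].
    _, value = element
    return value > p[0]


for p in ([6], [8]):
    print(p, [w for _, w in twinagg_model(s1, 0.5, threshold_param, p)])
# [6] [[3, 4, 5, 6, 7], [4, 5, 6, 7, 8], [6, 7, 8, 9], [7, 8, 9, 10]]
# [8] [[6, 7, 8, 9], [7, 8, 9, 10]]
```

Raising the threshold to [8] only changes which elements emit a window. The window contents for elements nine and ten stay the same.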
Multi-valued stream generator
Until now we have only worked with single-valued streams. Often, however, stream elements are vectors, such as signals from multiple sources.
To illustrate this, we first define a new simulated data stream generator :s2 that returns a stream of time stamped vectors, each containing three values. The first value is a number from 1 through 10. The second is a number between 100 and 600. The third is a simple 0/1 signal that may be null:
set :s2 = stream ts(|2022-03-08T08:06:39.044Z|,[1, 100, null]),
ts(|2022-03-08T08:06:39.144Z|,[2, 100, 1]),
ts(|2022-03-08T08:06:39.252Z|,[3, 200, 0]),
ts(|2022-03-08T08:06:39.345Z|,[4, 300, 0]),
ts(|2022-03-08T08:06:39.455Z|,[5, 300, 0]),
ts(|2022-03-08T08:06:39.549Z|,[6, 500, 1]),
ts(|2022-03-08T08:06:39.660Z|,[7, 500, 0]),
ts(|2022-03-08T08:06:39.752Z|,[8, 500, 0]),
ts(|2022-03-08T08:06:39.861Z|,[9, 600, 1]),
ts(|2022-03-08T08:06:39.956Z|,[10, 600, 1])
Run :s2 to see the stream output:
:s2
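For comparison with :s1, here is a hypothetical Python stand-in for :s2, with OSQL null represented as None. This is an illustration only. It computes the spacing between successive time stamps and finds the elements whose 0/1 signal is missing. The names RAW_S2, gaps_ms and missing_signal are made up.

```python
from datetime import datetime, timedelta

# Hypothetical Python stand-in for :s2: (seconds part of time stamp, vector).
RAW_S2 = [
    ("39.044", [1, 100, None]),
    ("39.144", [2, 100, 1]),
    ("39.252", [3, 200, 0]),
    ("39.345", [4, 300, 0]),
    ("39.455", [5, 300, 0]),
    ("39.549", [6, 500, 1]),
    ("39.660", [7, 500, 0]),
    ("39.752", [8, 500, 0]),
    ("39.861", [9, 600, 1]),
    ("39.956", [10, 600, 1]),
]
s2 = [
    (datetime.strptime(f"2022-03-08T08:06:{sec}Z", "%Y-%m-%dT%H:%M:%S.%fZ"), vec)
    for sec, vec in RAW_S2
]

# Whole milliseconds between successive time stamps.
gaps_ms = [
    (later[0] - earlier[0]) // timedelta(milliseconds=1)
    for earlier, later in zip(s2, s2[1:])
]
# First values of the elements whose third (0/1) value is null.
missing_signal = [vec[0] for _, vec in s2 if vec[2] is None]

print(gaps_ms)         # [100, 108, 93, 110, 94, 111, 92, 109, 95]
print(missing_signal)  # [1]
```

Unlike :s1, the elements of :s2 are not evenly spaced, and only the first element has a null signal value.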
The figure below illustrates the elements of the time stamped
stream :s2:

Conditions on multi-valued stream elements
Let's create a predicate that thresholds :s2 on the first element of the time stamped vectors in the stream. This will be convenient when we illustrate the output of twinagg() for multi-valued streams. Since :s2 produces a time stamped stream of vectors, the predicate passed to twinagg() needs to take a Timeval of Vector as its first argument.
create function threshold_multi(Timeval of Vector tv, Vector p) -> Boolean
as select v[1] > p[1]
from Vector v
where v = value(tv)
Here we use the same parameter vector p for the threshold as before. Previously, we compared p[1] with the value of the entire time stamped stream element. Now we compare it with the first element of the vector time stamped by tv.
We still use the threshold [6] and time interval of 0.5 seconds for
our window. The figure below illustrates how twinagg() behaves at
time 08:06:39.549:

The first element in v (i.e. 6) is not larger than the first element
in p (also 6). Therefore twinagg() has not formed a full result
window yet. However, if we look at the result at time 08:06:39.660
when v is [7, 500, 0]:

The first element in v (7) is now larger than the first element in p (6). Therefore the vectors from the last 0.5 seconds are returned as a temporal window.
Run twinagg() on :s2 with window size 0.5 and predicate
threshold_multi() and verify that you get the expected result:
twinagg(:s2, .5, #'threshold_multi', [6])
We see that each time the first element in the vector stream is
greater than 6, twinagg() returns a window (i.e. vector) of the
vectors from the last 0.5 seconds.
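The same idea can be modeled in Python for vector elements. This is a hypothetical sketch, not SA Engine code. threshold_multi() compares the first vector element with the first parameter (OSQL v[1] > p[1] becomes Python vec[0] > p[0]). twinagg_model() assumes, as before, that each window keeps every element stamped strictly less than 0.5 seconds earlier.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, List, Sequence, Tuple

Element = Tuple[datetime, Any]

# Hypothetical Python stand-in for :s2; OSQL null becomes None.
RAW_S2 = [
    ("39.044", [1, 100, None]), ("39.144", [2, 100, 1]),
    ("39.252", [3, 200, 0]), ("39.345", [4, 300, 0]),
    ("39.455", [5, 300, 0]), ("39.549", [6, 500, 1]),
    ("39.660", [7, 500, 0]), ("39.752", [8, 500, 0]),
    ("39.861", [9, 600, 1]), ("39.956", [10, 600, 1]),
]
s2: List[Element] = [
    (datetime.strptime(f"2022-03-08T08:06:{sec}Z", "%Y-%m-%dT%H:%M:%S.%fZ"), vec)
    for sec, vec in RAW_S2
]


def twinagg_model(stream: List[Element], size_s: float,
                  pred: Callable[[Element, Sequence[Any]], bool],
                  params: Sequence[Any]) -> List[Tuple[datetime, List[Any]]]:
    """Hypothetical model of the parameterized predicate window variant."""
    size = timedelta(seconds=size_s)
    recent: List[Element] = []
    windows = []
    for t, value in stream:
        recent.append((t, value))
        recent = [(rt, rv) for rt, rv in recent if t - rt < size]
        if pred((t, value), params):
            windows.append((t, [rv for _, rv in recent]))
    return windows


def threshold_multi(element: Element, p: Sequence[float]) -> bool:
    # Compare the first vector element with the first parameter.
    _, vec = element
    return vec[0] > p[0]


for t, window in twinagg_model(s2, 0.5, threshold_multi, [6]):
    # Print only the first value of each vector in the window.
    print(t.time(), [vec[0] for vec in window])
```

In this sketch, the window for element eight starts at [4, 300, 0]. The element [3, 200, 0] is excluded because it is stamped exactly 0.5 seconds earlier (08:06:39.252 versus 08:06:39.752).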
And that concludes this guide on how to use twinagg() to get predicate windows from streams.
