Predicate windows
Guide specification | |
---|---|
Guide type: | Wasm code |
Requirements: | None |
Recommended reading: | None |
Introduction
A predicate window function produces a stream of predicate windows whose contents are determined dynamically by a predicate parameter, i.e. a function returning a Boolean value.
To compute statistics over signal streams you can use two variants of the predicate window function twinagg(), where the parameter pred specifies the predicate:
create function twinagg(Stream of Timeval s, Number size, Function pred)
-> Stream of Timeval of Vector
/* Time stamped stream of windows over stream `s` whose elements are
time stamped vectors where:
`size` is the window size in seconds
`pred` is a predicate returning true if the window is full */
create function twinagg(Stream of Timeval s, Number size, Function pred, Vector args)
-> Stream of Timeval of Vector
/* Time stamped stream of windows over stream `s` whose elements are
time stamped vectors where:
`size` is the window size in seconds
`pred` is a predicate returning true if the window is full
`args` is a vector containing any additional arguments to the predicate */
The first version takes as arguments a time stamped stream s, the time window size in seconds, and a predicate pred. Whenever the predicate is true for an element in s, it produces a time stamped window containing the elements of s from the latest size seconds.
The second version also takes a vector as an extra argument passed to the predicate.
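The window semantics described above can be sketched as a small Python model. This is a hypothetical illustration, not SA Engine's implementation: stream elements are assumed to be (timestamp, value) pairs with integer millisecond time stamps (to avoid floating-point rounding), and a window is assumed to keep the elements whose time stamps are less than size_ms before the current element. Python indexes vectors from 0, so p[0] in the sketch plays the role of p[1] in OSQL.

```python
from collections import deque


def twinagg(stream, size_ms, pred, args=None):
    """Hypothetical model of twinagg(); not the SA Engine implementation.

    stream  -- iterable of (timestamp_ms, value) pairs in time order
    size_ms -- window size in milliseconds (must be positive)
    pred    -- predicate called as pred(element) or pred(element, args)
    args    -- optional vector passed on to the predicate (second variant)
    """
    window = deque()
    for t, value in stream:
        window.append((t, value))
        # Evict elements that are size_ms or more older than the current one.
        while t - window[0][0] >= size_ms:
            window.popleft()
        fired = pred((t, value)) if args is None else pred((t, value), args)
        if fired:
            # Emit a time stamped window holding only the values.
            yield t, [v for _, v in window]


# Four readings 200 ms apart and a 500 ms window.
readings = [(0, 1), (200, 2), (400, 3), (600, 4)]

# First variant: a predicate that is always true emits one window per element.
print(list(twinagg(readings, 500, lambda tv: True)))
# [(0, [1]), (200, [1, 2]), (400, [1, 2, 3]), (600, [2, 3, 4])]

# Second variant: the threshold is passed in the args vector (p[0] > value).
print(list(twinagg(readings, 500, lambda tv, p: tv[1] > p[0], [2])))
# [(400, [1, 2, 3]), (600, [2, 3, 4])]
```

The eviction loop always terminates, because the current element is never older than itself.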
Simulated data stream
For the examples below, we create a function that returns a time stamped stream of numbers that mimics some sensor readings:
create function signal_1() -> Stream of Timeval
as select Stream of tt
from Timeval tt
where tt in [ts(|2022-03-07T23:10:17.9Z|,1),
ts(|2022-03-07T23:10:18.0Z|,2),
ts(|2022-03-07T23:10:18.1Z|,3),
ts(|2022-03-07T23:10:18.2Z|,4),
ts(|2022-03-07T23:10:18.3Z|,5),
ts(|2022-03-07T23:10:18.4Z|,6),
ts(|2022-03-07T23:10:18.5Z|,7),
ts(|2022-03-07T23:10:18.6Z|,8),
ts(|2022-03-07T23:10:18.8Z|,9),
ts(|2022-03-07T23:10:18.9Z|,10)]
The function returns a stream of the time stamped elements 1 through 10, with time stamps 0.1 seconds apart except for a gap at 23:10:18.7. Run the function to see the time stamped stream elements:
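The spacing of the signal_1() time stamps can be checked with a few lines of Python. This sketch assumes the time stamps are written as integer milliseconds after 23:10:00 (so 18300 stands for 23:10:18.3); the list signal_1 is a hypothetical Python mirror of the OSQL function above.

```python
# Hypothetical Python mirror of signal_1(): (milliseconds after 23:10:00, value).
signal_1 = [(17900, 1), (18000, 2), (18100, 3), (18200, 4), (18300, 5),
            (18400, 6), (18500, 7), (18600, 8), (18800, 9), (18900, 10)]

# Time between successive elements, in milliseconds.
gaps = [b[0] - a[0] for a, b in zip(signal_1, signal_1[1:])]
print(gaps)  # [100, 100, 100, 100, 100, 100, 100, 200, 100]

# The single 200 ms step is where 23:10:18.7 is missing.
missing_after = [a[0] for a, b in zip(signal_1, signal_1[1:]) if b[0] - a[0] > 100]
print(missing_after)  # [18600]
```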
signal_1()
The figure below illustrates how the function signal_1()
returns a
stream of elements from 1 to 10 time stamped at specific time points:

Windows of latest elements
The first example shows how to use twinagg() to create a stream of sliding temporal windows, where each window contains the elements from the latest 0.5 seconds up to and including each element of a time stamped stream.
In this case, since we want to always form a new window for every
element in the incoming stream, the predicate pred
should
always be true:
create function always_true(Object o) -> Boolean as 1=1
We can now illustrate how the predicate window function twinagg() works by feeding it the elements of signal_1() and limiting the time window to 0.5 seconds. This means that only the elements from the last 0.5 seconds will be present in each window. We use the predicate always_true() so that every time stamped element in the stream produces a new window.
The figure below illustrates how twinagg()
with the always_true()
predicate behaves at time 23:10:18.3 for element five:

Since the predicate is always true, the elements from the last 0.5 seconds (the window size) are collected in each window.
Let's call twinagg() with signal_1(), window size 0.5, and predicate always_true() to verify that we get the expected result:
twinagg(signal_1(), .5, #'always_true')
You see that the above call to twinagg() returns a stream of time stamped windows, one for each incoming element, where each window contains the incoming stream elements whose time stamps are less than 0.5 seconds before the current one. This is a combination of a temporal and a counting window: the window size is temporal (0.5 seconds), while the stride is a count of one element.
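The combination of a temporal window size and a count stride of one can be reproduced with a hypothetical Python model of twinagg(). The sketch assumes millisecond time stamps counted from 23:10:00 (so 18300 is 23:10:18.3) and keeps the elements less than 500 ms older than the current one; it illustrates the described behavior and is not the engine's code.

```python
from collections import deque


def twinagg(stream, size_ms, pred):
    """Hypothetical model of twinagg() (first variant), not SA Engine code:
    keep elements less than size_ms older than the current one and emit
    (timestamp, values) whenever pred(current element) is true."""
    window = deque()
    for t, value in stream:
        window.append((t, value))
        while t - window[0][0] >= size_ms:
            window.popleft()
        if pred((t, value)):
            yield t, [v for _, v in window]


def always_true(tv):
    return True


# signal_1() as (milliseconds after 23:10:00, value) pairs.
signal_1 = [(17900, 1), (18000, 2), (18100, 3), (18200, 4), (18300, 5),
            (18400, 6), (18500, 7), (18600, 8), (18800, 9), (18900, 10)]

windows = list(twinagg(signal_1, 500, always_true))

# One window per element (count stride 1), but the window length depends
# on how many elements fall within the last 500 ms (temporal size).
print([len(w) for _, w in windows])  # [1, 2, 3, 4, 5, 5, 5, 5, 4, 4]
print(dict(windows)[18300])          # [1, 2, 3, 4, 5]
```

The two shorter windows at the end come from the missing element at 23:10:18.7.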
Windows of conditional successive elements
To illustrate twinagg() further, we define a stream of windows that are formed only for stream elements satisfying the condition of being larger than the constant six.
This predicate is true if the value of the time stamped object tn exceeds the constant threshold six:
create function above_six(Timeval of Number tn) -> Boolean
as select v > 6
from Number v
where v = value(tn)
We now use our new predicate in a call to twinagg() with signal_1() and a time window of 0.5 seconds. The figure below illustrates how twinagg() tests whether the value of tn for time stamped element five is larger than six.

Since the value v of element five is less than six, no result window is formed. However, look at element eight at time 23:10:18.6:

The value v is now larger than six, and therefore a result window containing the elements from the latest 0.5 seconds is formed.
Call twinagg() with signal_1(), window size 0.5, and predicate above_six() to verify that you get the expected result:
twinagg(signal_1(), 0.5, #'above_six')
The query returns a stream of time stamped windows. You see that once the value from signal_1() goes above 6, twinagg() returns a window of the elements from the latest 0.5 seconds for each such element.
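A hypothetical Python model of twinagg() shows which elements of signal_1() trigger a window with the above_six() predicate. Time stamps are assumed to be milliseconds after 23:10:00, and the model keeps elements less than 500 ms older than the current one; it illustrates the described behavior rather than SA Engine's implementation.

```python
from collections import deque


def twinagg(stream, size_ms, pred):
    """Hypothetical model of twinagg() (first variant), not SA Engine code."""
    window = deque()
    for t, value in stream:
        window.append((t, value))
        while t - window[0][0] >= size_ms:
            window.popleft()
        if pred((t, value)):
            yield t, [v for _, v in window]


def above_six(tv):
    _, v = tv  # the value of the time stamped element
    return v > 6


# signal_1() as (milliseconds after 23:10:00, value) pairs.
signal_1 = [(17900, 1), (18000, 2), (18100, 3), (18200, 4), (18300, 5),
            (18400, 6), (18500, 7), (18600, 8), (18800, 9), (18900, 10)]

windows = list(twinagg(signal_1, 500, above_six))
for t, w in windows:
    print(t, w)
# 18500 [3, 4, 5, 6, 7]
# 18600 [4, 5, 6, 7, 8]
# 18800 [6, 7, 8, 9]
# 18900 [7, 8, 9, 10]
```

Note that a window can still contain values of six or less: the predicate only decides when a window is emitted, not which elements it holds.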
Time stamped objects contain both the time stamps and the objects themselves. If we are interested only in the objects, without the time stamps, we can embed the call to twinagg() in a select statement that extracts only the values of the time stamped objects:
select Stream of value(tsv)
from Timeval of Vector tsv
where tsv in twinagg(signal_1(), .5, #'above_six')
Parameterized windows of conditional successive elements
So far we have shown predicate windows where the predicate does a constant test for each incoming element, e.g. testing whether its value is larger than the constant six.
It is often useful to generalize this to predicates that have parameters, e.g. in our example to allow any threshold, not just the constant six. The second variant of the predicate window function twinagg() can be used for this.
To enable arbitrary threshold values, we can create a predicate that takes a second vector parameter p:
create function threshold_param(Timeval of Number tn, Vector p) -> Boolean
as select v > p[1]
from Number v
where v = value(tn)
The above predicate takes a time stamped number tn and evaluates to true if the number is greater than the first element in the parameter vector p.
We can use our new predicate in a call to the second variant of twinagg() on signal_1() with a time window of 0.5 seconds and the parameter [6]. The figure below illustrates how twinagg() behaves for element five at time 23:10:18.3:

Since the value of tn for element five (i.e. 5) is less than the first and only element in the parameter vector p (i.e. 6), twinagg() does not form a result window. However, look at element eight at time 23:10:18.6, when the value is larger than 6:

The value of tn is now larger than the first element in p, and therefore the elements from the last 0.5 seconds are returned as a window.
Run twinagg() on signal_1() with window size 0.5, predicate threshold_param(), and parameter vector [6], and verify that you get the expected result:
twinagg(signal_1(), 0.5, #'threshold_param', [6])
You see that whenever the value from signal_1() goes above the first element of the parameter vector [6], twinagg() returns a time stamped window of the elements from the latest 0.5 seconds.
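With a hypothetical Python model of the second twinagg() variant, the effect of the parameter vector is easy to see: the same predicate yields different trigger points for [6] and [8]. Time stamps are assumed to be milliseconds after 23:10:00, and p[0] in Python corresponds to p[1] in OSQL because Python indexes from 0.

```python
from collections import deque


def twinagg(stream, size_ms, pred, args):
    """Hypothetical model of the second twinagg() variant: pred receives args."""
    window = deque()
    for t, value in stream:
        window.append((t, value))
        while t - window[0][0] >= size_ms:
            window.popleft()
        if pred((t, value), args):
            yield t, [v for _, v in window]


def threshold_param(tv, p):
    _, v = tv
    return v > p[0]  # OSQL: v > p[1]


# signal_1() as (milliseconds after 23:10:00, value) pairs.
signal_1 = [(17900, 1), (18000, 2), (18100, 3), (18200, 4), (18300, 5),
            (18400, 6), (18500, 7), (18600, 8), (18800, 9), (18900, 10)]

# Only the parameter vector changes; the predicate stays the same.
print([t for t, _ in twinagg(signal_1, 500, threshold_param, [6])])
# [18500, 18600, 18800, 18900]
print([t for t, _ in twinagg(signal_1, 500, threshold_param, [8])])
# [18800, 18900]
```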
Multi-valued stream generator
So far we have only worked with single-valued streams. Often, however, stream elements are vectors, such as readings from multiple signal sources.
To illustrate this, we first define a new simulated data stream generator multisignal() that returns a stream of time stamped vectors containing three values. The first value is a number from 1 through 10, the second value is a number between 100 and 600, and the third value is a simple 0/1 signal that may also be null:
create function multisignal() -> Stream of Timeval
as select Stream of tt
from Timeval tt
where tt in [ts(|2022-03-08T08:06:39.044Z|,[1, 100, null]),
ts(|2022-03-08T08:06:39.144Z|,[2, 100, 1]),
ts(|2022-03-08T08:06:39.252Z|,[3, 200, 0]),
ts(|2022-03-08T08:06:39.345Z|,[4, 300, 0]),
ts(|2022-03-08T08:06:39.455Z|,[5, 300, 0]),
ts(|2022-03-08T08:06:39.549Z|,[6, 500, 1]),
ts(|2022-03-08T08:06:39.660Z|,[7, 500, 0]),
ts(|2022-03-08T08:06:39.752Z|,[8, 500, 0]),
ts(|2022-03-08T08:06:39.861Z|,[9, 600, 1]),
ts(|2022-03-08T08:06:39.956Z|,[10, 600, 1])]
Run the function to see the stream output:
multisignal()
The figure below illustrates the elements of the time stamped stream from the function call multisignal():

Conditions on multi-valued stream elements
Let's create a predicate that thresholds multisignal() on the first element of the time stamped vectors in the stream. This will be convenient when we illustrate the output of twinagg() for multi-valued streams. Since multisignal() produces a time stamped stream of vectors, the predicate passed to twinagg() needs to take a Timeval of Vector as its first argument:
create function threshold_multi(Timeval of Vector tv, Vector p) -> Boolean
as select v[1] > p[1]
from Vector v
where v = value(tv)
Here we use the same parameter vector p for the threshold as before, but instead of comparing it with the value of the entire time stamped stream element, as we did previously, we now compare it with the first element of the vector time stamped by tv.
We still use the threshold [6]
and time interval of 0.5 seconds for
our window. The figure below illustrates how twinagg()
behaves at
time 08:06:39.549:

The first element in v (i.e. 6) is not larger than the first element in p (also 6). Therefore twinagg() does not form a result window. However, look at the result at time 08:06:39.660, when v is [7, 500, 0]:

The first element in v (7) is now larger than the first element in p (6). Therefore the vectors from the last 0.5 seconds are returned as a time stamped window.
Run twinagg()
on multisignal()
with window size 0.5 and predicate
threshold_multi()
and verify that you get the expected result:
twinagg(multisignal(), .5, #'threshold_multi', [6])
We see that each time the first element of a vector in the stream is greater than 6, twinagg() returns a window (i.e. a vector) of the vectors from the last 0.5 seconds.
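The multi-valued case can be checked with a hypothetical Python model that mirrors multisignal() with time stamps in milliseconds after 08:06:00 and null written as None. The model keeps elements whose time stamps are less than 500 ms before the current one. Note that 08:06:39.252 is exactly 0.5 seconds before 08:06:39.752, so the model drops it from that window; how SA Engine treats this exact boundary is not stated in this guide.

```python
from collections import deque


def twinagg(stream, size_ms, pred, args):
    """Hypothetical model of the second twinagg() variant: pred receives args."""
    window = deque()
    for t, value in stream:
        window.append((t, value))
        while t - window[0][0] >= size_ms:
            window.popleft()
        if pred((t, value), args):
            yield t, [v for _, v in window]


def threshold_multi(tv, p):
    _, v = tv
    return v[0] > p[0]  # OSQL: v[1] > p[1]


# multisignal() as (milliseconds after 08:06:00, [counter, rpm, light]).
multisignal = [
    (39044, [1, 100, None]), (39144, [2, 100, 1]), (39252, [3, 200, 0]),
    (39345, [4, 300, 0]), (39455, [5, 300, 0]), (39549, [6, 500, 1]),
    (39660, [7, 500, 0]), (39752, [8, 500, 0]), (39861, [9, 600, 1]),
    (39956, [10, 600, 1]),
]

windows = list(twinagg(multisignal, 500, threshold_multi, [6]))
print([t for t, _ in windows])  # [39660, 39752, 39861, 39956]
print(windows[0][1])
# [[3, 200, 0], [4, 300, 0], [5, 300, 0], [6, 500, 1], [7, 500, 0]]
```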
Statistics on multi-valued streams
So far we have only formed (parameterized) predicate window streams. Typically, statistics are computed over the windows, as shown next.
Transpose result to separate signal windows
Recall that each vector in the stream from multisignal() has three values from three different signals. To compute statistics on a single signal, we need to transform each result window so that all values from one signal end up in a single vector. We can do this by transposing the window.
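Let's say that we have three sensors $s_1$, $s_2$, and $s_3$, and that their output is collected as a stream of vectors $[x_1(t), x_2(t), x_3(t)]$ (which is exactly what multisignal() does). If we apply twinagg() with window size $w$ on this stream, each result window holding $n$ readings taken at times $t_1, \ldots, t_n$ is a Vector of Vector of Number, which represents an $n \times 3$ matrix:

$$
W = \begin{bmatrix}
x_1(t_1) & x_2(t_1) & x_3(t_1) \\
\vdots & \vdots & \vdots \\
x_1(t_n) & x_2(t_n) & x_3(t_n)
\end{bmatrix}
$$

If we transpose the matrix we get all values for each sensor in a separate vector:

$$
W^{\mathsf{T}} = \begin{bmatrix}
x_1(t_1) & \cdots & x_1(t_n) \\
x_2(t_1) & \cdots & x_2(t_n) \\
x_3(t_1) & \cdots & x_3(t_n)
\end{bmatrix}
= \begin{bmatrix} \mathbf{v}_1 \\ \mathbf{v}_2 \\ \mathbf{v}_3 \end{bmatrix}
$$

This means we can now do statistics over signal $s_1$, $s_2$, or $s_3$ by operating on their respective vectors $\mathbf{v}_1$, $\mathbf{v}_2$, and $\mathbf{v}_3$.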
Let's say that the second signal in the multisignal()
stream
represents RPM values from an engine:

To be able to do statistics on the RPM from the twinagg()
result we
need to transpose the result so all RPM values are collected in a
single vector.
For example, consider transposing the result from twinagg()
in our
previous example. The figure below illustrates how the transposed
result has one vector per signal, and the RPM values are colored in
burgundy:

So to get the readings from each signal into a separate vector per signal, we can wrap twinagg() in a select statement that transposes the result. Run the following query and check that you get the expected results:
select Stream of ts, vt
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt
where tsv in twinagg(multisignal(), .5, #'threshold_multi', [6])
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)
The function transpose(Vector of Vector of Number m)->Vector of
Vector of Number
transposes a matrix represented as a vector of vector
of numbers.
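For intuition about transpose(), here is a minimal Python equivalent operating on a list of row lists. It is a sketch of the matrix operation, not SA Engine's implementation; the example rows are the multisignal() readings from 08:06:39.252 to 08:06:39.660, i.e. the window formed at 08:06:39.660 in the threshold_multi() example.

```python
def transpose(m):
    """Transpose a rectangular matrix given as a list of row lists."""
    # zip(*m) pairs up the i-th element of every row; zip stops at the
    # shortest row, so all rows are assumed to have the same length.
    return [list(column) for column in zip(*m)]


# One row per time point: [counter, rpm, light].
window = [[3, 200, 0], [4, 300, 0], [5, 300, 0], [6, 500, 1], [7, 500, 0]]

# One row per signal after transposing.
print(transpose(window))
# [[3, 4, 5, 6, 7], [200, 300, 300, 500, 500], [0, 0, 0, 1, 0]]
```

Transposing twice gives back the original rows, which is a quick sanity check for any transpose implementation.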
The query returns a stream of time stamps paired with transposed windows, where each inner vector holds the values of one signal. This is exactly what we need to do statistics over each of the signals.
Trigger statistics on signal
Let's say that the second signal in multisignal() is RPM values from an engine and the third signal is a light indicator that signals that something has gone wrong. Whenever the indicator light switches on, we would like to see some statistics for the engine RPM.

To accomplish this we first need to define a new predicate that is true whenever the indicator light is on (i.e., when the third signal is equal to 1):
create function on_light(Timeval of Vector tv) -> Boolean
/* Returns true when element 3 in the vector is equal to 1 */
as select v[3] = 1
from Vector v
where v = value(tv)
Now we can use this new predicate in a call to twinagg() inside a function statistics_on_light() that both transposes the result (as in the select statement from the previous section) and carries out the statistics calculations we are interested in:
create function statistics_on_light() -> Stream of Vector
as select Stream of [ts, rpm, mean(rpm), stdev(rpm)]
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt,
Vector of Number rpm
where tsv in twinagg(multisignal(), .5, #'on_light')
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)
and rpm = vt[2]
The function above returns a stream of elements [ts, rpm, mean, stdev], where ts is the time stamp of the window, rpm is a vector containing the RPM readings from the last 0.5 seconds, mean is the average reading, and stdev is the standard deviation. Try it by executing the following query:
statistics_on_light()
Specify which signal should trigger output
The on_light()
predicate assumes that the light has index
three in the vector. This might be subject to change, or you might
have other signals that you wish to use as alerts. It would therefore
be better if we could provide the index for the alert signal as
input. To illustrate this we can define a new predicate for
general alerts:
create function on_alert(Timeval of Vector tv, Vector p) -> Boolean
as select v[signal_index] = 1
from Vector v, Number signal_index
where v = value(tv)
and signal_index = p[1]
We can now tweak statistics_on_light()
by passing the index of the
light signal into the predicate:
create function statistics_on_alert(Number signal_index) -> Stream of Vector
as select Stream of [ts, rpm_window, mean(rpm_window), stdev(rpm_window)]
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt,
Vector of Number rpm_window
where tsv in twinagg(multisignal(), .5, #'on_alert', [signal_index])
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)
and rpm_window = vt[2]
Try the new function by calling it with the index set to three:
statistics_on_alert(3)
You should get the same result as you did for statistics_on_light(), but now the index of the alert signal is given as a parameter.
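The benefit of passing the index as a parameter shows up in a hypothetical Python model of on_alert(): index 3 reproduces the light-triggered windows, while index 1 fires only on the element whose first value is 1. The helper alert_rpm_windows() is invented for this sketch; it returns each trigger time with its RPM window and leaves out the statistics, since the one-element window produced by index 1 has no sample standard deviation. Time stamps are milliseconds after 08:06:00.

```python
from collections import deque


def twinagg(stream, size_ms, pred, args):
    """Hypothetical model of the second twinagg() variant: pred receives args."""
    window = deque()
    for t, value in stream:
        window.append((t, value))
        while t - window[0][0] >= size_ms:
            window.popleft()
        if pred((t, value), args):
            yield t, [v for _, v in window]


def on_alert(tv, p):
    _, v = tv
    signal_index = p[0]              # 1-based index, as in the OSQL on_alert()
    return v[signal_index - 1] == 1  # convert to Python's 0-based indexing


# multisignal() as (milliseconds after 08:06:00, [counter, rpm, light]).
multisignal = [
    (39044, [1, 100, None]), (39144, [2, 100, 1]), (39252, [3, 200, 0]),
    (39345, [4, 300, 0]), (39455, [5, 300, 0]), (39549, [6, 500, 1]),
    (39660, [7, 500, 0]), (39752, [8, 500, 0]), (39861, [9, 600, 1]),
    (39956, [10, 600, 1]),
]


def alert_rpm_windows(signal_index):
    """Hypothetical helper: (trigger time, RPM values) for each alert window."""
    for t, window in twinagg(multisignal, 500, on_alert, [signal_index]):
        rpm = [row[1] for row in window]  # second signal (RPM) of each row
        yield t, rpm


print([t for t, _ in alert_rpm_windows(3)])  # [39144, 39549, 39861, 39956]
print(list(alert_rpm_windows(1)))            # [(39044, [100])]
```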
This concludes the guide on how to use twinagg() for conditional statistics over multi-signal time stamped streams.