
Predicate windows

Guide specification
Guide type: Wasm code
Requirements: None
Recommended reading: None

Introduction

A predicate window function produces a stream of predicate windows whose contents are dynamically determined by a predicate parameter, that is, a function returning a Boolean value.

To compute statistics over signal streams you can use the two variants of the predicate window function twinagg(), where the parameter pred specifies the predicate:

create function twinagg(Stream of Timeval s, Number size, Function pred)
-> Stream of Timeval of Vector
/* Time stamped stream of windows over stream `s` whose elements are
time stamped vectors where:
`size` is the window size in seconds
`pred` is a predicate returning true if the window is full */

create function twinagg(Stream of Timeval s, Number size, Function pred, Vector args)
-> Stream of Timeval of Vector
/* Time stamped stream of windows over stream `s` whose elements are
time stamped vectors where:
`size` is the window size in seconds
`pred` is a predicate returning true if the window is full
`args` is a vector containing any additional arguments to the predicate */

The first version takes as argument a time stamped stream s, the time window size in seconds, and a predicate pred. It produces a stream of time stamped windows containing the elements in s during the latest size seconds whenever the predicate is true for an element in s.

The second version also takes a vector args as an extra argument, which is passed to the predicate as its second argument.
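To make the two variants concrete, here is a minimal Python model of the behavior described above. It is a sketch under stated assumptions, not SA Engine's implementation. Time stamps are integer milliseconds, which avoids floating-point rounding. The hypothetical parameter size_ms replaces the window size in seconds. The input is assumed to be ordered by time. An element belongs to a window when it is strictly less than size_ms older than the element that triggered the window.

```python
from typing import Any, Callable, Iterable, Iterator, List, Optional, Sequence, Tuple

# Hypothetical representation of a time stamped element: (milliseconds, value).
Timeval = Tuple[int, Any]


def twinagg(s: Iterable[Timeval], size_ms: int,
            pred: Callable[..., bool],
            args: Optional[Sequence[Any]] = None) -> Iterator[Tuple[int, List[Any]]]:
    """Model of the described semantics: whenever pred holds for an element,
    yield (time stamp of that element, values from the latest size_ms ms).
    When args is given it is passed to pred as a second argument."""
    recent: List[Timeval] = []
    for t, v in s:  # assumes s is ordered by time stamp
        recent.append((t, v))
        # Keep elements strictly less than size_ms older than the current one.
        recent = [(rt, rv) for rt, rv in recent if t - rt < size_ms]
        holds = pred((t, v)) if args is None else pred((t, v), list(args))
        if holds:
            yield t, [rv for _, rv in recent]


# Usage: five elements 100 ms apart and a 250 ms window.
stream = [(0, 1), (100, 2), (200, 3), (300, 4), (400, 5)]

# First variant: a predicate that always holds gives one window per element.
print(list(twinagg(stream, 250, lambda tv: True)))
# → [(0, [1]), (100, [1, 2]), (200, [1, 2, 3]), (300, [2, 3, 4]), (400, [3, 4, 5])]

# Second variant: the threshold arrives through args (p[0] is the first element).
print(list(twinagg(stream, 250, lambda tv, p: tv[1] > p[0], [3])))
# → [(300, [2, 3, 4]), (400, [3, 4, 5])]
```

The window is recomputed for every element, even when the predicate is false. The predicate only decides whether that window is emitted.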

Simulated data stream

For the examples below, we create a function that returns a time stamped stream of numbers that mimics some sensor readings:

create function signal_1() -> Stream of Timeval
as select Stream of tt
from Timeval tt
where tt in [ts(|2022-03-07T23:10:17.9Z|,1),
ts(|2022-03-07T23:10:18.0Z|,2),
ts(|2022-03-07T23:10:18.1Z|,3),
ts(|2022-03-07T23:10:18.2Z|,4),
ts(|2022-03-07T23:10:18.3Z|,5),
ts(|2022-03-07T23:10:18.4Z|,6),
ts(|2022-03-07T23:10:18.5Z|,7),
ts(|2022-03-07T23:10:18.6Z|,8),
ts(|2022-03-07T23:10:18.8Z|,9),
ts(|2022-03-07T23:10:18.9Z|,10)]

The function returns a stream of time stamped elements 1 through 10 with time stamps ~0.1 seconds apart (with a gap at 18.7). Run the function to see the time stamped stream elements:

signal_1()
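One way to check the spacing, including the missing reading at 18.7, is to write the time stamps as millisecond offsets from 23:10:17.9 and take differences. The Python list below is a hypothetical representation used only for illustration. Its values are read off the OSQL definition above.

```python
# signal_1() modelled as (milliseconds after 23:10:17.9, value) pairs.
signal_1 = [(0, 1), (100, 2), (200, 3), (300, 4), (400, 5),
            (500, 6), (600, 7), (700, 8), (900, 9), (1000, 10)]

# Differences between consecutive time stamps, in milliseconds.
gaps = [b[0] - a[0] for a, b in zip(signal_1, signal_1[1:])]
print(gaps)  # → [100, 100, 100, 100, 100, 100, 100, 200, 100]
```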

The figure below illustrates how the function signal_1() returns a stream of elements from 1 to 10 time stamped at specific time points:

Windows of latest elements

The first example shows how to use twinagg() to create a stream of sliding temporal windows, where each window contains the elements from the latest 0.5 seconds up to each element in a time stamped stream.

In this case, since we want to always form a new window for every element in the incoming stream, the predicate pred should always be true:

create function always_true(Object o) -> Boolean as 1=1

We can now illustrate how the predicate window function twinagg() works by feeding it the elements of signal_1() and limiting the time window to 0.5 seconds. This means that only the elements from the last 0.5 seconds will be present in each window. We use the predicate always_true() so that every time stamped element in the stream produces a new window.

The figure below illustrates how twinagg() with the always_true() predicate behaves at time 23:10:18.3 for element five:

Since the predicate is always true, the elements from the last 0.5 seconds (the window size) are collected in each window.

Let's call twinagg() with signal_1(), window size 0.5, and predicate always_true() to verify that we get the expected result:

twinagg(signal_1(), .5, #'always_true')

You see that the above call to twinagg() returns a stream of time stamped windows, one for each incoming element, where each window contains the incoming stream elements having time stamps less than 0.5 seconds before the current one. We notice that this is a combination of a temporal and counting window where the window size is temporal 0.5 seconds, while the stride is a count of one element.
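With an always-true predicate, each window reduces to a filter over the elements that are not newer than the current one and less than 0.5 seconds older. The Python sketch below models that with millisecond offsets. The offsets are a hypothetical representation, and the strict "less than" boundary follows the sentence above.

```python
# signal_1() as (milliseconds after 23:10:17.9, value) pairs.
signal_1 = [(0, 1), (100, 2), (200, 3), (300, 4), (400, 5),
            (500, 6), (600, 7), (700, 8), (900, 9), (1000, 10)]
SIZE_MS = 500  # window size of 0.5 seconds

# always_true() holds for every element, so each element closes a window with
# the values that are strictly less than SIZE_MS older than it.
windows = [(t, [v for u, v in signal_1 if u <= t and t - u < SIZE_MS])
           for t, _ in signal_1]

for t, w in windows:
    print(t, w)
```

In this model the window for element five (offset 400) is [1, 2, 3, 4, 5]. The window for element six (offset 500) no longer contains element one, because an element exactly 0.5 seconds old is dropped. SA Engine's handling of that exact boundary may differ.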

Windows of conditional successive elements

To illustrate twinagg() further we define a stream of windows where the contents of each window are successive stream elements satisfying the condition of being larger than a constant value six.

This predicate is true if the value of the time stamped object tn exceeds the constant threshold six:

create function above_six(Timeval of Number tn) -> Boolean
as select v > 6
from Number v
where v = value(tn)

We now use our new predicate in a call to twinagg() with signal_1(), and a time window of 0.5 seconds. The figure below illustrates that twinagg() tests whether the value of tn for time stamped element five is larger than six.

Since the value v of element five (i.e. 5) is not larger than six, no result window is formed. However, look at element eight at time 23:10:18.6:

The value v is now larger than six, and therefore a new window of the elements from the latest 0.5 seconds is formed as a result window.

Call twinagg() with signal_1(), window size 0.5, and predicate above_six() to verify that you get the expected result:

twinagg(signal_1(), 0.5, #'above_six')

The query returns a stream of time stamped windows. You see that once the value from signal_1() goes above 6, twinagg() starts to collect elements into a new time window.

Time stamped objects contain both the time stamps and the objects themselves. If we are interested only in the objects without the time stamps we can embed the call to twinagg() in a select statement that extracts only the values of time stamped objects.

select Stream of value(tsv)
from Timeval of Vector tsv
where tsv in twinagg(signal_1(), .5, #'above_six')
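The filtering by above_six() and the removal of time stamps can both be modeled in a few lines of Python. This is a sketch under the same assumptions as before: millisecond offsets and a strict 0.5-second boundary. The Python function above_six mirrors the OSQL predicate of the same name. The final step plays the role of value(tsv) in the select statement above.

```python
# signal_1() as (milliseconds after 23:10:17.9, value) pairs.
signal_1 = [(0, 1), (100, 2), (200, 3), (300, 4), (400, 5),
            (500, 6), (600, 7), (700, 8), (900, 9), (1000, 10)]
SIZE_MS = 500  # window size of 0.5 seconds


def above_six(tv):
    """Python counterpart of the OSQL predicate above_six()."""
    return tv[1] > 6


# Only elements that satisfy the predicate produce a (time stamp, window) pair.
windows = [(t, [v for u, v in signal_1 if u <= t and t - u < SIZE_MS])
           for t, x in signal_1 if above_six((t, x))]

# Drop the time stamps and keep only the window contents.
values_only = [w for _, w in windows]
print(values_only)
# → [[3, 4, 5, 6, 7], [4, 5, 6, 7, 8], [6, 7, 8, 9], [7, 8, 9, 10]]
```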

Parameterized windows of conditional successive elements

So far we have shown predicate windows where the predicate does a constant test for each incoming element, e.g. testing whether its value is greater than the constant six.

One would like to generalize this to enable predicates having parameters, e.g. in our example being able to have any kind of threshold, not just the constant six. The second variant of the predicate window function twinagg() can be used for this.

To enable arbitrary threshold values we can create a predicate that takes a second vector parameter p:

create function threshold_param(Timeval of Number tn, Vector p) -> Boolean
as select v > p[1]
from Number v
where v = value(tn)

The above predicate takes a time stamped number tn and evaluates to true if the number is greater than the first element in the parameter vector p.
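The Python sketch below is a hypothetical counterpart of the two OSQL predicates. It checks that threshold_param() with p = [6] behaves like above_six(). OSQL vectors are indexed from 1 (v[3] is the third element), so OSQL p[1] becomes p[0] in Python.

```python
def above_six(tv):
    return tv[1] > 6


def threshold_param(tv, p):
    # OSQL vectors are indexed from 1, so OSQL p[1] is p[0] in Python.
    return tv[1] > p[0]


# The time stamp is irrelevant to these predicates, so 0 is used in its place.
# With p = [6] the parameterized predicate behaves like above_six() on 1..10.
for value in range(1, 11):
    assert threshold_param((0, value), [6]) == above_six((0, value))

# Another threshold requires no new function, only another parameter vector.
print([v for v in range(1, 11) if threshold_param((0, v), [8])])  # → [9, 10]
```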

We can use our new predicate in a call to the second variant of twinagg() on signal_1() with a time window of 0.5 seconds and the parameter [6]. The figure below illustrates how twinagg() behaves for element five at time 23:10:18.3:

Since the value of tn of element five (i.e. 5) is less than the first and only element in the parameter vector p (i.e. 6), twinagg() has not formed a full result window. However, if we look at element eight at time 23:10:18.6 when the value is larger than 6:

The value of tn is now larger than the first element in p, and therefore the elements from the last 0.5 seconds are returned as a window.

Run twinagg() on the signal with window size 0.5 and predicate threshold_param() and verify that you get the expected result:

twinagg(signal_1(), 0.5, #'threshold_param', [6])

You see that once the value from signal_1() goes above the first element in the p vector [6], twinagg() returns a full time stamped window and starts to form a new time window.

Multi-valued stream generator

Until now we have only worked with single-valued streams. Typically, however, you have streams whose elements are vectors, such as signals from multiple sources.

To illustrate this, we first define a new simulated data stream generator multisignal() that returns a stream of time stamped vectors containing three values. The first value is a number from 1 through 10, the second value is a number between 100 and 600, and the third value is a simple 0/1 signal that may be null:

create function multisignal() -> Stream of Timeval
as select Stream of tt
from Timeval tt
where tt in [ts(|2022-03-08T08:06:39.044Z|,[1, 100, null]),
ts(|2022-03-08T08:06:39.144Z|,[2, 100, 1]),
ts(|2022-03-08T08:06:39.252Z|,[3, 200, 0]),
ts(|2022-03-08T08:06:39.345Z|,[4, 300, 0]),
ts(|2022-03-08T08:06:39.455Z|,[5, 300, 0]),
ts(|2022-03-08T08:06:39.549Z|,[6, 500, 1]),
ts(|2022-03-08T08:06:39.660Z|,[7, 500, 0]),
ts(|2022-03-08T08:06:39.752Z|,[8, 500, 0]),
ts(|2022-03-08T08:06:39.861Z|,[9, 600, 1]),
ts(|2022-03-08T08:06:39.956Z|,[10, 600, 1])]

Run the function to see the stream output:

multisignal()

The figure below illustrates the elements of the time stamped stream from the function call multisignal():

Conditions on multi-valued stream elements

Let's create a predicate that thresholds multisignal() on the first element of the time stamped vectors in the stream. This will be convenient when we illustrate the output of twinagg() for multi-valued streams. Since multisignal() produces a time stamped stream of vectors, the predicate passed to twinagg() must take a Timeval of Vector as its first argument.

create function threshold_multi(Timeval of Vector tv, Vector p) -> Boolean
as select v[1] > p[1]
from Vector v
where v = value(tv)

Here we use the same parameter vector p for the threshold as before. Previously we compared the threshold with the value of the time stamped element itself. We now compare it with the first element of the vector time stamped by tv.
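A small Python model shows which multisignal() readings satisfy threshold_multi() with p = [6]. The Python function is a hypothetical counterpart of the OSQL predicate. None stands in for null, and index 0 corresponds to OSQL's index 1.

```python
# Values of multisignal(); None stands for the null reading.
multisignal_values = [[1, 100, None], [2, 100, 1], [3, 200, 0], [4, 300, 0],
                      [5, 300, 0], [6, 500, 1], [7, 500, 0], [8, 500, 0],
                      [9, 600, 1], [10, 600, 1]]


def threshold_multi(tv, p):
    # tv is a (time stamp, vector) pair. OSQL v[1] and p[1] are index 0 here.
    v = tv[1]
    return v[0] > p[0]


# Time stamps do not affect this predicate, so None is used in their place.
triggers = [v for v in multisignal_values if threshold_multi((None, v), [6])]
print(triggers)  # → [[7, 500, 0], [8, 500, 0], [9, 600, 1], [10, 600, 1]]
```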

We still use the threshold [6] and time interval of 0.5 seconds for our window. The figure below illustrates how twinagg() behaves at time 08:06:39.549:

The first element in v (i.e. 6) is not larger than the first element in p (also 6). Therefore twinagg() has not formed a full result window yet. However, if we look at the result at time 08:06:39.660 when v is [7, 500, 0]:

The first element in v (7) is now larger than the first element in p (6). Therefore the vectors from the last 0.5 seconds are returned as a time stamped window.

Run twinagg() on multisignal() with window size 0.5 and predicate threshold_multi() and verify that you get the expected result:

twinagg(multisignal(), .5, #'threshold_multi', [6])

We see that each time the first element in the vector stream is greater than 6, twinagg() returns a window (i.e. vector) of the vectors from the last 0.5 seconds.

Statistics on multi-valued streams

So far we have only formed (parameterized) predicate window streams. Typically, statistics are computed over the windows, as shown next.

Transpose result to separate signal windows

Recall that each vector in the result stream of multisignal() holds three values from three different signals. To do statistics on a single signal, we need to transform the result so that all values from a single signal end up in a single vector. We can do this by transposing the result.

Example

Let's say that we have three sensors $A$, $B$, and $C$ whose output is collected as a stream of vectors $[a, b, c]$ (which is exactly what multisignal() does). If a window formed by twinagg() over this stream contains $N$ elements, the window is a Vector of Vector of Number that represents an $N \times 3$ matrix:

$$M = \begin{bmatrix} a_1 & b_1 & c_1 \\ a_2 & b_2 & c_2 \\ \vdots & \vdots & \vdots \\ a_N & b_N & c_N \end{bmatrix}$$

If we transpose the matrix we get all values for each sensor in a separate vector.

$$T = \mathrm{transpose}(M) = \begin{bmatrix} a_1 & a_2 & \cdots & a_N \\ b_1 & b_2 & \cdots & b_N \\ c_1 & c_2 & \cdots & c_N \end{bmatrix}$$

This means we can now do statistics over signal $A$, $B$, or $C$ by operating on their respective vectors $T(1)$, $T(2)$, and $T(3)$.
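In Python, the same reshaping can be sketched with the built-in zip(*m). The helper below is a hypothetical stand-in for SA Engine's transpose(), shown only to illustrate the idea. It is applied to five consecutive rows from multisignal(), playing the role of the matrix $M$.

```python
from typing import List, Sequence


def transpose(m: Sequence[Sequence[float]]) -> List[List[float]]:
    """Turn N rows [a_i, b_i, c_i] into one row per signal (rows must be equally long)."""
    return [list(column) for column in zip(*m)]


# Five consecutive multisignal() readings, i.e. one window's worth of rows M.
M = [[2, 100, 1], [3, 200, 0], [4, 300, 0], [5, 300, 0], [6, 500, 1]]
T = transpose(M)
print(T)  # → [[2, 3, 4, 5, 6], [100, 200, 300, 300, 500], [1, 0, 0, 0, 1]]
rpm = T[1]  # T(2) in the notation above; Python lists start at index 0
```

Transposing twice gives back the original rows. Note that zip silently truncates to the shortest row, so the rows are assumed to be equally long.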

Let's say that the second signal in the multisignal() stream represents RPM values from an engine:

To be able to do statistics on the RPM from the twinagg() result we need to transpose the result so all RPM values are collected in a single vector.

For example, consider transposing the result from twinagg() in our previous example. The figure below illustrates how the transposed result has one vector per signal, and the RPM values are colored in burgundy:

So to get the readings from each signal into a separate vector, we can wrap twinagg() in a select statement that transposes the result. Run the following query and check that you get the expected results:

select Stream of ts, vt
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt
where tsv in twinagg(multisignal(), .5, #'threshold_multi', [6])
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)

The function transpose(Vector of Vector of Number m)->Vector of Vector of Number transposes a matrix represented as a vector of vector of numbers.

We see that the query returns a stream whose elements combine the time stamp ts of each window with its transposed contents vt, where each inner vector of vt holds the values of one signal. This is exactly what we need to do statistics over each of the signals.

Trigger statistics on signal

Let's say that the second signal in multisignal() is RPM values from an engine and the third signal is a light indicator which signals that something has gone wrong. Whenever the indicator light switches on we would like to see some statistics for the engine RPM.

To accomplish this we first need to define a new predicate that is true whenever the indicator light is on (i.e., when the third signal is equal to 1):

create function on_light(Timeval of Vector tv) -> Boolean
/* Returns true when element 3 in the vector is equal to 1 */
as select v[3] = 1
from Vector v
where v = value(tv)

Now we can pass this predicate to twinagg() inside a function statistics_on_light(). The function both transposes the result (as in the select statement from the previous section) and computes the statistics we are interested in.

create function statistics_on_light() -> Stream of Vector
as select Stream of [ts, rpm, mean(rpm), stdev(rpm)]
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt,
Vector of Number rpm
where tsv in twinagg(multisignal(), .5, #'on_light')
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)
and rpm = vt[2]

The function above returns a stream of elements [ts, rpm, mean, stdev]. Here ts is the time stamp of the window, rpm is a vector containing the RPM readings from the last 0.5 seconds, mean is their average, and stdev is their standard deviation. Try it by executing the following query:

statistics_on_light()
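The pipeline inside statistics_on_light() can be modeled in Python to see which windows the light triggers and what statistics they yield. This is a sketch under assumptions, not SA Engine output. Time stamps are millisecond offsets from 08:06:39.044, and the 0.5-second boundary is strict. The sketch uses Python's statistics.stdev, which is the sample standard deviation. The definition used by SA Engine's stdev() is not stated in this guide, so its numbers may differ.

```python
import statistics

# multisignal() as (milliseconds after 08:06:39.044, [value, rpm, light]);
# None stands for the null reading.
multisignal = [(0, [1, 100, None]), (100, [2, 100, 1]), (208, [3, 200, 0]),
               (301, [4, 300, 0]), (411, [5, 300, 0]), (505, [6, 500, 1]),
               (616, [7, 500, 0]), (708, [8, 500, 0]), (817, [9, 600, 1]),
               (912, [10, 600, 1])]
SIZE_MS = 500  # window size of 0.5 seconds


def on_light(tv):
    # OSQL v[3] is index 2 here; a None reading never equals 1.
    return tv[1][2] == 1


results = []
for t, v in multisignal:
    if not on_light((t, v)):
        continue
    window = [row for u, row in multisignal if u <= t and t - u < SIZE_MS]
    vt = [list(column) for column in zip(*window)]  # transpose(v)
    rpm = vt[1]  # vt[2] in OSQL
    results.append([t, rpm, statistics.mean(rpm), statistics.stdev(rpm)])

for row in results:
    print(row)
```

In this model the light triggers four windows, at offsets 100, 505, 817, and 912 ms, with RPM means 100, 280, 480, and 540.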

Specify which signal should trigger output

The on_light() predicate assumes that the light has index three in the vector. This might be subject to change, or you might have other signals that you wish to use as alerts. It would therefore be better if we could provide the index for the alert signal as input. To illustrate this we can define a new predicate for general alerts:

create function on_alert(Timeval of Vector tv, Vector p) -> Boolean
as select v[signal_index] = 1
from Vector v, Number signal_index
where v = value(tv)
and signal_index = p[1]
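A hypothetical Python counterpart of on_alert() makes the index handling explicit. The OSQL index is 1-based, so the sketch subtracts one before indexing a Python list. It also checks that index 3 reproduces on_light() on every multisignal() reading, with None standing in for null.

```python
# Values of multisignal(); None stands for the null reading.
multisignal_values = [[1, 100, None], [2, 100, 1], [3, 200, 0], [4, 300, 0],
                      [5, 300, 0], [6, 500, 1], [7, 500, 0], [8, 500, 0],
                      [9, 600, 1], [10, 600, 1]]


def on_light(tv):
    return tv[1][2] == 1  # OSQL v[3]


def on_alert(tv, p):
    signal_index = p[0]  # OSQL p[1]: a 1-based position in the vector
    return tv[1][signal_index - 1] == 1


# With index 3, on_alert() agrees with on_light() on every reading.
assert all(on_alert((None, v), [3]) == on_light((None, v)) for v in multisignal_values)

# First value of each reading that would trigger an alert window.
print([v[0] for v in multisignal_values if on_alert((None, v), [3])])  # → [2, 6, 9, 10]
```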

We can now generalize statistics_on_light() into statistics_on_alert(), which passes the index of the alert signal to the predicate through the parameter vector:

create function statistics_on_alert(Number signal_index) -> Stream of Vector
as select Stream of [ts, rpm_window, mean(rpm_window), stdev(rpm_window)]
from Timeval of Vector tsv, Timeval ts,
Vector of Vector of Number v,
Vector of Vector of Number vt,
Vector of Number rpm_window
where tsv in twinagg(multisignal(), .5, #'on_alert', [signal_index])
and v = value(tsv)
and vt = transpose(v)
and ts = timestamp(tsv)
and rpm_window = vt[2]

Try the new function by calling it with the index set to three:

statistics_on_alert(3)

You should get the same result as for statistics_on_light(), but now the index of the alert signal is given as an argument.

And that concludes this guide on how to use twinagg() for conditional statistics on multi-signal time stamped streams.