
Conditional stats on streams

For the examples below, we set the variable :s1 to a stream of [time stamped](/docs/reference/osql-ref/time#time-stamped-objects) numbers that mimics some sensor readings:

set :s1 = stream ts(|2022-03-07T23:10:17.9Z|,1),
ts(|2022-03-07T23:10:18.0Z|,2),
ts(|2022-03-07T23:10:18.1Z|,3),
ts(|2022-03-07T23:10:18.2Z|,4),
ts(|2022-03-07T23:10:18.3Z|,5),
ts(|2022-03-07T23:10:18.4Z|,6),
ts(|2022-03-07T23:10:18.5Z|,7),
ts(|2022-03-07T23:10:18.6Z|,8),
ts(|2022-03-07T23:10:18.8Z|,9),
ts(|2022-03-07T23:10:18.9Z|,10)

The variable :s1 is assigned a stream of the time stamped numbers 1 through 10, spaced about 0.1 seconds apart, with a gap at 23:10:18.7 where no element occurs.
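As a rough mental model, :s1 can be pictured as a list of (timestamp, value) pairs. The Python sketch below is not OSQL and not how SA Engine stores streams; it is a hypothetical representation (the names S1 and gaps are illustrative) that stores each time stamp as integer milliseconds after 23:10:17, an assumption made to avoid floating-point rounding, and reports where consecutive elements are more than 0.1 seconds apart:

```python
# Hypothetical model of :s1 (not SA Engine code): (timestamp_ms, value) pairs,
# where timestamp_ms counts milliseconds after 2022-03-07T23:10:17.000Z.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def gaps(stream, step_ms=100):
    """Return (t_before, t_after) for neighbours more than step_ms apart."""
    return [(t0, t1)
            for (t0, _), (t1, _) in zip(stream, stream[1:])
            if t1 - t0 > step_ms]


print(gaps(S1))  # [(1600, 1800)]: nothing arrives at 23:10:18.7
```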

Run the stream :s1 to see the time stamped stream elements:

:s1

The figure below illustrates how running :s1 returns a stream of numbers from 1 to 10 that are time stamped at specific time points:

Windows of latest elements

The first example shows how to use twinagg() to create a stream of sliding temporal windows, where each window contains the elements time stamped within the latest 0.5 seconds up to each element in a time stamped stream.

In this case, since we want to form a new window for every element in the incoming stream, the predicate passed to twinagg() should always be true:

create function always_true(Object o) -> Boolean as 1=1

We can now illustrate how the predicate window function twinagg() works by feeding it the elements in :s1 and limiting the time window to 0.5 seconds. This means that only the elements from the last 0.5 seconds will be present in each window. We use the predicate always_true() so that every time stamped element in the stream produces a new window.

The figure below illustrates how twinagg() with the always_true() predicate behaves at time 23:10:18.3 for element five:

Since the predicate is always true, the elements from the last 0.5 seconds (the window size) are collected in each window.

Let's call twinagg() with :s1, window size 0.5, and predicate always_true() to verify that we get the expected result:

twinagg(:s1, .5, #'always_true')

The call to twinagg() above returns a stream of time stamped windows, one for each incoming element, where each window contains the incoming stream elements whose time stamps are less than 0.5 seconds before the current one. This is a combination of temporal and counting windows: the window size is temporal (0.5 seconds), while the stride is a count of one element.
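The behaviour described above can be modelled in a few lines of Python. This is a conceptual sketch, not SA Engine's implementation: twinagg_model is a hypothetical name, time stamps are assumed to be integer milliseconds after 23:10:17, and a window is assumed to hold the values whose time stamps are strictly less than the window size before the current element, as the text above states.

```python
# Conceptual model of twinagg(stream, size, pred) as described in this guide.
# Assumptions: timestamps are integer milliseconds, and a window holds the
# values whose timestamps are less than size_ms before the current element.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def twinagg_model(stream, size_ms, pred):
    """Emit (timestamp, window) for every element where pred(element) is true."""
    result = []
    for i, (t, _) in enumerate(stream):
        if pred(stream[i]):
            window = [v for (u, v) in stream[:i + 1] if t - u < size_ms]
            result.append((t, window))
    return result


def always_true(elem):
    return True


windows = twinagg_model(S1, 500, always_true)
print(len(windows))  # 10: one window per incoming element
print(windows[4])    # (1300, [1, 2, 3, 4, 5]): element five at 23:10:18.3
print(windows[8])    # (1800, [6, 7, 8, 9]): the gap at 18.7 shrinks this window
```

Because the predicate is checked once per element, the stride is one element, while the comparison against size_ms keeps the window size temporal.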

Windows of conditional successive elements

To illustrate twinagg() further, we define a stream of windows that are formed only for successive stream elements satisfying the condition of being larger than the constant value six.

This predicate is true if the value of the time stamped number tn exceeds the constant threshold six:

create function above_six(Timeval of Number tn) -> Boolean
as select v > 6
from Number v
where v = value(tn)

We now use our new predicate in a call to twinagg() with :s1, and a time window of 0.5 seconds. The figure below illustrates that twinagg() tests whether the value of tn for time stamped number five is larger than six.

Since the value v of element five (i.e. 5) is less than six, no result window is formed yet. However, look at element eight at time 23:10:18.6:

The value v (i.e. 8) is now larger than six, and therefore a new window of the elements from the latest 0.5 seconds is formed as a result window.
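The two cases above can be checked with a hypothetical Python counterpart of above_six(). It is a sketch under the same assumptions as before: elements are (timestamp_ms, value) pairs with milliseconds counted from 23:10:17, and the window keeps values less than 500 ms older than the current element.

```python
# Hypothetical Python counterpart of above_six(); not SA Engine code.
# Elements are (timestamp_ms, value), timestamp_ms counted from 23:10:17.000.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def above_six(tn):
    _, v = tn  # plays the role of value(tn) in OSQL
    return v > 6


element_five = S1[4]   # (1300, 5) at 23:10:18.3
element_eight = S1[7]  # (1600, 8) at 23:10:18.6

print(above_six(element_five))   # False: no window is formed
print(above_six(element_eight))  # True: a window is formed

# Window for element eight: values less than 500 ms older than 23:10:18.6
t8 = element_eight[0]
window_eight = [v for (u, v) in S1 if u <= t8 and t8 - u < 500]
print(window_eight)  # [4, 5, 6, 7, 8]
```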

We call twinagg() with :s1, window size 0.5, and predicate above_six() to verify that we get the expected result:

twinagg(:s1, 0.5, #'above_six')

The query returns a stream of temporal windows. You see that once the value of an element in :s1 goes above 6, twinagg() starts to collect elements into a new time window.
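Running the same hypothetical model over the whole stream shows which windows one would expect under the stated assumptions (integer-millisecond time stamps, window boundary strictly less than 500 ms). twinagg_model is an illustrative name, not part of SA Engine.

```python
# Conceptual model only; timestamps are ms after 2022-03-07T23:10:17.000Z.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def twinagg_model(stream, size_ms, pred):
    """Emit (timestamp, window) for every element where pred(element) is true."""
    result = []
    for i, (t, _) in enumerate(stream):
        if pred(stream[i]):
            window = [v for (u, v) in stream[:i + 1] if t - u < size_ms]
            result.append((t, window))
    return result


def above_six(tn):
    return tn[1] > 6


for t, window in twinagg_model(S1, 500, above_six):
    print(t, window)
# 1500 [3, 4, 5, 6, 7]
# 1600 [4, 5, 6, 7, 8]
# 1800 [6, 7, 8, 9]
# 1900 [7, 8, 9, 10]
```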

A time stamped object contains both the time stamp and the object itself. If we are interested only in the objects, without their time stamps, we can embed the call to twinagg() in a select statement that extracts only the values of the time stamped objects:

select Stream of value(tsv)
from Timeval of Vector tsv
where tsv in twinagg(:s1, .5, #'above_six')
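In the Python model used in this guide, dropping the time stamps amounts to keeping the second half of each pair. The sketch below uses a generator to mimic the lazy, stream-like nature of the select; values and twinagg_model are hypothetical helpers, and the window boundary convention is an assumption.

```python
# Conceptual model only; timestamps are ms after 2022-03-07T23:10:17.000Z.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def twinagg_model(stream, size_ms, pred):
    result = []
    for i, (t, _) in enumerate(stream):
        if pred(stream[i]):
            window = [v for (u, v) in stream[:i + 1] if t - u < size_ms]
            result.append((t, window))
    return result


def values(timestamped):
    """Lazily yield only the value part of each (timestamp, value) pair,
    similar in spirit to: select Stream of value(tsv) ..."""
    for _, v in timestamped:
        yield v


print(list(values(twinagg_model(S1, 500, lambda tn: tn[1] > 6))))
# [[3, 4, 5, 6, 7], [4, 5, 6, 7, 8], [6, 7, 8, 9], [7, 8, 9, 10]]
```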

Parameterized windows of conditional successive elements

So far we have shown predicate windows where the predicate performs a constant test on each incoming element, e.g. testing whether its value is greater than the constant six.

We would like to generalize this to predicates that take parameters, so that in our example the threshold can be any value, not just the constant six. The second variant of the predicate window function twinagg(), which takes an extra parameter vector, can be used for this.

To enable arbitrary threshold values we can create a predicate that takes a second vector parameter p:

create function threshold_param(Timeval of Number tn, Vector p) -> Boolean
as select v > p[1]
from Number v
where v = value(tn)

The above predicate takes a time stamped number tn and evaluates to true if the number is greater than the first element in the parameter vector p.
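One detail worth noting when translating the predicate: in the OSQL code above, p[1] is the first element of the vector, so OSQL vectors are indexed from 1, while Python lists are indexed from 0. A hypothetical Python counterpart, taking elements as (timestamp_ms, value) pairs, might look like this:

```python
def threshold_param(tn, p):
    """True if the value of tn is greater than the first parameter.
    OSQL's p[1] (first element) corresponds to Python's p[0]."""
    _, v = tn
    return v > p[0]


print(threshold_param((1300, 5), [6]))  # False: element five
print(threshold_param((1600, 8), [6]))  # True: element eight
print(threshold_param((1600, 8), [8]))  # False: 8 is not greater than 8
```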

We can use our new predicate in a call to the second variant of twinagg() on :s1 with a time window of 0.5 seconds and the parameter [6]. The figure below illustrates how twinagg() behaves for element five at time 23:10:18.3:

Since the value of element five (i.e. 5) is less than the first and only element in the parameter vector p (i.e. 6), twinagg() has not formed a full result window. However, look at element eight at time 23:10:18.6, whose value is larger than 6:

The value of tn is now larger than the first element in p, and therefore the elements from the last 0.5 seconds are returned as a window.

Run twinagg() on :s1 with window size 0.5, predicate threshold_param(), and parameter vector [6], and verify that you get the expected result:

twinagg(:s1, 0.5, #'threshold_param', [6])

You see that once the value of an element in :s1 goes above the first element in the p vector [6], twinagg() returns a full time stamped window and starts to form a new time window.
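In the hypothetical Python model, the parameterized variant simply forwards the parameter vector to the predicate on every call, so changing the threshold needs no new predicate. This is a sketch under the same assumptions as before (millisecond time stamps, strict window boundary); twinagg_param_model is an illustrative name.

```python
# Conceptual model only; timestamps are ms after 2022-03-07T23:10:17.000Z.
S1 = [(900, 1), (1000, 2), (1100, 3), (1200, 4), (1300, 5),
      (1400, 6), (1500, 7), (1600, 8), (1800, 9), (1900, 10)]


def threshold_param(tn, p):
    return tn[1] > p[0]  # OSQL p[1] is Python p[0]


def twinagg_param_model(stream, size_ms, pred, params):
    """Like twinagg_model, but calls pred(element, params)."""
    result = []
    for i, elem in enumerate(stream):
        if pred(elem, params):
            t = elem[0]
            window = [v for (u, v) in stream[:i + 1] if t - u < size_ms]
            result.append((t, window))
    return result


print([t for t, _ in twinagg_param_model(S1, 500, threshold_param, [6])])
# [1500, 1600, 1800, 1900]
print([t for t, _ in twinagg_param_model(S1, 500, threshold_param, [8])])
# [1800, 1900]
```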

Multi-valued stream generator

Until now we have only worked with single-valued streams. In practice, stream elements are often vectors, such as signals from multiple sources.

To illustrate this, we first define a new simulated data stream :s2 that returns a stream of time stamped vectors containing three values. The first value is a number from 1 through 10, the second value is a number between 100 and 600, and the third value is a simple 0/1 signal that may be null:

set :s2 = stream ts(|2022-03-08T08:06:39.044Z|,[1, 100, null]),
ts(|2022-03-08T08:06:39.144Z|,[2, 100, 1]),
ts(|2022-03-08T08:06:39.252Z|,[3, 200, 0]),
ts(|2022-03-08T08:06:39.345Z|,[4, 300, 0]),
ts(|2022-03-08T08:06:39.455Z|,[5, 300, 0]),
ts(|2022-03-08T08:06:39.549Z|,[6, 500, 1]),
ts(|2022-03-08T08:06:39.660Z|,[7, 500, 0]),
ts(|2022-03-08T08:06:39.752Z|,[8, 500, 0]),
ts(|2022-03-08T08:06:39.861Z|,[9, 600, 1]),
ts(|2022-03-08T08:06:39.956Z|,[10, 600, 1])

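In the same hypothetical Python model, :s2 becomes a list of (timestamp, vector) pairs. The sketch assumes time stamps in milliseconds after 08:06:39 and represents OSQL null as None; S2 and column are illustrative names, not SA Engine functions.

```python
# Hypothetical model of :s2: (timestamp_ms, [a, b, c]) pairs, timestamp_ms
# counted from 2022-03-08T08:06:39.000Z, OSQL null represented as None.
S2 = [(44, [1, 100, None]), (144, [2, 100, 1]), (252, [3, 200, 0]),
      (345, [4, 300, 0]), (455, [5, 300, 0]), (549, [6, 500, 1]),
      (660, [7, 500, 0]), (752, [8, 500, 0]), (861, [9, 600, 1]),
      (956, [10, 600, 1])]


def column(stream, i):
    """Values at 0-based position i of each vector element."""
    return [vec[i] for _, vec in stream]


print(column(S2, 0))  # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
print(column(S2, 2))  # [None, 1, 0, 0, 0, 1, 0, 0, 1, 1]
```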
Run :s2 to see the stream output:

:s2

The figure below illustrates the elements of the time stamped stream :s2:

Conditions on multi-valued stream elements

Let's create a predicate that thresholds :s2 on the first element of the time stamped vectors in the stream. This will be convenient when we illustrate the output of twinagg() for multi-valued streams. Since :s2 produces a time stamped stream of vectors, the predicate passed to twinagg() needs to take a Timeval of Vector as its first argument.

create function threshold_multi(Timeval of Vector tv, Vector p) -> Boolean
as select v[1] > p[1]
from Vector v
where v = value(tv)

Here we use the same parameter vector p for the threshold as before, but instead of comparing with the value of the entire time stamped stream element tv, as we did previously, we now compare with the first element of the vector time stamped by tv.
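A hypothetical Python counterpart of threshold_multi() makes the change explicit: the comparison now reads the first component of the vector rather than the whole value. As before, OSQL's 1-based v[1] and p[1] become Python's v[0] and p[0], and elements are assumed to be (timestamp_ms, vector) pairs.

```python
def threshold_multi(tv, p):
    """True if the first component of the vector in tv exceeds p's first element."""
    _, v = tv
    return v[0] > p[0]  # OSQL: v[1] > p[1]


print(threshold_multi((549, [6, 500, 1]), [6]))  # False: 6 is not greater than 6
print(threshold_multi((660, [7, 500, 0]), [6]))  # True
```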

We still use the threshold [6] and time interval of 0.5 seconds for our window. The figure below illustrates how twinagg() behaves at time 08:06:39.549:

The first element in v (i.e. 6) is not larger than the first element in p (also 6). Therefore twinagg() has not formed a full result window yet. However, if we look at the result at time 08:06:39.660 when v is [7, 500, 0]:

The first element in v (7) is now larger than the first element in p (6). Therefore the vectors from the last 0.5 seconds are returned as a temporal window.

Run twinagg() on :s2 with window size 0.5 and predicate threshold_multi() and verify that you get the expected result:

twinagg(:s2, .5, #'threshold_multi', [6])

We see that each time the first element in the vector stream is greater than 6, twinagg() returns a window (i.e. vector) of the vectors from the last 0.5 seconds.
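Under the assumptions of the Python model used throughout (millisecond time stamps after 08:06:39, windows keeping elements strictly less than 500 ms older than the current one), the expected windows for :s2 can be sketched as follows. Because the :s2 time stamps are irregular, the boundary convention matters here: at 08:06:39.956 the element at 39.455 is 501 ms old and falls outside the window. All names are hypothetical.

```python
# Conceptual model only; not SA Engine's twinagg() implementation.
S2 = [(44, [1, 100, None]), (144, [2, 100, 1]), (252, [3, 200, 0]),
      (345, [4, 300, 0]), (455, [5, 300, 0]), (549, [6, 500, 1]),
      (660, [7, 500, 0]), (752, [8, 500, 0]), (861, [9, 600, 1]),
      (956, [10, 600, 1])]


def threshold_multi(tv, p):
    return tv[1][0] > p[0]  # OSQL: v[1] > p[1]


def twinagg_param_model(stream, size_ms, pred, params):
    result = []
    for i, elem in enumerate(stream):
        if pred(elem, params):
            t = elem[0]
            window = [v for (u, v) in stream[:i + 1] if t - u < size_ms]
            result.append((t, window))
    return result


for t, window in twinagg_param_model(S2, 500, threshold_multi, [6]):
    print(t, [vec[0] for vec in window])
# 660 [3, 4, 5, 6, 7]
# 752 [4, 5, 6, 7, 8]
# 861 [5, 6, 7, 8, 9]
# 956 [6, 7, 8, 9, 10]
```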

And that concludes this guide on how to use twinagg() to get predicate windows from streams.