Aggregating data streams

Introduction

In the previous chapter, Basic data reduction, we covered data reduction through sampling. Another way to reduce data is aggregation. This chapter covers how to aggregate data streams to compute statistics.

Different stream formats

Streaming data can come in different formats.

Single-valued data streams

The simplest data stream is just a stream of values.

first_n(simstream(0.01), 10);

This type of data stream can be used for a variety of purposes, such as development, testing, receiving data from a sensor, or sending data between threads in a model. The single value is not limited to a number; it can be a vector, array, string, etc., depending on the source of the stream.

Multi-valued data streams

Then we have multi-valued streams, where each stream element is a vector containing values from different sources.

first_n(pivot([simstream(0.02),heartbeat(0.02)]),10);

The output format is the same as for a simple data stream of vectors, but the important difference is that for multi-valued streams the elements come from different sources.

These streams are useful when you are interested in the current state of data from different sources, for example when monitoring sensor values. Multi-valued streams are not suitable for aggregation, since each new value from one source stream causes the most recent values from all other streams to be emitted again. For aggregation, bus streams are recommended instead.

Bus streams

Bus streams are common when working with message busses where multiple sensors or signals communicate their values using messages on the same bus (e.g., CAN bus).

The data format for a bus stream is as follows:

[<time>, <topic>, <payload>]

where <time> is a timestamp, <topic> is the name of the signal, sensor, or message, and <payload> is the data in the message (e.g., a value or a vector).
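As an illustration of this three-part layout, here is a minimal sketch in plain Python (not OSQL). The `BusMessage` type and the sample values are hypothetical and only mirror the `[<time>, <topic>, <payload>]` shape described above; the time is assumed to be seconds since the stream started.

```python
from typing import Any, NamedTuple


class BusMessage(NamedTuple):
    """Hypothetical stand-in for one bus message: [<time>, <topic>, <payload>]."""
    time: float   # timestamp (assumed here: seconds since the stream started)
    topic: str    # name of the signal, sensor, or message
    payload: Any  # the data in the message, e.g. a value or a vector


# Made-up messages from two topics sharing the same bus.
messages = [
    BusMessage(0.0, "s", 0.0),
    BusMessage(0.0, "t", 0.0),
    BusMessage(0.1, "s", 0.1),
]

# Because every message carries its topic, one topic can be filtered out
# without touching the values of the other topics.
s_payloads = [m.payload for m in messages if m.topic == "s"]
```

The point of the sketch is that the topic travels with each value, which is what makes per-topic aggregation possible later on.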

Here is an example of a bus stream with two topics, "s" and "t", sharing the same bus.

first_n(
(select merge([s,t])
from stream s, stream t
where s = (select stream of [timeval(hb), "s", hb] // stream "s"
from real hb
where hb in heartbeat(0.1))
and t = (select stream of [timeval(hb), "t", hb] // stream "t"
from real hb
where hb in heartbeat(0.2))), 10);

Timestamped streams

Timestamped streams are not really a separate data stream format, but the concept is so fundamental that it has to be mentioned. A timestamped stream is any type of data stream whose elements are wrapped in Timeval objects, each of which includes a timestamp.

first_n(ts(simstream(0.01)), 10);

Timestamped streams are vital when working with time series data. Wrapping stream elements in Timeval objects makes it possible to create windows over time, which has a wide range of uses, such as computing statistics over time series data and logging historical data.

Aggregating single-value streams

Aggregating data from single-value streams is very simple. For example, we can compute the moving average over a stream by aggregating the values into windows with winagg() and computing the mean of the elements in each window.

The following query computes the moving average of simstream(). It forms a sliding window of 20 elements that advances by 10 elements at a time, and computes the average of the elements in each window:

//plot: Line plot
first_n(avg(winagg(simstream(0.01), 20, 10)), 10);

Other aggregation functions, such as stdev() or sum(), work the same way.

//plot: Line plot
first_n(
(select avg(v), stdev(v), sum(v)
from Vector of number v
where v in winagg(simstream(0.01),20,10)),10);
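To make the window size and stride concrete, here is a minimal sketch in plain Python (not OSQL) of count-based sliding windows. The helper `sliding_windows` is hypothetical and only approximates the behavior described above for winagg; a small size and stride are used so the result is easy to check by hand.

```python
from statistics import mean


def sliding_windows(values, size, stride):
    """Yield lists of `size` consecutive values, advancing `stride` values each time.

    Assumption of this sketch: 0 < stride <= size, so windows never skip values.
    """
    if not 0 < stride <= size:
        raise ValueError("this sketch assumes 0 < stride <= size")
    window = []
    for v in values:
        window.append(v)
        if len(window) == size:
            yield list(window)
            # Drop the oldest `stride` values; the rest overlap with the next window.
            del window[:stride]


# Windows of 4 elements that advance by 2 elements over the values 0..9:
# [0,1,2,3], [2,3,4,5], [4,5,6,7], [6,7,8,9]
moving_avg = [mean(w) for w in sliding_windows(range(10), 4, 2)]
```

With a size of 20 and a stride of 10, as in the queries above, each value would end up in two consecutive windows in the same way.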

You can also use the function reduce as described in the chapter Streamed data reduction in the OSQL tutorial.

Aggregating multi-valued streams

As mentioned in Multi-valued data streams, these streams are not suitable for aggregation since each new value from one stream outputs the most recent values from all other streams.

For example, if we combine the following two streams with pivot where heartbeat emits one value every 0.02 seconds (0, 0.02, 0.04, 0.06, and 0.08)

first_n(pivot([simstream(0.02),heartbeat(0.02)]),10);

we see that whenever we get a new value from simstream we also get a copy of the most recent value from heartbeat, and vice versa. If we form windows over this stream, the duplicated values will skew the aggregation. For aggregation purposes it is therefore recommended to combine the source streams into a bus stream instead.
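The effect of the repeated values can be illustrated with a small simulation in plain Python (not OSQL). The `pivot` function below is a hypothetical re-implementation of the latest-value behavior described above, the event values are made up, and emitting `None` for a source that has not produced a value yet is an assumption of this sketch.

```python
from statistics import mean

# Hypothetical events as (time_ms, source_index, value). Source 0 emits twice,
# source 1 emits four times.
events = [
    (0, 0, 1), (0, 1, 10), (100, 1, 20), (200, 1, 30), (300, 0, 2), (300, 1, 40),
]


def pivot(events, n_sources):
    """Emit a copy of the latest value from every source on each new event.

    Assumption: a source that has not produced a value yet shows up as None.
    """
    latest = [None] * n_sources
    for _time, source, value in sorted(events):
        latest[source] = value
        yield list(latest)


rows = list(pivot(events, 2))

# Source 0 only produced the values 1 and 2, so its true mean is 1.5 ...
true_mean = mean(v for _t, src, v in events if src == 0)
# ... but in the pivoted rows the value 1 is repeated while source 1 updates,
# which pulls the mean of that column down.
pivot_mean = mean(row[0] for row in rows)
```

In this sketch the column for source 0 contains 1, 1, 1, 1, 2, 2, so its mean differs from the mean of the two values the source actually emitted.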

The following example shows how the two streams are combined into a bus stream, where "s" is the simstream and "t" is the heartbeat.

first_n(
(select merge([s,t])
from stream s, stream t
where s = (select stream of [now(), "s", ss]
from real ss
where ss in simstream(0.2))
and t = (select stream of [now(), "t", hb]
from real hb
where hb in heartbeat(0.2))), 10);

When executing the above query we see that we get a bus stream with only one instance of each emitted value and no duplicates.
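The same idea can be sketched in plain Python (not OSQL): each source tags its values with a time and a topic, and the tagged streams are merged in time order. The `tagged` generator is a hypothetical stand-in for a periodic source, and times are integer milliseconds to keep the example deterministic.

```python
import heapq


def tagged(topic, period_ms, count):
    """Hypothetical periodic source: yields (time_ms, topic, elapsed_seconds)."""
    for i in range(count):
        yield (i * period_ms, topic, i * period_ms / 1000)


# Merge two time-ordered sources into one bus, ordered by timestamp.
bus = list(
    heapq.merge(
        tagged("s", 100, 5),  # topic "s": one value every 100 ms
        tagged("t", 200, 3),  # topic "t": one value every 200 ms
        key=lambda message: message[0],
    )
)
```

Each emitted value appears exactly once on the merged bus, which is the property that makes the bus format suitable for aggregation.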

How to do aggregation of bus streams is covered in the next section.

Aggregating bus streams

To aggregate bus streams we create windows over the streams and then aggregate the data in each window, similar to what we did in the section Aggregating single-value streams.

When working with time series data it is often of interest to aggregate data over specific time periods, for example to compute the rolling 2-second average of a sensor stream, or the standard deviation of a sensor over the last 10 seconds.

This can be achieved by putting together the following pipeline.

[bus stream] -> [timestamp stream] -> [time windows] -> [aggregate values by topic]

We will explain each part of the pipeline in detail below.

Step 0 - create a synthetic bus stream

First we define a synthetic bus stream to use in the examples. The following bus stream combines one stream "s", which outputs a value from heartbeat every 0.1 seconds, and one stream "t", which outputs a value from heartbeat every 0.2 seconds.

create function bus_test_stream() -> stream of vector
/* Synthetic bus stream with topics "s" and "t", used as test data */
as select merge([s,t])
from stream s, stream t
where s = (select stream of [now(), "s", hb] // stream "s"
from real hb
where hb in heartbeat(0.1))
and t = (select stream of [now(), "t", hb] // stream "t"
from real hb
where hb in heartbeat(0.2));

We can inspect the first values in the stream.

first_n(bus_test_stream(),10);

We see that "s" emits a value every 0.1 seconds and "t" emits a value every 0.2 seconds.

Step 1 - wrap bus stream in timeval

To be able to create time-based windows we need each value in the bus stream to be wrapped in a timeval. The first element in each bus stream output vector has time information that is used as input for the timeval.

create function timeval_stream(stream of vector bus_stream)
-> Stream of timeval of vector
/* Wrap the bus stream in timevals */
as select stream of ts(t, v)
from vector v, real t
where v in bus_stream
and t = v[1];

If we run the bus stream through the function we see that each message on the bus gets wrapped in a timeval.

first_n(timeval_stream(bus_test_stream()),10);
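As a rough analogy in plain Python (not OSQL), wrapping a message means pairing it with the timestamp taken from its first element. The `Timeval` tuple below is a hypothetical stand-in for the real Timeval object, and the sample messages are made up.

```python
from typing import Any, NamedTuple


class Timeval(NamedTuple):
    """Hypothetical stand-in for a timestamped value: a timestamp plus a payload."""
    time: float
    value: Any


def timeval_stream(bus_stream):
    """Wrap each bus message, using its first element as the timestamp.

    Python sequences are 0-indexed, so message[0] plays the role of v[1]
    in the OSQL function above.
    """
    for message in bus_stream:
        yield Timeval(message[0], message)


wrapped = list(timeval_stream([(0.0, "s", 0.0), (0.1, "s", 0.1)]))
```

The original message is kept unchanged inside the wrapper, so the timestamp is available both on the wrapper and inside the payload.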

Step 2 - create windows

To do the aggregation we need the data in windowed format. This is achieved with the twinagg function, which creates time-based windows over data streams.

The following function creates a tumbling window of size t seconds every t seconds.

create function windowed_bus_stream(stream of vector bus_stream, real t)
-> stream of timeval of vector
/* Create time windows of a bus stream*/
as twinagg(timeval_stream(bus_stream), t, t);

If we run it on our synthetic bus stream we can see each window in the output. The following call creates 1-second windows of our synthetic bus stream.

first_n(windowed_bus_stream(bus_test_stream(), 1.0),3);
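The following plain-Python sketch (not OSQL) illustrates what tumbling time windows do to a time-ordered stream of messages. The helper `tumbling_windows` is hypothetical, and several details are assumptions of the sketch rather than documented twinagg behavior: the first window starts at the first message, each window is stamped with its end time, gaps produce empty windows, and a trailing partial window is not emitted.

```python
def tumbling_windows(messages, width_ms):
    """Group time-ordered (time_ms, topic, value) messages into back-to-back windows.

    Yields (window_end_ms, [messages]) each time a message arrives at or after
    the end of the current window.
    """
    window, end = [], None
    for message in messages:
        t = message[0]
        if end is None:
            end = t + width_ms  # assumption: the first window starts at the first message
        while t >= end:
            yield (end, window)
            window, end = [], end + width_ms
        window.append(message)
    # Assumption: the last, still open window is dropped.


# Made-up messages, one every 500 ms, grouped into 1-second windows.
sample = [(0, "s", 0.0), (500, "s", 0.5), (1000, "s", 1.0), (1500, "s", 1.5), (2000, "s", 2.0)]
windows = list(tumbling_windows(sample, 1000))
```

Because the window width and the step are the same, every message ends up in exactly one window, which is what distinguishes a tumbling window from a sliding one.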

Step 3 - aggregate values by topic

The next step is to aggregate the values in each window by topic.

If we look at a single window in the windowed bus stream, the structure is the following:

ts(<timestamp>, [[<timestamp>, <topic>, <value>],
                 [<timestamp>, <topic>, <value>],
                 [<timestamp>, <topic>, <value>],
                 [<timestamp>, <topic>, <value>],
                 [<timestamp>, <topic>, <value>],
                 [<timestamp>, <topic>, <value>]])

To aggregate the values by topic we need to extract the vectors and drop the time information so we only have the topics and values left.

To illustrate this we can take the first timeval from Step 2, extract the vectors using the value function, and drop the timestamp by only returning topic and value.

set :window = ts(|2025-12-12T09:58:15.565Z|, [[|2025-12-12T09:58:14.565Z|,'s',0],
[|2025-12-12T09:58:14.565Z|,'t',0],
[|2025-12-12T09:58:14.667Z|,'s',0.1],
[|2025-12-12T09:58:14.772Z|,'t',0.2],
[|2025-12-12T09:58:14.772Z|,'s',0.2],
[|2025-12-12T09:58:14.878Z|,'s',0.3],
[|2025-12-12T09:58:14.970Z|,'t',0.4],
[|2025-12-12T09:58:14.970Z|,'s',0.4],
[|2025-12-12T09:58:15.078Z|,'s',0.5],
[|2025-12-12T09:58:15.170Z|,'s',0.6],
[|2025-12-12T09:58:15.170Z|,'t',0.6],
[|2025-12-12T09:58:15.278Z|,'s',0.7],
[|2025-12-12T09:58:15.369Z|,'t',0.8],
[|2025-12-12T09:58:15.369Z|,'s',0.8],
[|2025-12-12T09:58:15.474Z|,'s',0.9]]);

select [topic, value]
from Real time, Charstring topic,
Real value, Vector v
where v in value(:window)
and [time, topic, value] = v;

Now we have all values from each topic. What remains is to aggregate them, which we can do with the group by construct in our query. group by requires an aggregation function, so that we get one result per topic.

For example, we can use the avg function to compute the average over each topic in the window.

create function compute_mean(timeval of vector of vector window) -> vector
as select vector of topic, avg(value)
from Real time, Charstring topic,
Real value, Vector v
where v in value(window)
and [time, topic, value] = v
group by topic;

If we run this function with our window as input we get the average value for each topic over the window.

compute_mean(:window);
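For readers more familiar with general-purpose languages, the grouping step can be sketched in plain Python (not OSQL). The function below reuses the name `compute_mean` for illustration only and returns a dictionary rather than vectors; the topic and value pairs are copied from the sample window above with the timestamps dropped.

```python
from collections import defaultdict
from statistics import mean

# (topic, value) pairs from the sample window, timestamps dropped.
window_values = [
    ("s", 0), ("t", 0), ("s", 0.1), ("t", 0.2), ("s", 0.2),
    ("s", 0.3), ("t", 0.4), ("s", 0.4), ("s", 0.5), ("s", 0.6),
    ("t", 0.6), ("s", 0.7), ("t", 0.8), ("s", 0.8), ("s", 0.9),
]


def compute_mean(pairs):
    """Group values by topic and return {topic: mean of its values}."""
    by_topic = defaultdict(list)
    for topic, value in pairs:
        by_topic[topic].append(value)
    return {topic: mean(values) for topic, values in by_topic.items()}


means = compute_mean(window_values)
```

Topic "s" contributes the ten values 0 to 0.9 and topic "t" the five values 0 to 0.8, so the means come out at approximately 0.45 and 0.4 respectively.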

We have now aggregated the values for each topic over a single window. All that remains is to put the pieces together to get aggregation over a whole bus stream.

Step 4 - putting it all together

Earlier we made a function windowed_bus_stream that creates time-based windows of a bus stream, and in the previous step we made a function compute_mean that computes the mean for each topic over a single window.

Now we create a function window_stats that takes a single window from our windowed_bus_stream and calls compute_mean on that window.

create function window_stats(timeval of vector of vector window) -> Timeval
/* Compute mean for each topic and return result as timeval with timestamp
from window */
as select ts(window, v)
from vector v
where v = compute_mean(window);

Now we can create a function topic_means that takes a bus stream, forms windows over the bus stream with windowed_bus_stream, and computes the average for each topic by calling window_stats on the window.

create function topic_means(stream of vector bus_stream, real t) -> Stream of Timeval
/* Compute a stream of means for every topic in the bus stream */
as select stream of window_stats(w)
from timeval of vector of vector w
where w in windowed_bus_stream(bus_stream, t);

We can now test topic_means on our synthetic bus stream to create 1-second averages over our topics "s" and "t".

first_n(topic_means(bus_test_stream(),1.0), 5);

This completes the whole pipeline and shows how you can aggregate values over bus streams by using time-based windows and group by.
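The whole pipeline can also be sketched end to end in plain Python (not OSQL) to show how the stages fit together. The function names mirror the OSQL functions for readability but are hypothetical re-implementations. Times are integer milliseconds, windows are assumed to start at the first message and to be stamped with their end time, and the last open window is not emitted.

```python
from collections import defaultdict
from statistics import mean


def bus_test_stream(duration_ms):
    """Synthetic time-ordered bus: topic "s" every 100 ms, topic "t" every 200 ms."""
    for t in range(0, duration_ms, 100):
        yield (t, "s", t / 1000)
        if t % 200 == 0:
            yield (t, "t", t / 1000)


def windowed_bus_stream(messages, width_ms):
    """Yield (window_end_ms, [messages]) for back-to-back time windows."""
    window, end = [], None
    for message in messages:
        t = message[0]
        if end is None:
            end = t + width_ms  # assumption: first window starts at the first message
        while t >= end:
            yield (end, window)
            window, end = [], end + width_ms
        window.append(message)


def topic_means(messages, width_ms):
    """Yield (window_end_ms, {topic: mean}) for every completed window."""
    for end, window in windowed_bus_stream(messages, width_ms):
        by_topic = defaultdict(list)
        for _time, topic, value in window:
            by_topic[topic].append(value)
        yield (end, {topic: mean(values) for topic, values in by_topic.items()})


# Three seconds of synthetic data give two completed 1-second windows;
# the third window is still open when the data ends.
results = list(topic_means(bus_test_stream(3000), 1000))
```

Each stage consumes the previous one lazily as a generator, which loosely mirrors how the OSQL functions are composed over streams.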