# Basic data reduction

| Guide specification | |
|---|---|
| Guide type: | Wasm code |
| Requirements: | None |
| Recommended reading: | None |

## Introduction

In this guide we look at different ways of reducing data: by sampling and by computing statistics over aggregated data. We illustrate the techniques on streams from the built-in stream generator function `simstream()`, but they apply just as well to data streams from real sensors.

## Input data

The function `simstream()` takes a pace as input and emits a new value every pace seconds. We can look at the raw output from the first second of `simstream(0.1)`:

```
timeout(simstream(0.1), 1.0);
```

## Count-based sampling

The easiest way to reduce the data rate is to sample the output of `simstream()`. One way of sampling is to use the function `winagg()`, which takes a `size` and a `stride` input. Every `stride` readings, `winagg()` emits a window of the last `size` values. Choosing a `size` less than `stride` produces a down-sampled stream:

```
winagg(simstream(0.1), 1, 10);
```

The preceding example samples 1 value out of every 10 values from `simstream()`. The result from `winagg()` is a vector of the last `size` values.
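To make the `size`/`stride` semantics concrete, here is a minimal Python model of count-based windowing. It is a sketch, not sa.engine's implementation: the name `winagg_model` is hypothetical, and we assume that a window is emitted after every `stride`-th reading once at least `size` readings have arrived. The start-up behaviour of the real `winagg()` may differ.

```python
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def winagg_model(stream: Iterable[T], size: int, stride: int) -> Iterator[List[T]]:
    """Hypothetical model of count-based windowing (not the OSQL winagg()).

    After every `stride`-th reading, yield a list holding the last `size` readings.
    Assumption: nothing is yielded until at least `size` readings have arrived.
    """
    buffer: deque = deque(maxlen=size)  # keeps only the newest `size` readings
    for count, reading in enumerate(stream, start=1):
        buffer.append(reading)
        if count % stride == 0 and count >= size:
            yield list(buffer)


# Sample 1 value out of every 10, like winagg(simstream(0.1), 1, 10)
print(list(winagg_model(range(1, 31), 1, 10)))  # [[10], [20], [30]]
# A larger size keeps more recent history in each window
print(list(winagg_model(range(1, 31), 3, 10)))  # [[8, 9, 10], [18, 19, 20], [28, 29, 30]]
```

Using a bounded `deque` means the model never stores more than `size` readings, which is the point of data reduction on a stream.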

Set a larger value of `size` and see how it affects the output!

We can use vector dereference to take the first value in each result vector (vectors in OSQL are 1-indexed):

```
select v[1]
from vector v
where v in winagg(simstream(.1), 1, 10);
```

This is an easy way of reducing the data stream through downsampling.
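With `size` 1, taking `v[1]` of each window keeps every `stride`-th reading. Assuming the first window is emitted once `stride` readings have arrived, the result matches a strided slice in Python. Note the off-by-one: OSQL's `v[1]` corresponds to Python's `v[0]`, because Python lists are 0-indexed. The helper `downsample` is hypothetical.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def downsample(readings: Sequence[T], stride: int) -> List[T]:
    """Keep every `stride`-th reading (the 10th, 20th, ... for stride=10).

    Hypothetical helper mirroring
        select v[1] from vector v where v in winagg(s, 1, stride)
    assuming the first window is emitted after `stride` readings.
    """
    # Python is 0-indexed: the stride-th reading sits at index stride - 1.
    return list(readings[stride - 1 :: stride])


readings = list(range(1, 31))
print(downsample(readings, 10))  # [10, 20, 30]
```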

## Time-based sampling

In certain applications, it is meaningful to form time-based windows. The OSQL function `twinagg()` is similar to `winagg()`, but its input parameters `size` and `stride` are specified in seconds. Every `stride` seconds it emits a window of the readings seen during the last `size` seconds. Note that `twinagg()` requires its input to be a stream of `Timeval`s, i.e., a stream of time-stamped values.

We can timestamp any stream using `ts()`:

```
timeout(ts(simstream(0.1)), 1.0);
```
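Conceptually, `ts()` pairs each value with the time at which it was observed. The following minimal Python sketch uses a stand-in named tuple `Timeval` and a hypothetical function `ts_model`. The clock is a parameter so that the example is deterministic; we assume the real `ts()` reads the system clock.

```python
import time
from typing import Callable, Iterable, Iterator, NamedTuple


class Timeval(NamedTuple):
    """Stand-in for an OSQL Timeval: a timestamp and a value."""
    timestamp: float
    value: object


def ts_model(stream: Iterable[object],
             clock: Callable[[], float] = time.time) -> Iterator[Timeval]:
    """Hypothetical model of ts(): stamp each value with the current clock time."""
    for value in stream:
        yield Timeval(clock(), value)


# A fake clock that advances 0.1 s per call, mimicking simstream(0.1)
ticks = iter([0.1, 0.2, 0.3])
print(list(ts_model(["x", "y", "z"], clock=lambda: next(ticks))))
```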

Now that we have a time-stamped stream, we can use `twinagg()` on it:

```
twinagg(ts(simstream(0.1)), 1.0, 1.0);
```

In the preceding example, each window contains the values emitted by `simstream()` during the last second (`size`=1.0), and the stride is also one second (`stride`=1.0), so all values emitted by `simstream()` are present in the output.
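A rough Python model of time-based windowing shows why no values are lost when `size` equals `stride`. This is a sketch under assumptions, not sa.engine's algorithm: `twinagg_model` is hypothetical, it works on a finite, time-ordered list of `(timestamp, value)` pairs, window end times are `stride`, `2*stride`, ... seconds, and each window holds the values whose timestamps fall in the half-open interval (end - `size`, end]. With `size` equal to `stride`, these intervals tile the timeline, so every value lands in exactly one window.

```python
from typing import List, NamedTuple, Sequence, Tuple


class Timeval(NamedTuple):
    timestamp: float
    value: object


def twinagg_model(readings: Sequence[Tuple[float, object]],
                  size: float, stride: float) -> List[Timeval]:
    """Hypothetical model of twinagg() over a finite, time-ordered list.

    At t = stride, 2*stride, ..., emit a Timeval whose value is the list of
    readings with timestamps in (t - size, t]. A trailing partial window
    (after the last full stride) is dropped.
    """
    if not readings:
        return []
    last_time = readings[-1][0]
    windows: List[Timeval] = []
    k = 1
    while k * stride <= last_time:
        end = k * stride
        window = [v for (t, v) in readings if end - size < t <= end]
        windows.append(Timeval(end, window))
        k += 1
    return windows


# 20 readings, one every 0.1 s (like ts(simstream(0.1)) over two seconds)
readings = [(i / 10, i) for i in range(1, 21)]
for tv in twinagg_model(readings, 1.0, 1.0):
    print(tv.timestamp, tv.value)
```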

We see that `twinagg()` wraps each result in a `Timeval`. A `Timeval` consists of a timestamp and a value. To extract the value from a `Timeval`, we can use the `value()` function. The following query extracts the window vector from the `twinagg()` result and returns the first value in the vector, thereby sampling one value from the stream each second:

```
select value(tsv)[1]
from timeval of vector tsv, stream of timeval s
where s = ts(simstream(.1))
  and tsv in twinagg(s, 1.0, 1.0);
```
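In Python terms, a `Timeval` behaves like a pair, and `value(tsv)[1]` corresponds to `tv.value[0]` (OSQL vectors are 1-indexed, Python lists are 0-indexed). The `Timeval` named tuple and the hand-written windows below are illustrative stand-ins, not data produced by `twinagg()`.

```python
from typing import List, NamedTuple


class Timeval(NamedTuple):
    timestamp: float
    value: List[int]


# Two hand-written windows shaped like twinagg() output (illustrative data only)
windows = [Timeval(1.0, [1, 2, 3]), Timeval(2.0, [4, 5, 6])]

# select value(tsv)[1] ...  ->  first element of each window's value
first_values = [tv.value[0] for tv in windows]
print(first_values)  # [1, 4]
```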

We can also extract the timestamp from a `Timeval` with the function `timestamp()`. If we parameterize the example above with one variable `streamrate` and another variable `samplingrate`, we can adjust both the pace of the stream and how often the stream is sampled:

```
select timestamp(tsv), value(tsv)[1]
from Timeval of Vector tsv, stream of timeval s,
     number streamrate, number samplingrate
where streamrate = .02 and samplingrate = .5
  and s = streamof(ts(simstream(streamrate)))
  and tsv in twinagg(s, samplingrate, samplingrate);
```
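A back-of-the-envelope check of how much data the parameterized query keeps: both variables are paces in seconds, so each window of `samplingrate` seconds holds roughly `samplingrate / streamrate` readings, of which the query keeps one. The function name `reduction` is hypothetical and the figures are idealized (they ignore timing jitter).

```python
def reduction(streamrate: float, samplingrate: float) -> dict:
    """Idealized figures for keeping one value per time window.

    streamrate:   seconds between stream values (the pace given to simstream())
    samplingrate: window size and stride in seconds (given to twinagg())
    """
    return {
        "input_values_per_s": 1 / streamrate,
        "output_values_per_s": 1 / samplingrate,
        "values_per_window": round(samplingrate / streamrate),
    }


print(reduction(0.02, 0.5))
```

With the values from the query, about 50 values per second come in and 2 go out, i.e., one value is kept out of roughly 25.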

## Statistics over streams

Another way of reducing data is to compute statistics over aggregated data. For example, we can compute the moving average over a stream by aggregating measurements into windows with `winagg()` and then computing the mean of the values in each window.

The following query computes the moving average of `simstream()` by forming a window with 20 values every time `simstream()` emits a new value, and then computing the mean of those 20 values:

```
//plot: Line plot
timeout(mean(winagg(simstream(0.01), 20, 1)), 1.5);
```

We can plot what the moving average looks like compared to the original stream:

```
//plot: Line plot
timeout(zip([simstream(0.01),
             mean(winagg(simstream(0.01), 20, 1))]), 1.5);
```

The example above computes the mean of the last 20 values (`size`=20) every time a new value is emitted by `simstream()` (`stride`=1).
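The same moving average, written as a minimal Python sketch over a finite list (the helper `moving_average` is hypothetical). With `stride` 1, a new mean is produced for every reading once the first full window of `size` readings is available; we assume no partial windows are emitted at start-up.

```python
from collections import deque
from statistics import mean
from typing import Iterable, Iterator


def moving_average(stream: Iterable[float], size: int) -> Iterator[float]:
    """Mean of the last `size` readings, emitted after every reading (stride 1)."""
    window: deque = deque(maxlen=size)
    for reading in stream:
        window.append(reading)
        if len(window) == size:  # wait until the first full window
            yield mean(window)


print(list(moving_average(range(1, 25), 20)))  # [10.5, 11.5, 12.5, 13.5, 14.5]
```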

Other aggregation functions, such as `stdev()` or `sum()`, work the same way:

```
//plot: Line plot
select mean(v), stdev(v), sum(v)
from Vector of number v
where v in winagg(simstream(0.01), 20, 1);
```
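For reference, here are the three statistics on a single window, computed with Python's `statistics` module. We use the sample standard deviation (`statistics.stdev`, which divides by n - 1); whether OSQL's `stdev()` uses the sample or the population definition is an assumption to check against the OSQL documentation.

```python
from statistics import mean, stdev

window = [2, 4, 4, 4, 5, 5, 7, 9]  # one illustrative window of readings

stats = {
    "mean": mean(window),    # 5
    "stdev": stdev(window),  # sample stdev: sqrt(32 / 7), about 2.138
    "sum": sum(window),      # 40
}
print(stats)
```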

### Vector-valued streams

A common situation is to have vector-valued streams where each value in the vector represents data from a different sensor. To do statistics on the values from each sensor in this case, you have to separate the values of each dimension (sensor) from the others.

Let's create a function that returns a simulated vector stream to illustrate this. The following function simulates the output of three different sensors, where the values have been pivoted into a single stream of vectors with three values:

```
create function my_vector_stream() -> Stream of Vector of Number
  as pivot([simstream(0.05), heartbeat(0.05), simstream(0.06)], [0,0,0]);
```
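To see what `pivot()` produces, here is a hypothetical Python model of the behaviour we assume: it keeps the latest value from each input stream, starting from the given initial vector (`[0,0,0]` above), and emits a copy of that state every time any input emits. The function `pivot_model` and its event-list input format are illustrative, not part of OSQL.

```python
from typing import List, Sequence, Tuple

# (time, stream index, value): one event per value emitted by an input stream
Event = Tuple[float, int, float]


def pivot_model(events: Sequence[Event], initial: Sequence[float]) -> List[List[float]]:
    """Merge several streams into one stream of state vectors (assumed semantics)."""
    state = list(initial)
    out: List[List[float]] = []
    for _, index, value in sorted(events):  # process events in time order
        state[index] = value
        out.append(list(state))  # emit a snapshot of the latest values
    return out


events = [(0.05, 0, 1.5), (0.05, 1, 0.05), (0.06, 2, -0.3)]
print(pivot_model(events, [0, 0, 0]))
```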

Since the function emits a stream of vectors, `winagg()` will produce windows where each element is a vector emitted by `my_vector_stream()`.

Let's look at the first three windows produced by `winagg()` with `size` 2 and `stride` 2:

```
first_n(winagg(my_vector_stream(), 2, 2), 3);
```

We see that a window of two vectors from `my_vector_stream()` is a 2x3 `Vector of Vector of Number`, which is the same as a 2x3 `Matrix`. This means that we have a matrix where each column represents one sensor. Since we want to do statistics for each sensor, we need to transpose the result from `winagg()` to get the values from each sensor in a separate vector.

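Let's say that we have three sensors $A$, $B$, and $C$ and their output is collected as a stream of vectors $[a,b,c]$. If we apply `winagg()` with size $N$ on this stream, we get a `Vector of Vector of Number`, which is an $N \times 3$ `Matrix`:

$$
M = \begin{bmatrix}
a_1 & b_1 & c_1 \\
a_2 & b_2 & c_2 \\
\vdots & \vdots & \vdots \\
a_N & b_N & c_N
\end{bmatrix}
$$

If we transpose this, we get all values for each sensor in a separate vector:

$$
T = M^{\mathsf{T}} = \begin{bmatrix}
a_1 & a_2 & \cdots & a_N \\
b_1 & b_2 & \cdots & b_N \\
c_1 & c_2 & \cdots & c_N
\end{bmatrix}
$$

This means we can now do statistics over signal $A$, $B$ or $C$ by operating on their respective vectors $T(1)$, $T(2)$ and $T(3)$, the rows of $T$. For example, $T(1) = [a_1, a_2, \dots, a_N]$ holds all readings of sensor $A$ in the window.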
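The transpose step in Python, where `zip(*m)` turns the rows of a window (one vector per reading) into one sequence per sensor. Because Python is 0-indexed, the rows `t[0]`, `t[1]` and `t[2]` play the roles of $T(1)$, $T(2)$ and $T(3)$. The window contents are made-up numbers.

```python
# A window of N = 4 readings from sensors A, B, C: one row [a, b, c] per reading
m = [
    [1.0, 10.0, 100.0],
    [2.0, 20.0, 200.0],
    [3.0, 30.0, 300.0],
    [4.0, 40.0, 400.0],
]

# Transpose: 4x3 -> 3x4, one list per sensor
t = [list(column) for column in zip(*m)]

print(t[0])  # all readings of sensor A: [1.0, 2.0, 3.0, 4.0]
print(t[1])  # all readings of sensor B: [10.0, 20.0, 30.0, 40.0]
print(t[2])  # all readings of sensor C: [100.0, 200.0, 300.0, 400.0]
```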

Let's try this and compute some statistics over the sensor data from `my_vector_stream()`. First we create a function `run_stats()` that transforms the output from `my_vector_stream()` into a stream of statistics over each sensor:

```
create function run_stats() -> Stream of Vector of Number
  as select Stream of [mean(t[1]), mean(t[2]), mean(t[3])]
     from Matrix m, Matrix t
     where m in winagg(my_vector_stream(), 20, 1)
       and t = transpose(m);
```

Then we plot the first five seconds of statistics from `run_stats()`:

```
//plot: Line plot
timeout(run_stats(), 5.0);
```
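The pipeline behind `run_stats()` (window of vectors, transpose, per-sensor mean) can be modelled in a few lines of Python. This is a sketch on a finite list with the hypothetical name `run_stats_model`; it assumes `stride` 1 and that the first result is emitted once a full window of `size` vectors is available.

```python
from collections import deque
from statistics import mean
from typing import Iterable, Iterator, List, Sequence


def run_stats_model(stream: Iterable[Sequence[float]], size: int) -> Iterator[List[float]]:
    """Per-sensor moving mean over a stream of vectors (window `size`, stride 1)."""
    window: deque = deque(maxlen=size)
    for vector in stream:
        window.append(vector)
        if len(window) == size:
            columns = zip(*window)  # transpose: one tuple of readings per sensor
            yield [mean(column) for column in columns]


vectors = [[1, 10, 100], [3, 30, 300], [5, 50, 500]]
print(list(run_stats_model(vectors, 2)))  # [[2, 20, 200], [4, 40, 400]]
```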

## Conclusion

This guide has shown how to do data reduction by sampling and by computing statistics over aggregated data. As a next step, we recommend reading the Data reduction on edge devices guide, where we try these concepts on a real edge device.