Basic data reduction

Guide specification
Guide type: Wasm code
Requirements: None
Recommended reading: None

Introduction

In this guide we look at different ways of doing data reduction through sampling. We will illustrate the data reduction techniques on streams from the built-in synthetic stream generator simstream(), but the techniques apply just as well to data streams from real sensors.

Input data

The function simstream(pace) generates a stream with a new simulated numerical element every pace seconds. We can look at the stream from the first 1.0 seconds of simstream(0.1):

timeout(simstream(0.1), 1.0)

Count-based sampling

The easiest way to reduce the data rate is to sample the output of simstream(pace). One way of sampling is to use the function winagg(s,size,stride). For every stride elements of stream s, winagg() returns a counting window as a vector of the last size elements. Choosing a window size smaller than the stride down-samples the stream. For example, with size 1 and stride 10, each window contains only every 10th element:

winagg(simstream(0.1), 1, 10)

The preceding example returns a window with a single element for every ten elements of simstream(). We use vector indexing to get the first element of each window in the result stream:

select Stream of v[1]
from Vector v
where v in winagg(simstream(.1), 1, 10)

Exercise

Set a larger value of size and see how it affects the output!

This is an easy way of reducing a data stream through down-sampling.
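The behavior of count-based windows can be modeled outside SA Engine. The following Python sketch is only an illustration of the description above, not the implementation of winagg(). The helper count_windows() is hypothetical, and it assumes that the first window is emitted as soon as size elements have arrived, which the guide does not specify.

```python
from collections import deque


def count_windows(stream, size, stride):
    """Yield the last `size` elements once for every `stride` elements.

    Hypothetical model of a counting window, not SA Engine code.
    """
    buf = deque(maxlen=size)  # keeps only the most recent `size` elements
    for i, x in enumerate(stream, start=1):
        buf.append(x)
        # Assumption: the first window is emitted after `size` elements,
        # then one window follows every `stride` elements.
        if i >= size and (i - size) % stride == 0:
            yield list(buf)


# Thirty elements, one window of one element for every ten elements.
windows = list(count_windows(range(1, 31), size=1, stride=10))

# Python lists are 0-based, so w[0] plays the role of v[1] in the query above.
samples = [w[0] for w in windows]

# A larger size keeps more history in each window, as in the exercise.
wider = list(count_windows(range(1, 31), size=3, stride=10))
```

Under these assumptions, samples holds three of the thirty input elements, while wider still produces three windows but with three elements in each.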

Time-based sampling

In certain applications it is more meaningful to sample elements in temporal windows rather than in the counting windows formed by winagg(). The built-in stream function twinagg(s,size,stride) is similar to winagg(s,size,stride), but the input parameters size and stride are specified in seconds rather than in number of stream elements. Every stride seconds, the function returns a timestamped window containing the elements of stream s from the last size seconds. Note that twinagg() requires the stream s to be a timestamped stream.

We can timestamp any stream using ts():

timeout(ts(simstream(0.1)), 1.0)

Now that we have a time-stamped stream we can use twinagg() on the stream:

twinagg(ts(simstream(0.1)), 1.0, 1.0)

In the preceding example, each window contains the elements of simstream(0.1) received during one second (size=1.0), and a new window is formed every second (stride=1.0). Because the stride equals the size, the windows do not overlap and every element of simstream(0.1) is present in the output. This is called a temporal tumbling window.
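To make the difference between tumbling and overlapping temporal windows concrete, here is a small Python model. It is a sketch under stated assumptions and not how twinagg() is implemented: the helper time_windows() is hypothetical, timestamps are integer milliseconds to avoid floating-point rounding, and each window is assumed to be the half-open interval [close - size, close).

```python
def time_windows(events, size_ms, stride_ms):
    """Group (timestamp_ms, value) pairs into temporal windows.

    Hypothetical model: a window closes every `stride_ms` and holds the
    values whose timestamps fall in [close - size_ms, close).
    `events` must be sorted by timestamp.
    """
    if not events:
        return []
    start, last = events[0][0], events[-1][0]
    windows = []
    close = start + stride_ms
    while close <= last:  # only emit windows that are fully covered by the data
        values = [v for ts, v in events if close - size_ms <= ts < close]
        windows.append((close, values))
        close += stride_ms
    return windows


# One element every 100 ms for three seconds (timestamps 0..3000 ms).
events = [(i * 100, i) for i in range(31)]

# size == stride: tumbling windows, each element lands in exactly one window.
tumbling = time_windows(events, size_ms=1000, stride_ms=1000)

# size > stride: consecutive windows overlap and share elements.
sliding = time_windows(events, size_ms=2000, stride_ms=1000)
```

With the tumbling setting, the three windows together contain values 0 through 29 exactly once. With the sliding setting, the second and third windows each contain 20 values and share values 10 through 19.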

We see that twinagg(s,size,stride) forms a stream of temporal windows of the elements in s. A temporal window consists of a time stamp and a vector that represents the elements of the window. To get the window elements we use the value() function. The following query extracts the window elements from the twinagg() result and returns the first element in the window vector, thereby sampling one element from the stream each second:

select value(tsv)[1]
from Timeval of Vector tsv, Stream of Timeval s
where s = ts(simstream(.1))
and tsv in twinagg(s, 1.0, 1.0)

We can extract the timestamp from the timestamped window with the function timestamp(). If we parameterize the example above with the variables streamrate (the pace of the stream, in seconds between elements) and samplingrate (the window size and stride, in seconds), we can adjust how often the stream produces elements and how often it is sampled:

select timestamp(tsv), value(tsv)[1]
from Timeval of Vector tsv, Stream of Timeval s,
Number streamrate, Number samplingrate
where streamrate = .02 and samplingrate = .5
and s = ts(simstream(streamrate))
and tsv in twinagg(s, samplingrate, samplingrate);
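The amount of reduction follows from the two parameters: with one element every 0.02 seconds and one sample every 0.5 seconds, roughly one element in 25 is kept. The Python sketch below illustrates that arithmetic. The helper sample_first_per_period() is hypothetical, works on integer milliseconds, and returns the timestamp of the sampled element itself, which may differ from the window timestamp that twinagg() reports.

```python
def sample_first_per_period(events, period_ms):
    """Keep the first (timestamp_ms, value) pair in each time bucket.

    Hypothetical model of time-based sampling, not SA Engine code.
    """
    kept = []
    current_bucket = None
    for ts, value in events:
        bucket = ts // period_ms  # index of the period this element falls in
        if bucket != current_bucket:
            kept.append((ts, value))
            current_bucket = bucket
    return kept


stream_period_ms = 20     # corresponds to streamrate = .02
sampling_period_ms = 500  # corresponds to samplingrate = .5

# Two seconds of simulated data: 100 elements, one every 20 ms.
events = [(i * stream_period_ms, i) for i in range(100)]
kept = sample_first_per_period(events, sampling_period_ms)

# Number of input elements per retained sample.
reduction = len(events) / len(kept)
```

Here kept contains four samples for two seconds of data, so the output carries one twenty-fifth of the input elements.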

Statistics over streams

Another way of reducing data is to compute statistics over aggregated data. For example, we can compute the moving average of a stream by aggregating measurements into windows with winagg() and computing the mean of the elements in each window.

The following query computes the moving average of simstream() by forming a sliding window of the last 20 elements for every 10th element of simstream() and computing the mean of those 20 elements:

//plot: Line plot
timeout(mean(winagg(simstream(0.01), 20, 10)), 1.5);

We can plot what the moving average looks like compared to the original stream:

//plot: Line plot
timeout(zip([simstream(0.01),
mean(winagg(simstream(0.01), 20, 10))]), 1.5);

The example above computes the mean of the last 20 elements (size=20) for every 10th element in simstream() (stride=10).

Other aggregation functions, such as stdev() or sum(), work the same way:

//plot: Line plot
select mean(v), stdev(v), sum(v)
from Vector of Number v
where v in winagg(simstream(0.01),20,10);
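The same aggregation can be worked through by hand on a small input. The Python sketch below is a model only: count_windows() is a hypothetical helper that assumes the first window is emitted after size elements, and statistics.stdev() computes the sample standard deviation, which may or may not match the definition used by stdev() in SA Engine.

```python
from collections import deque
from statistics import mean, stdev


def count_windows(stream, size, stride):
    """Hypothetical model: last `size` elements for every `stride` elements."""
    buf = deque(maxlen=size)
    for i, x in enumerate(stream, start=1):
        buf.append(x)
        # Assumption: first window after `size` elements have arrived.
        if i >= size and (i - size) % stride == 0:
            yield list(buf)


# Forty input elements 1..40, window size 20, stride 10.
inputs = list(range(1, 41))
stats = [
    (mean(w), stdev(w), sum(w))  # one summary tuple per window
    for w in count_windows(inputs, size=20, stride=10)
]

means = [m for m, _, _ in stats]
sums = [s for _, _, s in stats]
```

Forty input elements are reduced to three summary tuples. Consecutive windows share ten elements, which is what makes the mean a moving average rather than a mean over disjoint blocks.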

Vector streams

A common situation is to have vector streams where each element in a vector represents data from a different sensor. To be able to do statistics on the measurements from each sensor in this case, you have to extract each dimension as a separate stream.

Let's create a function that returns a simulated vector stream that will help us illustrate this. The following function simulates the result from three different sensors, where the elements have been pivoted into a single stream of vector elements having three values:

create function my_vector_stream() -> Stream of Vector of Number
as pivot([simstream(0.05), heartbeat(0.05), simstream(0.06)], [0,0,0]);

Since the function returns a stream of vectors, winagg() will produce a stream of window vectors where each element in the windows is a vector of the three values in the elements of my_vector_stream().

Let's look at the first three windows formed by winagg() with size 2 and stride 2:

select winagg(my_vector_stream(), 2, 2)
limit 3

We see that making a window of two vectors from my_vector_stream() results in a 2x3 Vector of Vector of Number, which is the same as a 2x3 Matrix. This means that we have a matrix where each column represents one sensor. Since we want to do statistics for each sensor, we need to transpose the result from winagg() to get the elements from each sensor in a separate vector.

Example

Let's say that we have three sensors A, B, and C, and their output is collected as a stream of vectors [a,b,c]. If we apply winagg() with size N on this stream, we get a Vector of Vector of Number, which is a Matrix:

$$M = \begin{bmatrix} a_1 & b_1 & c_1 \\ a_2 & b_2 & c_2 \\ \vdots & \vdots & \vdots \\ a_N & b_N & c_N \end{bmatrix}$$

If we transpose this we get the elements of each sensor in a separate vector.

$$T = \mathrm{transpose}(M) = \begin{bmatrix} a_1 & a_2 & \cdots & a_N \\ b_1 & b_2 & \cdots & b_N \\ c_1 & c_2 & \cdots & c_N \end{bmatrix}$$

This means we can now do statistics over signal A, B or C by operating on their respective vectors T(1), T(2) and T(3).
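The transposition step can be illustrated on a tiny window. This Python sketch uses made-up readings and the standard zip(*rows) idiom for transposing a list of rows. It only mirrors the idea and does not call SA Engine's transpose().

```python
from statistics import mean

# A window of N = 3 vectors; each row is one reading [a, b, c]
# from the three sensors (hypothetical values).
M = [
    [1, 10, 100],
    [2, 20, 200],
    [3, 30, 300],
]

# Transpose: row k of T collects every reading from sensor k.
T = [list(column) for column in zip(*M)]

# Statistics per sensor are now plain statistics per row of T.
sensor_means = [mean(row) for row in T]
```

After the transpose, T[0] holds all readings of sensor A, T[1] those of sensor B, and T[2] those of sensor C. Note that Python indexes from 0, whereas the vectors in this guide are indexed from 1.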

Let's try this and compute some statistics over the sensor measurements from my_vector_stream(). First we create a function run_stats() that transforms the output from my_vector_stream() into a stream of statistics for each sensor, here the mean of each sensor over a sliding window of the last 20 elements:

create function run_stats() -> Stream of Vector of Number
as select Stream of [mean(t[1]), mean(t[2]), mean(t[3])]
from Matrix m, Matrix t
where m in winagg(my_vector_stream(),20,1)
and t = transpose(m);

Then we plot the first five seconds of statistics from run_stats():

//plot: Line plot
timeout(run_stats(), 5.0);
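One thing worth noticing is the effect of the stride. With stride 1, a new window is formed for every incoming element, so the per-sensor means are smoothed but the output rate is not reduced. The Python sketch below models this with a hypothetical helper sensor_means() and made-up readings. It assumes the first window is emitted once size vectors have arrived, which is not something the guide states.

```python
from collections import deque
from statistics import mean


def sensor_means(vectors, size, stride):
    """Yield the per-sensor mean over the last `size` vectors.

    Hypothetical model of window, transpose, mean. Not SA Engine code.
    """
    buf = deque(maxlen=size)
    for i, vec in enumerate(vectors, start=1):
        buf.append(vec)
        # Assumption: first window after `size` vectors have arrived.
        if i >= size and (i - size) % stride == 0:
            columns = zip(*buf)  # transpose: one tuple per sensor
            yield [mean(col) for col in columns]


# Forty made-up readings [a, b, c] from three sensors.
readings = [[i, 2 * i, 100] for i in range(1, 41)]

smoothed = list(sensor_means(readings, size=20, stride=1))   # as in run_stats()
reduced = list(sensor_means(readings, size=20, stride=10))   # larger stride
```

In this model, stride 1 yields 21 output vectors from 40 inputs, while stride 10 yields only 3, so a larger stride is what turns the moving average into a data reduction.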

Conclusion

This guide has shown how to do data reduction by sampling and by computing statistics over aggregated data. As a next step, we recommend reading the Data reduction on edge devices guide, where we try these concepts on a real edge device.