Working with several streams in a query

This page uses Wasm code blocks so you can run the examples directly in the browser.

In this section we will go through how to use one of the stream merging functions, pivot, to combine several streams in a query. This tutorial might help you if you are working with streams of several signals and would like to combine and compare them.

Pivot streams

The function pivot transforms a vector of streams VS into a pivoted stream of vectors PS, where each element PS_i is the most recently received value from the corresponding stream VS_i in VS.

doc("pivot")

Let's start with a degenerate example where we only merge one stream:

select pivot
from Stream of Vector pivot
where pivot in pivot([simstream(.02)])

Now let's expand this example by pivoting two streams:

select v
from Stream of Vector pivot, Vector of Stream vs, Vector v
where vs = [1+simstream(0.02), 2+simstream(.2)]
and pivot = pivot(vs)
and v in pivot
limit 14

Info

Notice that the first vector contains a null value. This is because the pivoted stream had not yet received any value from stream 2 when it received the first value from stream 1. You can avoid this by using the variant of pivot that takes an extra argument: the initial vector of the result stream.

select v
from Stream of Vector pivot, Vector of Stream vs, Vector v
where vs = [1+simstream(0.02), 2+simstream(.2)]
and pivot = pivot(vs, zeros(dim(vs)))
and v in pivot
limit 14
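
To make the difference between the two variants concrete, here is a minimal Python model of the behavior described above. It is not SA Engine code: `pivot_model`, the event tuples, and the sample values are hypothetical, and the sketch assumes that one vector is emitted per arriving value. Whether SA Engine also emits the initial vector itself as a first element is not modeled.

```python
from typing import Iterable, Iterator, List, Optional, Tuple

# Hypothetical event format: (arrival time, stream index, value)
Event = Tuple[float, int, float]


def pivot_model(events: Iterable[Event], n_streams: int,
                initial: Optional[List[float]] = None
                ) -> Iterator[List[Optional[float]]]:
    """Yield one vector per arriving value, holding the latest value per stream."""
    # Without an initial vector, slots that have not received a value stay None.
    latest: List[Optional[float]] = (
        list(initial) if initial is not None else [None] * n_streams
    )
    for _, idx, value in sorted(events, key=lambda e: e[0]):
        latest[idx] = value
        yield list(latest)


# Stream 0 delivers two values before stream 1 delivers its first one.
events = [(0.02, 0, 1.5), (0.04, 0, 1.7), (0.20, 1, 2.9), (0.22, 0, 1.1)]

without_init = list(pivot_model(events, 2))
with_init = list(pivot_model(events, 2, initial=[0.0, 0.0]))

print(without_init[0])  # slot for stream 1 is still empty
print(with_init[0])     # slot for stream 1 holds the initial value
```

In this model the empty slot is filled from the initial vector until the slower stream delivers its first value, which mirrors why the second query no longer starts with a null.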

The following is an example of a more advanced stream pivot:

select pivot
from Stream of Vector pivot,
Vector of Stream vs,
Stream heartbeat_mod,
Stream fast_heartbeat_mod
where vs = [
simstream(0.02),
3+simstream(.2),
heartbeat_mod,
fast_heartbeat_mod
]
and heartbeat_mod = 1 - 2*mod(heartbeat(1), 2)
and fast_heartbeat_mod = mod(round(10*heartbeat(.1)), 2)
and pivot = pivot(vs, zeros(dim(vs)))
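
The two derived heartbeat streams in this query are easier to picture with numbers. The Python sketch below evaluates the same arithmetic on a hypothetical `heartbeat_model`, which assumes that heartbeat(p) emits the elapsed seconds 0, p, 2p, and so on. That assumption is a simplification and real emission times may differ.

```python
def heartbeat_model(pace, count):
    """Assumed behavior: emit elapsed seconds 0, pace, 2*pace, ..."""
    return [i * pace for i in range(count)]


# 1 - 2*mod(heartbeat(1), 2): a square wave switching between 1 and -1.
slow = [1 - 2 * (t % 2) for t in heartbeat_model(1, 4)]

# mod(round(10*heartbeat(.1)), 2): switches between 0 and 1 ten times per second.
# round() absorbs floating-point noise such as 10 * 0.30000000000000004.
fast = [round(10 * t) % 2 for t in heartbeat_model(0.1, 6)]

print(slow)
print(fast)
```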

Arithmetic on combined streams

If you want to do arithmetic on pivoted streams, you first need to extract the vectors from the pivoted stream. This is done with the line v in sv in the query below.

//plot: Line plot
select v[1]*v[4] + v[3]*10, v[4], v[2]
from Stream of Vector sv,
Vector of Stream vs,
Stream hbmod,
Stream hbmodfast,
Vector of Number v
where hbmod = 1 - 2*mod(heartbeat(1), 2)
and hbmodfast = mod(round(10*heartbeat(.1)), 2)
and vs = [simstream(0.02), 3+3*simstream(.2), hbmod, hbmodfast]
and sv = pivot(vs, zeros(dim(vs)))
and v in sv

Let's smooth v[2] by averaging it over a sliding window:

//plot: Line plot
select v[1]*v[4] + v[3]*10, v[4], avg(win)
from Stream of Vector sv,
Vector of Stream vs,
Stream hbmod,
Stream hbmodfast,
Vector of Number v,
Vector of Number win
where hbmod = 1 - 2*mod(heartbeat(1), 2)
and hbmodfast = mod(round(10*heartbeat(.1)), 2)
and vs = [simstream(0.02), winagg(3+3*simstream(.2),10,1),
hbmod, hbmodfast]
and sv = pivot(vs, zeros(dim(vs)))
and v in sv
and win = v[2]
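
The smoothing in this query can be sketched in Python as a moving average. The helper `winagg_model` is hypothetical and assumes that winagg(s, size, stride) emits count-based windows of the last size values, sliding forward by stride values. It illustrates the idea rather than SA Engine's implementation.

```python
from collections import deque
from typing import Iterable, Iterator, List


def winagg_model(values: Iterable[float], size: int,
                 stride: int) -> Iterator[List[float]]:
    """Yield windows of `size` values, advancing `stride` values between windows."""
    window: deque = deque(maxlen=size)  # the oldest value drops out automatically
    since_last = 0
    for value in values:
        window.append(value)
        since_last += 1
        if len(window) == size and since_last >= stride:
            since_last = 0
            yield list(window)


def avg(window: List[float]) -> float:
    return sum(window) / len(window)


noisy = [3, 6, 3, 6, 3]  # hypothetical jumpy signal
smoothed = [avg(w) for w in winagg_model(noisy, size=3, stride=1)]
print(smoothed)
```

The jumps between 3 and 6 shrink to jumps between 4 and 5, and a larger window such as the 10 used in the query flattens the signal further.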

The following two queries look equivalent, but they behave differently:

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'labels':['x','y'], 'memory': 1000};
pivot([sin(10*heartbeat(.01)),
cos(10*heartbeat(.01))])

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'labels':['x','y'], 'memory': 1000};
select sin(x), cos(x)
from Number x
where x in 10*heartbeat(.02)

Note

The first query uses two independent streams combined with pivot. In effect, sin and cos are each applied to their own heartbeat stream, and the two resulting streams are merged afterwards.

In the second query, sin and cos are applied to the same x, taken from a single heartbeat stream.
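
The difference can be illustrated with a small Python model. It assumes that a pivoted stream emits one vector per arriving value and that the sin value arrives just before the cos value on each tick. Both are assumptions made for illustration, and the tick values are hypothetical.

```python
import math

ticks = [0.0, 0.5, 1.0]  # hypothetical heartbeat values

# Shared stream: sin and cos always see the same x.
shared = [(math.sin(x), math.cos(x)) for x in ticks]

# Two independent streams, pivoted: each arrival updates only its own slot,
# so the vector emitted between two arrivals pairs a new sin with an old cos.
latest = [None, None]
pivoted = []
for x in ticks:
    for slot, f in enumerate((math.sin, math.cos)):
        latest[slot] = f(x)
        pivoted.append(tuple(latest))


def on_unit_circle(p):
    """True if both slots are filled and the point lies on the unit circle."""
    return None not in p and math.isclose(p[0] ** 2 + p[1] ** 2, 1.0)


print(all(on_unit_circle(p) for p in shared))
print([on_unit_circle(p) for p in pivoted])
```

In the shared case every point lies on the unit circle. In the pivoted case every other vector combines values from different ticks and falls off the circle, which is the kind of deviation the scatter plots make visible.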

More examples of how we can specify multiple streams in a query:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select pivot([sin(hb), sin(hb+pi())])
from Stream of Number hb
where hb = 10*heartbeat(.02)

//plot: Multi plot
{'sa_plot': 'Bar plot', 'memory': 200};
pivot(select Vector of sin(i/3 + 3*heartbeat(.03))
from Integer i
where i in iota(0,29)
order by i
)

Hopefully these short examples have helped you understand how to combine streams using pivot.

The next part of the tutorial shows how to develop data stream wrappers to enable easy access and understanding of external data streams by defining meta-data (ontologies) describing properties of the streams.