
Streams

An important feature of OSQL is that functions and filters can be applied to streams. In the simplest case, a stream query is just an expression returning a stream.

The function heartbeat(Real pace)->Stream of Real returns an infinite stream that, every pace seconds, emits the number of seconds elapsed since the stream was started.

Example:

heartbeat(0.5) 

The function heartbeat() is a stream generator, i.e. a function that returns a stream as an object. When you enter an expression returning a stream object in the REPL, the REPL runs the stream and displays each extracted element, as in the example above.

The stream generator diota(pace,l,u) returns a finite stream of the integers from l to u, with a delay of pace seconds between successive elements.

Example:

diota(0.5,1,5)
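
To make the timing behavior concrete, here is a minimal Python sketch that models heartbeat() and diota() as generators. It is an illustration under stated assumptions, not SA Engine's implementation: the function names mirror the OSQL functions, time.monotonic() stands in for the engine's clock, and the exact placement of the delays is a guess.

```python
import time
from itertools import islice


def heartbeat(pace):
    """Model of heartbeat(pace): every `pace` seconds, yield the number of
    seconds elapsed since the stream was started. Never terminates."""
    start = time.monotonic()
    while True:
        yield time.monotonic() - start
        time.sleep(pace)


def diota(pace, l, u):
    """Model of diota(pace, l, u): yield the integers l..u, sleeping
    `pace` seconds between successive elements. Terminates after u."""
    for i in range(l, u + 1):
        if i > l:
            time.sleep(pace)
        yield i


# Like the REPL, consume and display each element as soon as it is produced.
for x in diota(0.5, 1, 5):
    print(x)

# An infinite stream must be bounded before it can be collected.
first_three = list(islice(heartbeat(0.1), 3))
print([round(t, 1) for t in first_three])
```

A generator suits this model because an element is only computed when the consumer asks for it, which is what allows heartbeat() to be infinite.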

The function extract(Stream s)->Bag of Object runs s to extract the elements in s. The extracted elements can then be used as a regular bag in set queries by using the in operator.

Example: The following set query returns the sine of each element extracted from the stream diota(0.5,1,5):

select sin(x)
from Real x
where x in extract(diota(0.5,1,5))

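The query is an example of a continuous query (CQ): it produces results continuously as new stream elements arrive. Over a finite stream such as diota(0.5,1,5) it ends when the stream ends, while over an infinite stream such as heartbeat() it runs until you explicitly stop it.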

Note

The in operator can also be applied directly to a stream s, but this usage will be deprecated. The legacy notation e in s means the same as e in extract(s). For clarity, use the form e in extract(s), which makes explicit the side effect of first running s to extract its elements.

A function taking a stream as argument and producing another stream as result is called a stream function. For example, the stream function first_n(Stream s,Real n)->Stream returns a finite stream of the first n elements in s.

Example:

first_n(heartbeat(0.125),5)

The stream function timeout(s,t) runs the stream s for t seconds.

Example:

timeout(heartbeat(0.125),1.1)

The stream function skip(s,n) skips the first n elements in the stream s.

Example:

skip(heartbeat(1),1)
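
Because stream functions consume their input lazily, they can be applied to infinite streams. The following minimal Python sketch models first_n() and skip() with itertools.islice. As an assumption, itertools.count() stands in for an infinite stream such as heartbeat(), and timing is ignored.

```python
from itertools import count, islice


def first_n(s, n):
    """Model of first_n(s, n): a finite stream of the first n elements of s."""
    return islice(s, n)


def skip(s, n):
    """Model of skip(s, n): the stream s without its first n elements."""
    return islice(s, n, None)


# count() is an infinite stream 0, 1, 2, ...; only the needed elements are pulled.
print(list(first_n(count(), 5)))           # [0, 1, 2, 3, 4]
print(list(first_n(skip(count(), 1), 3)))  # [1, 2, 3]
```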

The stream function changed(s) returns a stream where consecutive duplicate elements have been removed.

Example: The duplicated elements in the stream floor(heartbeat(0.25)*2.5) have been eliminated in this query:

select changed(floor(heartbeat(0.25)*2.5)) limit 7

Stream elements are enumerated starting with number 1. The stream function section(s,b,e) returns a stream of the elements in stream s starting with element number b and ending with element number e.

section(heartbeat(0.125),3,5)
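
The semantics of changed() and section() can be sketched in Python as generator functions. As an assumption, the heartbeat below is idealized to exact multiples of 0.25 with no real delays. This makes the changed() example reproducible, although a real heartbeat has timing jitter.

```python
import math
from itertools import count, islice


def changed(s):
    """Model of changed(s): drop every element equal to the element just before it."""
    sentinel = object()  # marks "no previous element yet"
    prev = sentinel
    for x in s:
        if prev is sentinel or x != prev:
            yield x
        prev = x


def section(s, b, e):
    """Model of section(s, b, e): elements number b..e, counting from 1."""
    return islice(s, b - 1, e)


# Idealized heartbeat(0.25): 0, 0.25, 0.5, ... without delays.
hb = (k * 0.25 for k in count())
print(list(islice(changed(math.floor(t * 2.5) for t in hb), 7)))  # [0, 1, 2, 3, 4, 5, 6]
print(list(section(count(1), 3, 5)))                               # [3, 4, 5]
```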

Synthetic streams

Synthetic stream generators are functions returning streams whose elements are generated by some algorithm rather than originating from physical measurements. In this documentation, synthetic streams are used to illustrate the behavior of the system. They are also very useful for generating simulated data streams to test models without accessing sensors or other external data sources.

A trivial example of a synthetic stream generator is the function siota(Integer l, Integer u)->Stream of Integer, which generates a finite stream of the integers between l and u. It is used throughout the documentation to produce simple, predictable streams that illustrate the behavior of other stream functions.

Similarly, the stream generator diota(Real pace, Integer l, Integer u)->Stream of Integer returns a finite stream of the integers between l and u, with a delay of pace seconds between successive elements.

Example: This query generates a finite real-time sine wave:

//plot: Line plot
sin(diota(0.01,1,100)*3.14/50)

The stream generator heartbeat(Real pace)->Stream of Real is an important built-in synthetic stream generator that can be used for defining other real-time stream generators.

Example: This stream generator function returns a real-time infinite sine wave:

create function rsin(Real pace, Real multiplier) -> Stream of Real
as sin(heartbeat(pace)*multiplier)

Let's try it:

//plot: Line plot
rsin(0.01,10)

Exercise

Vary the parameters of rsin() and see what happens.

In general, the built-in library of trigonometric functions is very useful for generating synthetic streams. The system function simstream(Number pace) is a synthetic stream generator that returns a sine-based simulated stream of numbers produced at the given pace.

Example:

//plot: Line plot
simstream(0.1)

It is defined as simsig(heartbeat(pace)). In other words, every pace seconds it calls simsig(x), where x is the number of seconds elapsed since the stream was started. This is an example of how to generate a synthetic stream.
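
Both rsin() above and simstream() follow the same pattern: a scalar function is applied to each element of heartbeat(). The following minimal Python sketch models that elementwise application. It assumes a hypothetical helper, ideal_heartbeat(), that yields exact multiples of pace without real delays so the output is deterministic. simsig() itself is not reproduced because its definition is not shown here.

```python
import math
from itertools import count, islice


def ideal_heartbeat(pace):
    """Hypothetical helper: an idealized heartbeat(pace) yielding
    0, pace, 2*pace, ... without real delays."""
    return (k * pace for k in count())


def rsin(pace, multiplier):
    """Model of rsin(): sin(heartbeat(pace)*multiplier) applies the scalar
    expression to each stream element in turn."""
    return (math.sin(t * multiplier) for t in ideal_heartbeat(pace))


print(list(islice(rsin(0.01, 10), 3)))
```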

Exercise

Inspect the definitions of simstream() and simsig() to see how they are defined.

The stream generator randstream(Real l, Real u)->Stream of Real generates a stream of random numbers between l and u.

Example:

//plot: Scatter plot
select randstream(0,1)
limit 1000

The guide Generate simulated audio shows how to generate synthetic audio streams using trigonometric functions.

Select stream queries

A select stream query produces a new stream containing only the elements that fulfill user-defined conditions. For example, the following CQ selects the elements of simstream(0.01) that are larger than 1 and visualizes them as a line plot:

//plot: Line plot
select Stream of x
from Real x
where x in extract(simstream(0.01))
and x > 1

The CQ generates a new stream of the selected stream elements. Notice how the result stream pauses (slows down) once in a while. This happens when simstream(0.01) produces elements that are not larger than 1, since those are filtered out.
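
Conceptually, a select stream query with a condition works like a filtering generator: every element of the input is pulled, but only matching elements are passed on. This is why the output pauses while non-matching elements go by. Below is a minimal Python sketch of that idea. The function name select_stream and the sample values are hypothetical.

```python
def select_stream(s, condition):
    """Model of: select Stream of x from Real x
                 where x in extract(s) and condition(x).
    Every element of s is pulled; only those satisfying the condition are emitted."""
    for x in s:
        if condition(x):
            yield x


samples = [0.4, 1.2, 1.5, 0.9, 0.3, 1.1]  # hypothetical simstream() values
print(list(select_stream(samples, lambda x: x > 1)))  # [1.2, 1.5, 1.1]
```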

Exercise

Try running:

select Stream of x
from Real x
where x in extract(heartbeat(0.5))
and x < 1

Why does it stop?

You can define new stream functions as select stream queries.

Example: This stream function computes a stream of the square roots of the elements in its argument stream s:

create function sqrt_stream(Stream of Integer s) -> Stream of Real
as select Stream of sqrt(e)
from Integer e
where e in extract(s)

sqrt_stream(diota(0.5,1,5))

Several objects can be grouped together in select stream queries producing streams of tuples.

Example:

select Stream of e,e*10
from Integer e
where e in extract(diota(0.5,1,5))

You can define stream functions producing streams of tuples.

Example:

create function mult_stream(Stream of Integer s, Integer i)
-> Stream of (Integer,Integer)
as select Stream of e,e*i
from Integer e
where e in extract(s)

mult_stream(diota(0.5,1,5),10)
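
Stream functions defined as select stream queries behave like generators that transform each input element. The following minimal Python sketch mirrors sqrt_stream() and mult_stream(). As an assumption, a plain range stands in for diota(0.5,1,5), and the delays are left out.

```python
import math


def sqrt_stream(s):
    """Model of sqrt_stream(): one square root per input element."""
    for e in s:
        yield math.sqrt(e)


def mult_stream(s, i):
    """Model of mult_stream(): a stream of tuples (e, e*i)."""
    for e in s:
        yield (e, e * i)


print(list(sqrt_stream([1, 4, 9])))        # [1.0, 2.0, 3.0]
print(list(mult_stream(range(1, 6), 10)))  # [(1, 10), (2, 20), (3, 30), (4, 40), (5, 50)]
```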

Stream windows

It is often necessary to operate on finite sections of the latest elements of a stream, called stream windows, e.g. the latest 10 elements of an infinite stream. In OSQL, windows are represented as vectors, and there are several built-in functions for forming streams of windows.

Tumbling and sliding windows

In general, the function winagg(s, sz, str) forms a stream of windows over a stream s, where each window contains sz elements. The third parameter str is called the stride and defines how many stream elements the window moves forward each time a new window is produced.

Example:

winagg(heartbeat(0.5),2,2)

In the example above, the size and the stride are both 2. This means that once a window of 2 elements has been produced, a new window that shares no elements with the previous one starts to be formed. This is called a tumbling window.

If the stride is smaller than the size, new overlapping windows will be produced more often. This is called a sliding window.

Example:

winagg(heartbeat(0.5),2,1)
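
The following minimal Python sketch models counting windows. It assumes that the first window is emitted as soon as it is full and that later windows are emitted after every stride new elements. This matches the tumbling and sliding behavior described above, but it is not taken from SA Engine's source.

```python
from collections import deque


def winagg(s, size, stride):
    """Model of winagg(s, size, stride): emit the latest `size` elements as a
    list once the first window is full, then again after every `stride` new
    elements."""
    window = deque(maxlen=size)  # the oldest element falls out automatically
    since_last = 0               # elements received since the last emitted window
    emitted = False
    for x in s:
        window.append(x)
        since_last += 1
        if len(window) == size and (not emitted or since_last >= stride):
            yield list(window)
            emitted = True
            since_last = 0


print(list(winagg(range(1, 7), 2, 2)))  # tumbling: [[1, 2], [3, 4], [5, 6]]
print(list(winagg(range(1, 5), 2, 1)))  # sliding:  [[1, 2], [2, 3], [3, 4]]
# 20 elements (2 s of heartbeat(0.1)), size 10, stride 2: one window per 2 elements
print(len(list(winagg(range(20), 10, 2))))  # 6
```
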
Exercise

Run a query returning a sliding window over heartbeat(0.5) with size 4 and stride 2.

Bar plots can be used for running visualizations of numerical windows.

Example:

//plot: Bar plot
winagg(sin(heartbeat(0.1)*5),10,2)

The query produces a stream of vectors (i.e. windows). Each vector collects 10 consecutive elements of the stream sin(heartbeat(0.1)*5), and the window slides forward with stride 2.

Since heartbeat(0.1) produces 10 elements per second and a new window is emitted after every 2 elements (the stride), the result is a stream of vectors produced 5 times per second, i.e. at 5 Hz. Each window covers the latest second of data, and each element appears in 10/2 = 5 consecutive windows.

Stream aggregation

Since streams can be infinite, the computation of aggregated values over streams works differently from applying aggregate functions on bags, vectors or arrays, where a single value is aggregated over all elements in a collection. For example, summing all values in the stream heartbeat(0.5) would not be computable since heartbeat(0.5) is infinite and a regular application of sum would never return.

Therefore, when regular aggregate functions are applied to streams, they are applied to each element of the stream, for example to each window vector in a stream of windows.

In the following example, the aggregate function sum is applied on the vectors representing tumbling windows of size 2 in the heartbeat(0.5) stream:

sum(winagg(heartbeat(0.5),2,2))

Moving average

A common way to reduce noisy signals is to form the moving average of sliding or tumbling windows over a stream of measurements by computing the average avg(w) for each window w.

Example:

//plot: Line plot
avg(winagg(simstream(0.01),10,5))
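
Applying an aggregate function to a stream of windows yields one aggregated value per window. The following minimal Python sketch shows this with statistics.mean, statistics.median, and sum. It uses a hypothetical window model in which the first window is emitted when full and later windows are emitted after every stride elements. The name moving is also hypothetical.

```python
from collections import deque
from statistics import mean, median


def winagg(s, size, stride):
    """Hypothetical window model: after the first full window, emit the
    latest `size` elements every `stride` elements."""
    window, since_last, emitted = deque(maxlen=size), 0, False
    for x in s:
        window.append(x)
        since_last += 1
        if len(window) == size and (not emitted or since_last >= stride):
            yield list(window)
            emitted, since_last = True, 0


def moving(agg, s, size, stride):
    """Apply an aggregate function to each window, like avg(winagg(s, size, stride))."""
    return (agg(w) for w in winagg(s, size, stride))


data = [1, 2, 3, 4, 5, 6]
print(list(moving(mean, data, 2, 2)))    # [1.5, 3.5, 5.5]
print(list(moving(median, data, 3, 3)))  # [2, 5]
print(list(moving(sum, data, 2, 2)))     # [3, 7, 11]
```
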
Exercise

As an alternative, try using median() instead of avg().

This CQ returns the moving averages of tumbling windows over simstream(0.01) that are larger than 0.7:

//plot: Line plot
select Stream of x
from Real x
where x in extract(avg(winagg(simstream(0.01),5,5)))
and x > 0.7

Here you will notice rather long pauses, since no result is produced while the moving average stays at or below 0.7.

Stream aggregate functions

A stream aggregate function returns an aggregated value for every element in the stream it is applied on. It takes a stream as argument and returns a stream of aggregated values.

For example, the stream aggregate function rsum(Stream of Number s)->Stream of Number returns the running sum of the numbers in s. For each element, this is the sum of all elements in s up to and including that element.

Example:

rsum(heartbeat(0.5))

The reduce function can be used for defining your own stream aggregate functions. If reduce is applied to a stream s, it returns a stream of aggregated values containing the current value of the reductor for each element in s. The reductor is defined in the same way as for other reduce functions.

For example, the function myrsum is the same as rsum:

create function myrsum(Stream of Real s)->Stream of Real
as reduce(s, '+');
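
In Python, the closest analogue of a running reduce over a stream is itertools.accumulate. It lazily yields the accumulated value after each element, so it also works on infinite iterators. The following is a minimal sketch of myrsum under that analogy, not SA Engine's implementation.

```python
import operator
from itertools import accumulate


def myrsum(s):
    """Model of reduce(s, '+') over a stream: one running total per element."""
    return accumulate(s, operator.add)


print(list(myrsum([0, 0.5, 1.0, 1.5])))  # [0, 0.5, 1.5, 3.0]
```
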
Exercise

Define the running product myrprod.

Time stamped streams

The function ts_simstream(pace) produces a stream of numbers where each element is time stamped with the wall time when it was produced. This is called a time stamped stream.

Example:

//plot: Line plot
ts_simstream(0.01)

Notice that the X-labels indicate the times when each element was produced. If you wait a while, you will see the stream start scrolling to the left.

A time stamped stream can also be defined by time stamping each element x of a stream with the function ts(x). For example, the following query returns the same time stamped simulated stream of numbers as ts_simstream(0.01):

//plot: Line plot
select Stream of ts(x)
from Real x
where x in extract(simstream(0.01))

In general, the function ts(x) time stamps any kind of object. It returns a time stamped object having the value x (see Time).

Exercise

Visualize the four first elements of the time stamped stream as text.

The function sample_stream(x, pace) returns an infinite stream of the expression x evaluated every pace seconds, for example:

sample_stream("Hello stream",1)

The following CQ returns a visualized stream of time stamped random numbers:

//plot: Scatter plot
sample_stream(ts(rand(100)),0.1)

Time stamped windows

One can time stamp entire windows using the ts(x) function.

For example, the following query produces a stream of time stamped sliding windows each having 4 elements with 50% overlap:

ts(winagg(heartbeat(0.25),4,2))

Individual stream elements in windows can also be time stamped using ts(x).

Example:

winagg(ts(heartbeat(0.25)),2,2)

Exercise

Time stamp both elements and windows in the query above. How does the time stamp of each window relate to the time stamps of its elements?

Temporal windows

The kind of windows discussed so far are called counting windows, since they produce window vectors by counting the incoming stream elements. This is a very efficient and simple way to form windows, e.g. for continuously computing statistics over windows of arriving data, such as the moving average above. It works particularly well if the elements arrive at a constant pace.

If the elements arrive irregularly, one may need to form temporal windows. The contents of a temporal window are determined by the elements arriving during a time period rather than by the number of arriving elements, as for counting windows.

Temporal windows are formed with the OSQL function twinagg(ts, size, stride), which takes a time stamped stream of objects as input and produces a time stamped stream of vectors of objects as result. Here the parameters size and stride are measured in seconds rather than in number of elements as for winagg().

Example:

select tv
from Timeval of Vector of Real tv
where tv in extract(twinagg(ts_simstream(0.01),0.5,0.5))
limit 10

The following query computes, every half second, the mean and median of a time stamped simulated stream produced at 100 Hz (pace 0.01):

//plot: Line plot
select avg(v), median(v)
from Timeval of Vector of Real tv, Vector of Real v
where tv in extract(twinagg(ts_simstream(0.01),0.5,0.5))
and v = value(tv)
limit 10

Notice that you can apply any Vector function on v.
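
The following minimal Python sketch models temporal windows over a time stamped stream. Plain (timestamp, value) pairs stand in for time stamped objects. Several details are assumptions rather than documented SA Engine behavior: each window covers the half-open interval [start, start + size) and is stamped with its end time, a window is emitted when the first element at or past its end arrives, empty windows are emitted as empty lists, and the last unfinished window is dropped when the input ends.

```python
from collections import deque


def twinagg(ts_stream, size, stride):
    """Model of twinagg(): `ts_stream` yields (timestamp, value) pairs in
    nondecreasing time order; `size` and `stride` are in seconds."""
    buf = deque()  # (timestamp, value) pairs inside the current window
    start = None   # start time of the current window
    for t, v in ts_stream:
        if start is None:
            start = t
        # Close every window that has ended before time t.
        while t >= start + size:
            yield (start + size, [x for _, x in buf])
            start += stride
            while buf and buf[0][0] < start:  # drop elements before the new start
                buf.popleft()
        buf.append((t, v))


readings = [(0.0, "a"), (0.5, "b"), (1.0, "c"), (1.5, "d"), (2.0, "e")]
print(list(twinagg(readings, 1.0, 1.0)))  # [(1.0, ['a', 'b']), (2.0, ['c', 'd'])]
print(list(twinagg(readings, 1.0, 0.5)))  # [(1.0, ['a', 'b']), (1.5, ['b', 'c']), (2.0, ['c', 'd'])]
```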

For more on windows over streams, see Windowing.

Bus streams

It is common that several streams from a source are multiplexed into a single bus stream whose elements are pairs (id,value), each representing the latest value observed for the sensor identified by id.

Example: The following stream :s1 simulates a bus stream for three sensors with identifiers 'S0', 'S1' and 'S2'.

set :s1 = (select Stream of 'S'||mod(i,3), i*10
from Integer i
where i in range(10))
:s1

We use the term signal to denote the label of a sensor. Selecting the observations for one signal in a bus stream can be done with a simple select stream query.

Example: This query returns a stream of readings for signal 'S1':

select Stream of v
from Integer v
where ('S1',v) in extract(:s1)

Usually, computations are made from more than one sensor in a bus stream. The function pivot_bus(Vector keys, Stream s)->Stream of Vector produces a stream of vectors of the latest readings from the signals keys in the bus stream s. Such a stream of vectors is called a signal stream; the function pivots a bus stream into a signal stream.

If $[S_1,...,S_k]$ are the signals in keys, the elements in each signal stream will be vectors $[V_1,...,V_k]$ where $V_i$ is the latest value of signal $S_i$.

Example: This query returns a signal stream of the readings for signals 'S0' and 'S1':

pivot_bus(['S0','S1'],:s1)

Notice that the first element of the signal stream is [null,10]. The reason is that the first element in :s1 is the pair ('S1',10), indicating that signal S1 has the value 10. At that point, the value of signal S0 is still unknown.

It is common to provide default values for missing signal values. The function defaults(Vector v, Vector d) replaces unknown values in v with the corresponding elements in d.

Example:

defaults([null,10,null],[1,2,3])

We can use defaults to replace unknown values in each signal stream vector.

Example:

defaults(pivot_bus(['S0','S1'],:s1),[0,1])
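
The following minimal Python sketch models pivot_bus() and defaults(), with None playing the role of null. Two assumptions apply here. First, only updates to one of the requested signals trigger a new output vector. Second, range(10) in the OSQL example is taken to enumerate 1..10, which is inferred from the first element ('S1',10).

```python
def pivot_bus(keys, bus):
    """Model of pivot_bus(keys, bus): `bus` yields (signal, value) pairs.
    Keep the latest value per signal and, whenever one of `keys` is updated,
    emit a vector of the latest values (None where still unknown)."""
    latest = {}
    for signal, value in bus:
        if signal in keys:  # assumption: other signals do not trigger output
            latest[signal] = value
            yield [latest.get(k) for k in keys]


def defaults(v, d):
    """Model of defaults(v, d): replace unknown (None) entries of v by those of d."""
    return [d_i if v_i is None else v_i for v_i, d_i in zip(v, d)]


# Same pattern as :s1, assuming range(10) enumerates 1..10.
bus = [("S" + str(i % 3), i * 10) for i in range(1, 11)]
print(list(pivot_bus(["S0", "S1"], bus))[:3])                            # [[None, 10], [30, 10], [30, 40]]
print([defaults(v, [0, 1]) for v in pivot_bus(["S0", "S1"], bus)][:2])  # [[0, 10], [30, 10]]
```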

Combining windows

There are a number of stream functions for combining several streams. They are useful when you work with streams of several signals and want to combine and compare them.

Merging streams

The function merge(Vector of Stream vs)->Stream creates a new stream from the streams in vs. The elements of the merged stream are taken from each of the incoming streams in vs as they arrive.

Example:

merge([diota(0.2,1,6), diota(0.5,1,5)*10])

The elements of the first stream are produced with pace 0.2 seconds, while the elements of the second stream are produced with pace 0.5 seconds.
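
Merging by arrival time can be sketched in Python with heapq.merge, which lazily interleaves already-sorted iterables according to a key. As an assumption, the k-th element of a stream with pace p arrives at exactly k*p. Times are kept in integer milliseconds so that equal arrival times compare exactly. The helper timed() is hypothetical.

```python
import heapq


def timed(pace_ms, values):
    """Hypothetical helper: pair the k-th value (k = 1, 2, ...) with an
    idealized arrival time k * pace_ms, in integer milliseconds."""
    return ((k * pace_ms, v) for k, v in enumerate(values, start=1))


def merge(*timed_streams):
    """Model of merge(): interleave the streams by arrival time and drop the times."""
    return (v for _, v in heapq.merge(*timed_streams, key=lambda p: p[0]))


a = timed(200, range(1, 7))           # diota(0.2,1,6)
b = timed(500, [10, 20, 30, 40, 50])  # diota(0.5,1,5)*10
print(list(merge(a, b)))
```

In a real system the interleaving is driven by when elements actually arrive, so the order near simultaneous arrivals can differ from this idealized model.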

The merge() function is used when equivalent elements are retrieved from different source streams. Often, the elements of the merged stream need to be tagged with source identifiers. This can be done by merging streams of pairs (s,v), where v is a value from source s.

Example: First we define a function to label elements in a stream of integers with source identifiers:

create function labeled_stream(Charstring signal, Stream of Integer s)
-> Stream of (Charstring, Integer)
as select Stream of signal, e
from Integer e
where e in extract(s)

labeled_stream('A',diota(0.2,1,6))

Then we can merge two streams from simulated sources labeled 'A' and 'B' by:

set :comb = merge([labeled_stream('A',diota(0.2,1,6)),
labeled_stream('B',diota(0.5,1,5)*10)]);
:comb;

This is actually a bus stream of the sources 'A' and 'B'. We transform it into the corresponding signal stream with:

set :p = pivot_bus(['A','B'], :comb)
:p

We can time stamp the source vectors with:

ts(:p)

Pivoting streams

The function pivot(Vector of Stream vs)->Stream of Vector merges a vector of streams $VS$ into a pivoted stream of vectors $PS$. Each time one of the input streams produces an element, a new vector is emitted whose $i$th element is the most recently received value from stream $VS_i$.

Example: The following query pivots two different streams:

pivot([diota(0.2,1,6), diota(0.5,1,5)*10])

Notice that the first vector contains a null value. This is because the pivoted stream had not received any value from the second stream when it received the first value from stream 1. This can be avoided by applying defaults on the result.

defaults(pivot([diota(0.2,1,6), diota(0.5,1,5)*10]), [0,0])
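
The following minimal Python sketch models pivot() on a sequence of arrival events, together with defaults(). As an assumption, the input streams have already been interleaved into (stream_index, value) pairs in arrival order, with indices counted from 0. The first events follow the idealized timing of diota(0.2,1,6) and diota(0.5,1,5)*10.

```python
def pivot(events, n):
    """Model of pivot() for n input streams: `events` yields
    (stream_index, value) pairs in arrival order. After every arrival,
    emit a snapshot of the latest value from each stream (None if none yet)."""
    latest = [None] * n
    for i, value in events:
        latest[i] = value
        yield list(latest)  # copy, so later updates do not alter emitted vectors


def defaults(v, d):
    """Model of defaults(v, d): replace None entries of v by those of d."""
    return [d_i if v_i is None else v_i for v_i, d_i in zip(v, d)]


# First arrivals from diota(0.2,1,6) (index 0) and diota(0.5,1,5)*10 (index 1).
events = [(0, 1), (0, 2), (1, 10), (0, 3), (0, 4)]
for v in pivot(events, 2):
    print(v, defaults(v, [0, 0]))
```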

The following query returns a stream of the currently largest elements in the incoming streams:

select max(pivot([sin(heartbeat(0.25)), sin(heartbeat(0.5))]))
limit 15

Since each position in a pivoted vector corresponds to one input stream, pivot() makes it possible to identify which input stream produced a value by looking at its position in each result vector.

Example: The following query returns a stream where each element contains both the index of the incoming stream producing the maximum value and the value itself:

select argmax(m), max(m)
from Vector of Real m
where m in extract(pivot([sin(heartbeat(0.25)), sin(heartbeat(0.5))]))
limit 15

Let's time stamp the elements:

select ts([argmax(m), max(m)])
from Vector of Real m
where m in extract(pivot([sin(heartbeat(0.25)), sin(heartbeat(0.5))]))

When time stamping query result tuples that have several values they must be placed in vectors.

Now let's pivot two simulated streams, first without and then with default values for missing elements:

select v
from Stream of Vector pivot, Vector of Stream vs, Vector v
where vs = [1+simstream(0.02), 2+simstream(.2)]
and pivot = pivot(vs)
and v in extract(pivot)
limit 14

select v
from Stream of Vector pivot, Vector of Stream vs, Vector v
where vs = [1+simstream(0.02), 2+simstream(.2)]
and pivot = defaults(pivot(vs), zeros(dim(vs)))
and v in extract(pivot)
limit 14

The following is an example of a more advanced stream pivot:

select pivot
from Stream of Vector pivot,
Vector of Stream vs,
Stream heartbeat_mod,
Stream fast_heartbeat_mod
where vs = [
simstream(0.02),
3+simstream(.2),
heartbeat_mod,
fast_heartbeat_mod
]
and heartbeat_mod = 1 - 2*mod(heartbeat(1), 2)
and fast_heartbeat_mod = mod(round(10*heartbeat(.1)), 2)
and pivot = defaults(pivot(vs), zeros(dim(vs)))
limit 14

Arithmetics on combined streams

To perform arithmetic on pivoted streams, you first need to extract the vectors from the pivoted stream. This is done by the condition v in extract(sv) in the query below.

//plot: Line plot
select v[1]*v[4] + v[3]*10, v[4], v[2]
from Stream of Vector sv,
Vector of Stream vs,
Stream hbmod,
Stream hbmodfast,
Vector of Real v
where hbmod = 1 - 2*mod(heartbeat(1), 2)
and hbmodfast = mod(round(10*heartbeat(.1)), 2)
and vs = [simstream(0.02), 3+3*simstream(.2), hbmod, hbmodfast]
and sv = defaults(pivot(vs), zeros(dim(vs)))
and v in extract(sv)
limit 100

Let's smooth v[2] by replacing it with the average of a sliding window over its 10 latest values:

//plot: Line plot
select v[1]*v[4] + v[3]*10, v[4], avg(win)
from Stream of Vector sv,
Vector of Stream vs,
Stream hbmod,
Stream hbmodfast,
Vector of Real v,
Vector of Real win
where hbmod = 1 - 2*mod(heartbeat(1), 2)
and hbmodfast = mod(round(10*heartbeat(.1)), 2)
and vs = [simstream(0.02), winagg(3+3*simstream(.2),10,1),
hbmod, hbmodfast]
and sv = defaults(pivot(vs), zeros(dim(vs)))
and v in extract(sv)
and win = v[2]
limit 100

The following two queries seem to be defined in the same way, but they do give rise to different behavior:

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'labels':['x','y'], 'memory': 1000};
select pivot([sin(10*heartbeat(.01)),
cos(10*heartbeat(.01))])
limit 150

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'labels':['x','y'], 'memory': 1000};
select sin(x), cos(x)
from Real x
where x in extract(10*heartbeat(.02))
limit 50

Note

The first query uses two independent heartbeat streams combined with pivot(). Here, sin and cos are each applied to their own heartbeat stream over time before the results are pivoted into one stream.

In the second example we apply sin and cos to the x from the same heartbeat stream.

More examples of how we can specify multiple streams in a query:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select pivot([sin(hb), sin(hb+pi())])
from Stream of Real hb
where hb = 10*heartbeat(.02)
limit 100

//plot: Multi plot
{'sa_plot': 'Bar plot', 'memory': 200};
select pivot(select Vector of sin(i/3 + 3*heartbeat(.03))
from Integer i
where i in range(0,29)
order by i)
limit 100

These short examples illustrate how you can combine streams.

Type extents and the solution domain

We explain the theory behind how select stream queries combine infinite sets of objects and show how this is used for combining infinite streams.

Every type t has an extent(t), which is the set of all objects of type t. For example, the extent of the type Integer is the set of all integers, $\{..., -2, -1, 0, 1, 2, ...\}$.

A select statement first forms the Cartesian product of the extents of all variables in the from clause, called the solution domain. The where clause then specifies conditions limiting the emitted result to a subset of the solution domain.

For example, consider the select statement below. It forms a Cartesian product of the extent of i, which is the set of all possible integers, with the extent of s, which is the set of all possible strings. The where clause has two conditions: one that limits i to the set $\{1,2,3\}$, and one that limits s to the set $\{"a","b","c"\}$. This restricts the result to the Cartesian product of $\{1,2,3\}$ and $\{"a","b","c"\}$.

select i, s
from Integer i, Charstring s
where i in (values 1,2,3)
and s in (values "a","b","c");

Running the above query gives the following result:

[1,"a"]
[1,"b"]
[1,"c"]
[2,"a"]
[2,"b"]
[2,"c"]
[3,"a"]
[3,"b"]
[3,"c"]

The figure below illustrates how the extents span the solution domain, and how the result is produced as the Cartesian product of the two extents limited by the conditions in the where clause:

[Figure: the extents of i and s spanning the solution domain, with the result as their Cartesian product restricted by the where clause]
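
The restricted Cartesian product can be reproduced in Python with itertools.product, which enumerates the pairs in the same order as the listing above: the first variable varies slowest. This sketch only illustrates the set semantics and does not show how OSQL evaluates the query. Note that the lists here are the already-restricted extents, since the full extents are infinite.

```python
from itertools import product

extent_i = [1, 2, 3]        # extent(Integer) restricted by: i in (values 1,2,3)
extent_s = ["a", "b", "c"]  # extent(Charstring) restricted by: s in (values "a","b","c")

for i, s in product(extent_i, extent_s):
    print([i, s])
```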

We can limit the solution further by imposing additional conditions in the where clause, like this:

select i, s
from Integer i, Charstring s
where i in (values 1,2,3)
and s in (values "a","b","c")
and i > 2;

Running the above query gives the following result:

[3,"a"]
[3,"b"]
[3,"c"]

This time, the subset of the solution domain is further limited by the extra condition on i and now looks like this:

[Figure: the solution domain with the result further restricted by the condition i > 2]

For a stream query, consider binding variables to infinite subsets of the solution domain. We can do this by using streams that never end. For example, heartbeat() generates a stream of elapsed seconds emitted at a given pace, and this stream is infinite since time has no end. The select statement below tries to combine two such streams, but as we will see, taking the Cartesian product of two infinite sets does not work. Try running the query:

select s1, s2
from Real s1, Real s2
where s1 in extract(heartbeat(1))
and s2 in extract(heartbeat(1));

The result should be:

[0,0]
[0,1]
[0,2]
[0,3]
.
.
.

We see that s1 never increases. For the first value of s1, the query iterates over all elements of the second stream, and since that stream never finishes, s1 never advances. The figure below illustrates how the Cartesian product "travels" through the solution domain:

[Figure: the Cartesian product of two infinite heartbeat streams advancing only along the s2 axis of the solution domain]
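
The same effect can be reproduced with a nested loop over two infinite Python generators, where itertools.count() stands in for heartbeat(). The nested loop is written by hand because itertools.product consumes its input iterables completely before producing anything, so it would never return on infinite inputs. This is an analogy for the behavior shown above, not a description of the OSQL query evaluator.

```python
from itertools import count, islice


def nested(make_s1, make_s2):
    """Nested-loop model of binding s1 and s2 to two infinite streams:
    for each s1, iterate over a fresh s2 stream."""
    for a in make_s1():
        for b in make_s2():
            yield (a, b)


# The inner stream never ends, so s1 stays at 0 forever.
print(list(islice(nested(count, count), 4)))  # [(0, 0), (0, 1), (0, 2), (0, 3)]
```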

To combine infinite streams you must instead use the pivot() function. It combines the outputs of multiple streams into one vector and outputs a new vector each time one of the streams emits a result. The query below combines the streams from the previous example into a single stream:

select v
from Stream of Real s1, Stream of Real s2, Vector v
where s1 = heartbeat(1)
and s2 = heartbeat(1)
and v in extract(pivot([s1,s2]));

The result should be:

[0,null]
[0,0]
[1,0]
[1,1]
[2,1]
[2,2]
[3,2]
[3,3]
[4,3]
[4,4]
.
.
.

As you see, this query will continue to infinity but include the values from both streams. If we look at the subset emitted from the solution domain, we see that the query "travels" diagonally through the solution domain:

[Figure: the pivoted stream travelling diagonally through the solution domain]

Visualizing streams

You can use Multi plot for flexible stream visualization.

You can prefix a stream query with a JSON record specifying how to visualize the result. For example, the following query is visualized by a sliding line plot over the latest 200 values:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x), cos(x)]
from Real x
where x in extract(10*heartbeat(.02))

Trigonometric functions lend themselves to algebraic manipulation over streams, like this amplitude modulation example:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in extract(20*heartbeat(.01))

which is more appealing in parametric coordinates (scatter plot):

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'memory': 1000};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in extract(10*heartbeat(.005))

Functions

Stream functions

Window functions

Dataflow functions

Sensor ontology functions