
Queries over infinite streams

An important feature of the OSQL language is its support for applying functions and filters to streams. A stream is a possibly infinite sequence of objects that often grows continuously over time at some pace.

Continuous queries

The function heartbeat(pace) returns a stream that emits a number every pace seconds; each number is the time elapsed since the stream started.

Example:

heartbeat(0.5)
Note

The query is an example of a continuous query (CQ) since it continuously produces an infinite stream of elements until you explicitly stop it by pressing the stop button.

The in operator can be used to extract elements from a stream.

Example:

select sqrt(x)
from Real x
where x in heartbeat(0.5)
Note

heartbeat is a stream generator, i.e. a function that returns a stream as an object. The in operator extracts the elements from a stream. If a simple function like sqrt or an operator like + is applied to a stream generator, it is applied to each element of the stream, thus generating a transformed stream as in the example.

You can stop an infinite CQ by specifying a limit in a select query, for example:

select heartbeat(0.125) limit 5
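
To make the idea of an infinite stream cut off by a limit more concrete, here is a minimal Python model (not OSQL). The generator ticks is a hypothetical stand-in for heartbeat that uses a simulated clock instead of real delays; that it starts at 0 is an assumption of this sketch, not documented behavior of heartbeat.

```python
import math
from itertools import count, islice

def ticks(pace):
    """Hypothetical stand-in for heartbeat(pace): an infinite stream of
    simulated elapsed times 0, pace, 2*pace, ... (no real delays)."""
    for n in count():
        yield n * pace

def limit(stream, n):
    """Model of 'limit n': take the first n elements, then stop pulling."""
    return islice(stream, n)

# Applying sqrt to the stream transforms it element by element.
roots = map(math.sqrt, ticks(0.5))
first_three = list(limit(roots, 3))
```

Because generators are lazy, nothing beyond the requested elements is ever computed, which is why the infinite stream terminates.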

The stream generator diota(pace,l,u) returns a stream of the integers between l and u with a delay of pace between successive elements.

Example:

select diota(0.5,1,10) limit 5

A function taking a stream as argument is called a stream function. For example, the stream function first_n(s,n) returns a finite stream of the first n elements in the stream s.

Example:

first_n(heartbeat(0.125),5)

The function timeout(s,t) runs the stream s for t seconds.

Example:

timeout(heartbeat(0.125),1.1)

The function skip(s,n) skips the first n elements in the stream s.

Example:

skip(heartbeat(1),1)

The stream function changed(s) returns a stream where successive duplicated elements have been removed.

Example:

This stream contains duplicated elements:

select floor(heartbeat(0.25)*2.5) limit 7

The duplicated elements have been eliminated in this stream:

select changed(floor(heartbeat(0.25)*2.5)) limit 7
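
The effect of changed can be modeled in Python (not OSQL) with a lazy generator. This is only a sketch of the behavior described above, written on the assumption that an element is emitted as soon as it differs from its predecessor; it says nothing about how the system implements it.

```python
from itertools import groupby

def changed(stream):
    """Yield each element only when it differs from the one before it.

    groupby() is lazy and groups consecutive equal elements, so this
    also works on infinite streams.
    """
    for value, _run in groupby(stream):
        yield value
```

Note that only successive duplicates are removed: a value that reappears later in the stream is emitted again.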

Stream elements are enumerated starting with number 1. The stream function section(s,b,e) returns a stream of the elements in stream s starting with element number b and ending with element number e.

section(heartbeat(0.125),3,5)
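
The element numbering used by first_n, skip and section can be illustrated with a small Python model (not OSQL) built on itertools.islice. The Python functions below merely borrow the OSQL names for readability; timing is ignored.

```python
from itertools import islice

def first_n(stream, n):
    """The first n elements of the stream."""
    return islice(stream, n)

def skip(stream, n):
    """Everything after the first n elements."""
    return islice(stream, n, None)

def section(stream, b, e):
    """Elements number b through e, counting from 1, both inclusive."""
    return islice(stream, b - 1, e)
```

For a stream 1, 2, 3, ... the model's section(s, 3, 5) yields 3, 4, 5, since the first element has number 1.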

Synthetic streams

Synthetic stream generators are functions returning streams whose elements are generated by some algorithm rather than originating in physical measurements. In the documentation, synthetic streams are used to illustrate the behavior of the system. They are also very useful for generating simulated data streams to test models without accessing sensors and other external data sources.

A trivial example of a synthetic stream generator is the function siota(Integer l, Integer u)->Stream of Integer that generates a finite stream of integers between l and u. It is used throughout the documentation to produce simple predictable streams that illustrate the behavior of other stream functions.

Similarly, the function diota(Real pace, Integer l, Integer u)->Stream of Integer generates a predictable finite stream of integers between l and u with a delay of pace in-between produced elements.

Example: This function generates a finite real-time sine wave:

//plot: Line plot
sin(diota(0.01,1,100)*3.14/50)

The function heartbeat(Real pace)->Stream of Real is an important built-in synthetic stream generator that can be used for defining other real-time stream generators.

Example: This stream generator returns a real-time infinite sine wave:

create function rsin(Real pace, Real multiplier) -> Stream of Real
as sin(heartbeat(pace)*multiplier)

Let's try it:

//plot: Line plot
rsin(0.01,10)
Exercise

Vary the parameters of rsin() and see what happens.

In general, the built-in library of trigonometric functions is very useful for generating synthetic streams. The system function simstream(Number pace) is such a synthetic stream generator that returns a simulated stream of numbers with given pace based on sine.

Example:

//plot: Line plot
simstream(0.1)

It is defined as simsig(heartbeat(pace)), i.e. the function simsig(x) is called at the given pace, where x is the time elapsed since the stream started. This is an example of how to generate a synthetic stream.

Exercise

Inspect the definitions of simstream() and simsig() to see how they are defined.

The function randstream(Real l, Real u)->Stream of Real generates a stream of random numbers between l and u.

Example:

//plot: Scatter plot
select randstream(0,1)
limit 1000

The guide Generate simulated audio shows how to generate synthetic audio streams using trigonometric functions.

Select stream queries

A select stream query produces a new stream containing only the elements that fulfill user-defined conditions. For example, the following CQ selects the elements of simstream(0.01) that are larger than 0.9 and visualizes them as a line plot:

//plot: Line plot
select Stream of x
from Real x
where x in simstream(0.01)
and x > 0.9

The CQ generates a new stream of the selected stream elements. Notice how the result stream pauses once in a while, namely whenever simstream(0.01) produces elements that are not larger than 0.9.

Exercise

Try running:

select Stream of x
from Real x
where x in heartbeat(0.5)
and x < 1

Why does it stop?

Stream windows

The CQ examples we have seen so far generate infinite streams of single elements (strings or numbers). However, it is often necessary to operate on streams of finite sections of the latest elements of a stream, called stream windows, e.g. a stream of the latest 10 elements in an infinite stream. In OSQL, windows are represented as vectors. There are several built-in functions for forming such streams of windows.

Tumbling and sliding windows

In general the function winagg(s, sz, str) forms a stream of windows over a stream s where each window has size sz elements. The third parameter str is called the stride and defines how many stream elements the window moves forward over the stream.

Example:

winagg(heartbeat(0.5),2,2)

In the example above the size and the stride are both 2, meaning that once a window of 2 elements has been produced, the next window is formed from the following 2 elements with no overlap. This is called a tumbling window.

If the stride is smaller than the size, new overlapping windows will be produced more often. This is called a sliding window.

Example:

winagg(heartbeat(0.5),2,1)
Exercise

Run a query that returns sliding windows over heartbeat(0.5) with size 4 and stride 2.
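
The interplay of size and stride can be modeled in a few lines of Python (not OSQL). The function below reuses the name winagg only for readability. It assumes that the first window is emitted as soon as size elements have arrived and that 1 <= stride <= size; both are assumptions of this sketch rather than documented behavior.

```python
from collections import deque

def winagg(stream, size, stride):
    """Yield lists of `size` consecutive elements, moving forward
    `stride` elements between windows (assumes 1 <= stride <= size)."""
    buf = deque(maxlen=size)  # keeps only the latest `size` elements
    for i, x in enumerate(stream, start=1):
        buf.append(x)
        # Emit when the buffer is full and we are a whole number of
        # strides past the first complete window.
        if i >= size and (i - size) % stride == 0:
            yield list(buf)
```

With size equal to stride the windows tumble without overlap; with a smaller stride consecutive windows share size minus stride elements.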

Bar plots can be used for running visualizations of numerical windows.

Example:

//plot: Bar plot
winagg(sin(heartbeat(0.1)*5),10,2)

The query produces a stream of vectors (i.e. windows) by collecting 10 elements at a time from the stream sin(heartbeat(0.1)*5) into each vector, sliding forward with stride 2.

Since heartbeat(0.1) produces 10 numbers per second and the window moves forward 2 elements at a time, a new window of 10 elements is produced 10/2 = 5 times per second, i.e. the result is a stream of vectors with a rate of 5 Hz.

Stream aggregation

Since streams can be infinite, the computation of aggregated values over streams works differently from applying aggregate functions on bags, vectors or arrays, where a single value is aggregated over all elements in a collection. For example, summing all values in the stream heartbeat(0.5) would not be computable since heartbeat(0.5) is infinite and a regular application of sum would never return.

Therefore, when regular aggregate functions are applied on streams, they are applied on each element in the stream.

In the following example, the aggregate function sum is applied on the vectors representing tumbling windows of size 2 in the heartbeat(0.5) stream:

sum(winagg(heartbeat(0.5),2,2))

Moving average

A common way to reduce noise in a signal is to form the moving average of sliding or tumbling windows over a stream of measurements by computing the average avg(w) for each window w.

Example:

//plot: Line plot
avg(winagg(simstream(0.01),10,5))
Exercise

As an alternative, try using median() instead of avg().
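
Applying an aggregate function to each window of a stream can be sketched in Python (not OSQL) as follows. The helper names windows and aggregate_windows are hypothetical, and the windowing makes the same simplifying assumption as a plain counting window: a window is emitted as soon as it is full.

```python
from collections import deque
from statistics import fmean, median

def windows(stream, size, stride):
    """Counting windows: lists of `size` elements, advancing by `stride`."""
    buf = deque(maxlen=size)
    for i, x in enumerate(stream, start=1):
        buf.append(x)
        if i >= size and (i - size) % stride == 0:
            yield list(buf)

def aggregate_windows(stream, size, stride, agg=fmean):
    """Apply the aggregate `agg` to every window, giving a stream of
    one aggregated value per window."""
    return (agg(w) for w in windows(stream, size, stride))
```

Passing sum, fmean or median as agg gives a stream of window sums, moving averages or moving medians respectively.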

This CQ returns the moving averages larger than 0.7 of windows of simstream(0.01):

//plot: Line plot
select Stream of x
from Real x
where x in avg(winagg(simstream(0.01),5,5))
and x > 0.7

Here you will notice rather long pauses.

Stream aggregate functions

A stream aggregate function returns an aggregated value for every element in the stream it is applied on. It takes a stream as argument and returns a stream of aggregated values.

For example, the stream aggregate function rsum(Stream of Number s)->Stream of Number returns the running sum of the numbers in s as the sum of the elements in s up to the current element.

Example:

rsum(heartbeat(0.5))
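
The running sum can be modeled in Python (not OSQL) with itertools.accumulate, which lazily emits one aggregated value per input element. The Python function borrows the name rsum only for readability.

```python
import operator
from itertools import accumulate

def rsum(stream):
    """Running sum: the k-th output is the sum of the first k inputs."""
    return accumulate(stream, operator.add)
```

Since each output depends only on the previous output and the current element, the computation never needs to see the whole, possibly infinite, stream.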

The reduce function can be used for defining your own stream aggregate functions. If reduce is applied on a stream s, it returns an aggregated stream holding the value of the reductor for each element in s. The definition of the reductor is the same as for other reduce functions.

For example, the function myrsum is the same as rsum:

create function myrsum(Stream of Real s)->Stream of Real
as reduce(s, '+');
Exercise

Define the running product myrprod.

Time stamped streams

The function ts_simstream(pace) produces a stream of numbers where each element is time stamped with the wall time when it was produced. This is called a time stamped stream.

Example:

//plot: Line plot
ts_simstream(0.01)
Info

Notice that the X-labels indicate the times when each element was produced. If you wait a while you will see how the stream starts scrolling to the left.

Time stamped streams can be defined by time stamping the elements x of a stream's result by calling the function ts(x). For example, the following query returns the same timestamped simulated stream of numbers as ts_simstream(0.01):

//plot: Line plot
select Stream of ts(x)
from Real x
where x in simstream(0.01)

In general, the function ts(x) time stamps any kind of object. It returns a time stamped object having the value x (see Time).
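
Conceptually, a time stamped object pairs a value with the time at which it was stamped. The Python model below (not OSQL) illustrates this; the class name Stamped, its field names and the injectable clock parameter are all hypothetical choices made for the sketch.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable

@dataclass(frozen=True)
class Stamped:
    """A value together with the time at which it was stamped."""
    time: float
    value: Any

def ts(x: Any, clock: Callable[[], float] = time.time) -> Stamped:
    """Stamp any object x with the current time read from `clock`
    (wall time by default)."""
    return Stamped(clock(), x)

def stamp_stream(stream, clock=time.time):
    """Stamp every element of a stream at the moment it is consumed."""
    return (ts(x, clock) for x in stream)
```

Making the clock a parameter is only a convenience for testing the model with a fixed time.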

Exercise

Visualize the first four elements of the time stamped stream as text.

The function sample_stream(x, pace) returns an infinite stream of the expression x evaluated every pace seconds, for example:

sample_stream("Hello stream",1)

The following CQ returns a visualized stream of time stamped random numbers:

//plot: Scatter plot
sample_stream(ts(rand(100)),0.1)

Time stamped windows

One can time stamp entire windows using the ts(x) function.

For example, the following query produces a stream of time stamped sliding windows each having 4 elements with 50% overlap:

ts(winagg(heartbeat(0.25),4,2))

Individual stream elements in windows can also be time stamped using ts(x).

Example:

winagg(ts(heartbeat(0.25)),2,2)
Exercise

Time stamp both elements and windows in the query above. How does the time stamp of each window relate to the time stamps of its elements?

Temporal windows

The windows discussed so far are called counting windows since they produce window vectors by counting the incoming stream elements. This is a very efficient and simple method to form windows, e.g. for continuously computing statistics over windows of arriving data, such as the moving average above. It works particularly well if the elements arrive at a constant pace.

If the elements arrive irregularly one may need to form temporal windows whose sizes are based on elements arriving during a time period rather than on the number of arriving elements as for counting windows.

Temporal windows are formed with the OSQL function twinagg(ts, size, stride) that takes a time stamped stream of objects as input and produces a time stamped stream of vectors of objects as result. The parameters size and stride are here measured in seconds rather than in number of elements as for winagg().

Example:

select tv
from Timeval of Vector of Real tv
where tv in twinagg(ts_simstream(0.01),0.5,0.5)
limit 10
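
The difference between counting and temporal windows can be seen in a Python model (not OSQL) that groups elements by their time stamps instead of by their count. The sketch takes explicit (time, value) pairs so that it is deterministic. It assumes half-open windows aligned to the first element's time stamp, stamps each window with its end time, and emits empty windows during gaps; none of these choices is confirmed by the documentation of twinagg.

```python
from collections import deque

def twinagg(stamped, size, stride):
    """Yield (window_end_time, [values]) for temporal windows.

    `stamped` is an iterable of (time, value) pairs in time order.
    Window k covers the half-open interval [start, start + size),
    where start advances by `stride` seconds (assumes stride <= size).
    """
    buf = deque()   # pairs that may still belong to an open window
    start = None    # start time of the oldest window not yet emitted
    for t, v in stamped:
        if start is None:
            start = t
        # An arriving element closes every window that ends at or before t.
        while t >= start + size:
            yield (start + size, [val for (_, val) in buf])
            start += stride
            # Drop elements that are older than the next window's start.
            while buf and buf[0][0] < start:
                buf.popleft()
        buf.append((t, v))
```

Unlike a counting window, the number of elements per window varies here with how many elements happened to arrive during each time interval.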

The following query computes the mean and median of simstream with a rate of 100 Hz over each half-second window:

//plot: Line plot
select avg(v), median(v)
from Timeval of Vector of Real tv, Vector of Real v
where tv in twinagg(ts_simstream(0.01),0.5,0.5)
and v = value(tv)
limit 10

Notice that you can apply any Vector function on v.

For more on windows over streams see Windowing.

Visualizing streams

We show how to use Multi plot for flexible stream visualization.

You can prefix a stream query with a JSON record specifying how to visualize the result. For example, the following query is visualized by a sliding line plot over the latest 200 values:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x), cos(x)]
from Real x
where x in 10*heartbeat(.02)

Trigonometric functions lend themselves to algebraic manipulation over streams, like this amplitude modulation example:

//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in 20*heartbeat(.01)

which is more appealing in parametric coordinates (scatter plot):

//plot: Multi plot
{'sa_plot': 'Scatter plot', 'memory': 1000};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in 10*heartbeat(.005)

In Combining streams it is shown how to work with several streams in a query.