Queries over infinite streams
An important feature of OSQL is that functions and filters can be applied to streams. A stream is a possibly infinite sequence of objects that often grows continuously over time at some pace.
Continuous queries
The function heartbeat(pace) returns a stream of numbers representing the time elapsed since its start, producing a new number every pace seconds.
Example:
heartbeat(0.5)
The query is an example of a continuous query (CQ) since it continuously produces an infinite stream of elements until you explicitly stop it by pressing the stop button.
The in operator can be used to extract the elements of a stream.
Example:
select sqrt(x)
from Real x
where x in heartbeat(0.5)
heartbeat is a stream generator, i.e. a function that returns a stream as an object. The in operator extracts the elements from a stream. If a simple function like sqrt or an operator like + is applied on a stream generator, it is applied on each element of the stream, thus generating a transformed stream like the one produced by the example query.
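The element-wise semantics can be modelled with Python iterators. In this minimal sketch, heartbeat is a hypothetical logical-time stand-in for the OSQL function: it yields k*pace for k = 0, 1, 2, ... immediately, instead of waiting and reporting elapsed wall time. Iterating over the generator plays the role of the in operator, and map() plays the role of applying sqrt directly to a stream.

```python
import itertools
import math
from typing import Iterator


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ...

    The real OSQL function waits pace seconds between elements and reports
    the elapsed wall time; this model yields the ideal values at once.
    """
    for k in itertools.count():
        yield k * pace


# select sqrt(x) from Real x where x in heartbeat(0.5):
# iterate over the stream ("in") and apply sqrt to each extracted element.
query_stream = (math.sqrt(x) for x in heartbeat(0.5))

# Applying sqrt directly to the stream generator maps it over every element.
mapped_stream = map(math.sqrt, heartbeat(0.5))

# Both streams are infinite, so only finite prefixes can be materialized.
print(list(itertools.islice(query_stream, 3)))
print(list(itertools.islice(mapped_stream, 3)))
```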
You can stop an infinite CQ by specifying a limit in a select query, for example:
select heartbeat(0.125) limit 5
The stream generator diota(pace,l,u) returns a stream of the integers between l and u with a delay of pace seconds between consecutive elements.
Example:
select diota(0.5,1,10) limit 5
A function taking a stream as argument is called a stream function. For example, the stream function first_n(s,n) returns a finite stream of the first n elements in the stream s.
Example:
first_n(heartbeat(0.125),5)
The function timeout(s,t) runs the stream s for t seconds.
Example:
timeout(heartbeat(0.125),1.1)
The function skip(s,n) skips the first n elements in the stream s.
Example:
skip(heartbeat(1),1)
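The counting stream functions above can be modelled in Python with itertools.islice. This is a sketch of the semantics only: first_n and skip below are hypothetical Python stand-ins, and heartbeat is again a logical-time model that does not sleep. timeout(s,t) is left out because it depends on wall-clock time rather than on counting elements.

```python
import itertools
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def first_n(s: Iterable[T], n: int) -> Iterator[T]:
    """Finite stream of the first n elements of s (like limit n)."""
    return itertools.islice(s, n)


def skip(s: Iterable[T], n: int) -> Iterator[T]:
    """Stream s without its first n elements; still infinite if s is."""
    return itertools.islice(s, n, None)


print(list(first_n(heartbeat(0.125), 5)))
# skip() keeps the stream infinite, so cut it with first_n() before printing.
print(list(first_n(skip(heartbeat(1), 1), 3)))
```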
The stream function changed(s) returns a stream where successive duplicate elements have been removed.
Example:
This stream contains duplicated elements:
select floor(heartbeat(0.25)*2.5) limit 7
The duplicated elements have been eliminated in this stream:
select changed(floor(heartbeat(0.25)*2.5)) limit 7
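A minimal Python model of changed(s), assuming the same logical-time heartbeat stand-in as a hypothetical helper. itertools.groupby groups runs of consecutive equal elements, so emitting one key per run removes successive duplicates only; a value that reappears later is kept. Because the real heartbeat reports measured wall time, the OSQL output may differ slightly from this idealized model.

```python
import itertools
import math
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def changed(s: Iterable[T]) -> Iterator[T]:
    """Stream s with successive duplicate elements removed."""
    # groupby() lazily groups runs of consecutive equal elements;
    # emitting one key per run drops the repeats (works on infinite input).
    return (key for key, _run in itertools.groupby(s))


def floored() -> Iterator[int]:
    """Model of floor(heartbeat(0.25)*2.5)."""
    return (math.floor(x * 2.5) for x in heartbeat(0.25))


print(list(itertools.islice(floored(), 7)))
print(list(itertools.islice(changed(floored()), 7)))
```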
Stream elements are numbered starting from 1. The stream function section(s,b,e) returns a stream of the elements in stream s, starting with element number b and ending with element number e.
Example:
section(heartbeat(0.125),3,5)
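Since stream elements are numbered from 1 while Python's islice uses 0-based, end-exclusive indices, a sketch of section(s,b,e) only needs to shift the start index. The heartbeat helper is again a hypothetical logical-time stand-in, not the OSQL implementation.

```python
import itertools
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def section(s: Iterable[T], b: int, e: int) -> Iterator[T]:
    """Elements number b..e (inclusive) of s, numbering from 1."""
    # Element number b has 0-based index b - 1; islice's stop is exclusive,
    # so stopping at e includes element number e.
    return itertools.islice(s, b - 1, e)


print(list(section(heartbeat(0.125), 3, 5)))
```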
Synthetic streams
Synthetic stream generators are functions returning streams whose elements are generated by some algorithm rather than originating in physical measurements. In this documentation, synthetic streams are used to illustrate the behavior of the system. They are furthermore very useful for generating simulated data streams to test models without accessing sensors and other external data sources.
A trivial example of a synthetic stream generator is the function siota(Integer l, Integer u)->Stream of Integer that generates a finite stream of integers between l and u. It is used throughout the documentation to produce simple, predictable streams that illustrate the behavior of other stream functions.
Similarly, the function diota(Real pace, Integer l, Integer u)->Stream of Integer generates a predictable finite stream of integers between l and u with a delay of pace seconds between produced elements.
Example: This function generates a finite real-time sine wave:
//plot: Line plot
sin(diota(0.01,1,100)*3.14/50)
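A minimal Python sketch of siota and diota as generators. Both names mirror the OSQL functions but the code is a hypothetical model; in particular, it assumes the delay is inserted only between elements, so the first element is produced immediately. The last lines model the sine wave example above and take about one second to run.

```python
import math
import time
from typing import Iterator


def siota(l: int, u: int) -> Iterator[int]:
    """Finite stream of the integers l, l+1, ..., u."""
    yield from range(l, u + 1)


def diota(pace: float, l: int, u: int) -> Iterator[int]:
    """Like siota(l, u), but waits pace seconds between produced elements."""
    for i in siota(l, u):
        if i > l:
            time.sleep(pace)  # delay only in between elements
        yield i


# Model of sin(diota(0.01,1,100)*3.14/50): about one second of a sine wave.
wave = [math.sin(i * 3.14 / 50) for i in diota(0.01, 1, 100)]
print(len(wave))
```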
The function heartbeat(Real pace)->Stream of Real is an important built-in synthetic stream generator that can be used for defining other real-time stream generators.
Example: This stream generator returns a real-time infinite sine wave:
create function rsin(Real pace, Real multiplier) -> Stream of Real
as sin(heartbeat(pace)*multiplier)
Let's try it:
//plot: Line plot
rsin(0.01,10)
Vary the parameters of rsin() and see what happens.
In general, the built-in library of trigonometric functions is very useful for generating synthetic streams. The system function simstream(Number pace) is such a synthetic stream generator; it returns a simulated stream of numbers based on sine, with the given pace.
Example:
//plot: Line plot
simstream(0.1)
It is defined as simsig(heartbeat(pace)), i.e. it calls the function simsig(x) every pace seconds, where x is the time elapsed since the stream started. This is an example of how to generate a synthetic stream. Inspect the definitions of simstream() and simsig() for the details.
The function randstream(Real l, Real u)->Stream of Real generates a stream of random numbers between l and u.
Example:
//plot: Scatter plot
select randstream(0,1)
limit 1000
The guide Generate simulated audio shows how to generate synthetic audio streams using trigonometric functions.
Select stream queries
A select stream query produces a new stream containing only those elements that fulfill user-defined conditions. For example, the following CQ selects the elements of simstream(0.01) that are larger than 0.9 and visualizes them as a line plot:
//plot: Line plot
select Stream of x
from Real x
where x in simstream(0.01)
and x > 0.9
The CQ generates a new stream of the selected stream elements. Notice how the result stream pauses (slows down) once in a while, when simstream(0.01) produces elements that are not larger than 0.9.
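In Python terms, a select stream query behaves like a lazy generator expression with an if clause. The sketch below uses a hypothetical sine stand-in built on a logical-time heartbeat; it is NOT how simstream() is defined. Asking for the next element of peaks keeps consuming input until some element passes the condition, which corresponds to the pauses visible in the plot.

```python
import itertools
import math
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def select_stream(s: Iterable[T], condition: Callable[[T], bool]) -> Iterator[T]:
    """Stream of the elements of s that fulfill condition (the where clause)."""
    return (x for x in s if condition(x))


def sine_stream(pace: float) -> Iterator[float]:
    """Hypothetical sine stand-in; this is NOT the definition of simstream()."""
    return (math.sin(t) for t in heartbeat(pace))


peaks = select_stream(sine_stream(0.01), lambda x: x > 0.9)
print(list(itertools.islice(peaks, 5)))
```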
Try running:
select Stream of x
from Real x
where x in heartbeat(0.5)
and x < 1
Why does it stop?
Stream windows
The CQ examples we have seen so far generate infinite streams of single elements (strings or numbers). However, it is often necessary to operate on streams of finite sections of the latest elements of a stream, called stream windows, e.g. a stream of the latest 10 elements in an infinite stream. In OSQL windows are represented as vectors. There are several built-in functions for forming such streams of windows.
Tumbling and sliding windows
In general, the function winagg(s, sz, str) forms a stream of windows over a stream s, where each window has size sz elements. The third parameter str is called the stride and defines how many elements the window moves forward over the stream each time a new window is formed.
Example:
winagg(heartbeat(0.5),2,2)
In the example above, both the size and the stride are 2, meaning that once a window of 2 elements has been produced, a new, non-overlapping window starts to be formed. This is called a tumbling window.
If the stride is smaller than the size, new overlapping windows will be produced more often. This is called a sliding window.
Example:
winagg(heartbeat(0.5),2,1)
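Counting windows can be sketched in Python with a bounded deque, using lists in place of OSQL vectors. This is a hypothetical model under stated assumptions: heartbeat is a logical-time stand-in, and only full windows are emitted (the first after size elements, then one every stride elements). With stride equal to size the windows tumble; with a smaller stride they slide.

```python
import itertools
from collections import deque
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def winagg(s: Iterable[T], size: int, stride: int) -> Iterator[List[T]]:
    """Counting windows of size elements whose starts are stride elements apart."""
    window = deque(maxlen=size)  # the oldest element falls out when full
    for n, x in enumerate(s, start=1):
        window.append(x)
        # The first window is complete after size elements; later windows
        # complete every stride elements after that.
        if n >= size and (n - size) % stride == 0:
            yield list(window)


print(list(itertools.islice(winagg(heartbeat(0.5), 2, 2), 3)))  # tumbling
print(list(itertools.islice(winagg(heartbeat(0.5), 2, 1), 3)))  # sliding
```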
Run a query returning sliding windows over heartbeat(0.5) with size 4 and stride 2.
Bar plots can be used for running visualizations of numerical windows.
Example:
//plot: Bar plot
winagg(sin(heartbeat(0.1)*5),10,2)
The query produces a stream of vectors (i.e. windows) by collecting 10 elements at a time from the stream sin(heartbeat(0.1)*5) into vectors, sliding with stride 2. Since heartbeat(0.1) produces a number 10 times per second and a new window is formed every 2 elements, windows are produced 10/2 = 5 times per second, i.e. at 5 Hz, each holding the latest 10 elements (about one second) of the stream.
Stream aggregation
Since streams can be infinite, the computation of aggregated values over streams works differently from applying aggregate functions on bags, vectors or arrays, where a single value is aggregated over all elements in a collection. For example, summing all values in the stream heartbeat(0.5) would not be computable, since heartbeat(0.5) is infinite and a regular application of sum would never return.
Therefore, when regular aggregate functions are applied on streams, they are applied on each element in the stream.
In the following example, the aggregate function sum is applied on the vectors representing tumbling windows of size 2 in the heartbeat(0.5) stream:
sum(winagg(heartbeat(0.5),2,2))
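The following Python sketch shows why the windowed sum is computable: sum() is applied to each finite window, never to the infinite stream itself. The helpers heartbeat (logical time, no sleeping) and tumbling (equivalent to winagg(s, size, size) for full windows) are hypothetical models, not OSQL functions.

```python
import itertools
from typing import Iterable, Iterator, List


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def tumbling(s: Iterable[float], size: int) -> Iterator[List[float]]:
    """Non-overlapping windows of size elements (like winagg(s, size, size))."""
    it = iter(s)
    while True:
        window = list(itertools.islice(it, size))
        if len(window) < size:  # finite input exhausted: drop partial window
            return
        yield window


# Model of sum(winagg(heartbeat(0.5),2,2)): sum is applied to each window,
# so every result element depends on finitely many inputs.
window_sums = map(sum, tumbling(heartbeat(0.5), 2))
print(list(itertools.islice(window_sums, 4)))
```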
Moving average
A common way to reduce noise in signals is to form the moving average of sliding or tumbling windows over a stream of measurements, by computing the average avg(w) for each window w.
Example:
//plot: Line plot
avg(winagg(simstream(0.01),10,5))
As an alternative, try using median() instead of avg().
This CQ returns those moving averages over tumbling windows of 5 elements of simstream(0.01) that are larger than 0.7:
//plot: Line plot
select Stream of x
from Real x
where x in avg(winagg(simstream(0.01),5,5))
and x > 0.7
Here you will notice rather long pauses.
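One possible way to compute a moving average incrementally is sketched below in Python. Rather than re-summing each window, it keeps a running total and subtracts the element that leaves the window, so each incoming element costs constant work. This is an illustrative technique, not a claim about how avg(winagg(...)) is evaluated in OSQL; heartbeat is a hypothetical logical-time stand-in, and only full windows are emitted.

```python
import itertools
from collections import deque
from typing import Iterable, Iterator


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def moving_avg(s: Iterable[float], size: int, stride: int) -> Iterator[float]:
    """Average of each counting window of size elements, starts stride apart."""
    window = deque()
    total = 0.0
    for n, x in enumerate(s, start=1):
        window.append(x)
        total += x
        if len(window) > size:
            total -= window.popleft()  # element leaving the window
        if n >= size and (n - size) % stride == 0:
            yield total / size


print(list(itertools.islice(moving_avg(heartbeat(1), 10, 5), 3)))
```

With floating-point input the running total can accumulate rounding error over very long streams. A median has no such constant-time update, so a median() variant would recompute over each window, e.g. with statistics.median(window).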
Stream aggregate functions
A stream aggregate function returns an aggregated value for every element in the stream it is applied on. It takes a stream as argument and returns a stream of aggregated values.
For example, the stream aggregate function rsum(Stream of Number s)->Stream of Number returns the running sum of s, i.e. for each element, the sum of all elements in s up to and including the current one.
Example:
rsum(heartbeat(0.5))
The reduce function can be used to define your own stream aggregate functions. If reduce is applied on a stream s, it returns an aggregated stream containing the value of the reductor after each element in s. The reductor is defined in the same way as for other reduce functions.
For example, the function myrsum is the same as rsum:
create function myrsum(Stream of Real s)->Stream of Real
as reduce(s, '+');
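In Python, the same running-aggregate behavior is provided by itertools.accumulate, which emits the accumulated value after each input element. The sketch below defines a hypothetical stream_reduce helper on top of it and models rsum (and thus myrsum) with operator.add in place of the OSQL reductor '+'; heartbeat is again a logical-time stand-in.

```python
import itertools
import operator
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def heartbeat(pace: float) -> Iterator[float]:
    """Logical-time stand-in for heartbeat(pace): yields 0, pace, 2*pace, ..."""
    for k in itertools.count():
        yield k * pace


def stream_reduce(s: Iterable[T], reductor: Callable[[T, T], T]) -> Iterator[T]:
    """Emit the accumulated value of reductor after each element of s."""
    return itertools.accumulate(s, reductor)


def rsum(s: Iterable[float]) -> Iterator[float]:
    """Running sum, analogous to reduce(s, '+')."""
    return stream_reduce(s, operator.add)


print(list(itertools.islice(rsum(heartbeat(0.5)), 5)))
```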
Define the running product myrprod.
Time stamped streams
The function ts_simstream(pace) produces a stream of numbers where each element is time stamped with the wall time when it was produced. This is called a time stamped stream.
Example:
//plot: Line plot
ts_simstream(0.01)
Notice that the X-axis labels indicate the times when each element was produced. If you wait a while, you will see the stream start scrolling to the left.
A time stamped stream can also be defined by time stamping each element x of a stream with the function ts(x). For example, the following query returns the same time stamped simulated stream of numbers as ts_simstream(0.01):
//plot: Line plot
select Stream of ts(x)
from Real x
where x in simstream(0.01)
In general, the function ts(x) time stamps any kind of object. It returns a time stamped object having the value x (see Time).
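A time stamped object can be modelled in Python as a small immutable record pairing a wall time with a value. The class name below mirrors the OSQL type Timeval used in the temporal window queries, but its fields (stamp, value) and the ts and ts_stream helpers are hypothetical; the real representation is described in Time.

```python
import time
from dataclasses import dataclass
from typing import Any, Iterable, Iterator


@dataclass(frozen=True)
class Timeval:
    """Hypothetical stand-in for an OSQL time stamped object."""
    stamp: float  # wall time in seconds since the epoch when stamped
    value: Any    # the stamped object


def ts(x: Any) -> Timeval:
    """Time stamp any object with the current wall time."""
    return Timeval(time.time(), x)


def ts_stream(s: Iterable[Any]) -> Iterator[Timeval]:
    """Model of: select Stream of ts(x) from ... where x in s."""
    return (ts(x) for x in s)


stamped = list(ts_stream([0.1, 0.2, 0.3]))
print([tv.value for tv in stamped])
```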
Visualize the first four elements of the time stamped stream as text.
The function sample_stream(x, pace) returns an infinite stream of the expression x evaluated every pace seconds, for example:
sample_stream("Hello stream",1)
The following CQ returns a visualized stream of time stamped random numbers:
//plot: Scatter plot
sample_stream(ts(rand(100)),0.1)
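Python evaluates function arguments eagerly, so a sketch of sample_stream has to receive the expression as a zero-argument callable that is re-evaluated for every element. The helper below is a hypothetical model, and random.uniform(0, 100) is only a stand-in for rand(100); the exact range of the OSQL function is not assumed here.

```python
import itertools
import random
import time
from typing import Callable, Iterator, TypeVar

T = TypeVar("T")


def sample_stream(expr: Callable[[], T], pace: float) -> Iterator[T]:
    """Infinite stream that re-evaluates expr every pace seconds."""
    while True:
        yield expr()
        time.sleep(pace)


# Model of sample_stream("Hello stream",1), cut to two elements.
print(list(itertools.islice(sample_stream(lambda: "Hello stream", 1), 2)))
# A fresh random number per element, like the rand(100) example.
print(list(itertools.islice(sample_stream(lambda: random.uniform(0, 100), 0.1), 5)))
```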
Time stamped windows
One can time stamp entire windows using the ts(x) function.
For example, the following query produces a stream of time stamped sliding windows each having 4 elements with 50% overlap:
ts(winagg(heartbeat(0.25),4,2))
Individual stream elements in windows can also be time stamped using ts(x).
Example:
winagg(ts(heartbeat(0.25)),2,2)
Time stamp both elements and windows in the query above. How does the time stamp of each window relate to the time stamps of its elements?
Temporal windows
The kinds of windows discussed so far are called counting windows, since they form window vectors by counting the incoming stream elements. This is a very efficient and simple method to form windows, e.g. for continuously computing statistics over windows of arriving data, such as the moving average above. It works particularly well if the elements arrive at a constant pace.
If the elements arrive irregularly, one may need to form temporal windows whose sizes are based on the elements arriving during a time period, rather than on the number of arriving elements as for counting windows.
Temporal windows are formed with the OSQL function twinagg(ts, size, stride), which takes a time stamped stream of objects as input and produces a time stamped stream of vectors of objects as result. The parameters size and stride are here measured in seconds, rather than in numbers of elements as for winagg().
Example:
select tv
from Timeval of Vector of Real tv
where tv in twinagg(ts_simstream(0.01),0.5,0.5)
limit 10
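A minimal Python sketch of temporal windows, assuming the time stamped stream is given as (time, value) pairs with non-decreasing times. Several behaviors are assumptions that the real twinagg may not share: each window covers [start, start + size) seconds with starts first_time + k * stride, each window is stamped with its start time, a window is emitted only once an element at or after its end arrives, and silent periods yield empty windows.

```python
import statistics
from collections import deque
from typing import Any, Iterable, Iterator, List, Tuple


def twinagg(s: Iterable[Tuple[float, Any]], size: float, stride: float
            ) -> Iterator[Tuple[float, List[Any]]]:
    """Temporal windows over (time, value) pairs with non-decreasing times."""
    buf = deque()  # (time, value) pairs that may still belong to a window
    first = None
    k = 0  # index of the next window to emit
    for t, v in s:
        if first is None:
            first = t
        # A window is complete once an element at or after its end arrives.
        while t >= first + k * stride + size:
            start = first + k * stride
            yield start, [val for tt, val in buf if start <= tt < start + size]
            k += 1
            # Forget elements that precede the next window's start.
            next_start = first + k * stride
            while buf and buf[0][0] < next_start:
                buf.popleft()
        buf.append((t, v))


# Irregular arrivals: (seconds, measurement).
readings = [(0.0, 1.0), (0.1, 3.0), (0.7, 5.0), (1.2, 7.0), (1.3, 9.0), (2.1, 0.0)]
for start, values in twinagg(readings, 0.5, 0.5):
    # statistics.mean raises on an empty window, hence the guard.
    print(start, values, statistics.mean(values) if values else None)
```

Any vector statistic can be computed per window in the same way, e.g. statistics.median(values), much like avg(v) and median(v) in the following query.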
The following query computes the mean and median of the time stamped simstream at 100 Hz every half second:
//plot: Line plot
select avg(v), median(v)
from Timeval of Vector of Real tv, Vector of Real v
where tv in twinagg(ts_simstream(0.01),0.5,0.5)
and v = value(tv)
limit 10
Notice that you can apply any Vector function on v.
For more on windows over streams, see Windowing.
Visualizing streams
We show how to use Multi plot for flexible stream visualization.
You can prefix a stream query with a JSON record specifying how to visualize the result. For example, the following query is visualized by a sliding line plot over the latest 200 values:
//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x), cos(x)]
from Real x
where x in 10*heartbeat(.02)
Trigonometric functions lend themselves to algebraic manipulation over streams, like this amplitude modulation example:
//plot: Multi plot
{'sa_plot': 'Line plot', 'memory': 200};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in 20*heartbeat(.01)
which is more appealing in parametric coordinates (scatter plot):
//plot: Multi plot
{'sa_plot': 'Scatter plot', 'memory': 1000};
select [sin(x)*sin(x/30), cos(x)*cos(x/30)]
from Real x
where x in 10*heartbeat(.005)
The guide Combining streams shows how to work with several streams in a query.