# Stream

## Functions

`changed(Stream s)->Stream`

Stream containing the elements of stream `s` that differ from their predecessor
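
The filtering rule can be modelled in a few lines of Python. This is only a sketch of the described behaviour over an ordinary iterable, not the library's implementation; treating the first element as always "changed" is an assumption.

```python
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

_MISSING = object()  # sentinel meaning "no predecessor seen yet"


def changed(s: Iterable[T]) -> Iterator[T]:
    """Yield each element that differs from the element just before it."""
    prev = _MISSING
    for x in s:
        # Assumption: the first element has no predecessor and is emitted.
        if prev is _MISSING or x != prev:
            yield x
        prev = x


print(list(changed([1, 1, 2, 2, 2, 3, 1])))  # [1, 2, 3, 1]
```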

`changed(Stream of Vector s,Vector of Number indexes)->Stream of Vector`

Stream containing the vectors of stream `s` whose elements at `indexes` differ from those of their predecessor
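
A possible reading of the indexed variant, sketched in Python: only the positions listed in `indexes` take part in the comparison. The name `changed_at` is hypothetical, and the 0-based positions follow Python convention, which may differ from the system's own indexing.

```python
from typing import Iterable, Iterator, Sequence


def changed_at(s: Iterable[Sequence], indexes: Sequence[int]) -> Iterator[Sequence]:
    """Yield vectors whose values at `indexes` differ from the previous vector's."""
    prev_key = None  # a tuple never equals None, so the first vector is emitted
    for v in s:
        key = tuple(v[i] for i in indexes)  # assumption: 0-based positions
        if key != prev_key:
            yield v
        prev_key = key


rows = [[1, 10, "a"], [2, 10, "a"], [3, 11, "a"], [4, 11, "b"]]
print(list(changed_at(rows, [1])))  # [[1, 10, 'a'], [3, 11, 'a']]
```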

`concat(Stream a,Stream b)->Stream`

Append stream `b` after stream `a`

`debounce(Stream s,Number start_s,Number debounce)->Stream`

Skip elements in `s` arriving in the first `start_s` seconds and stop when `s` has not emitted any new values in the last `debounce` seconds
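
To make the two time limits concrete, the sketch below models a stream as a list of `(arrival_seconds, value)` pairs, which is an assumption made for illustration. A live stream would stop as soon as the silence has lasted `debounce` seconds; this offline model can only detect the gap when the next element shows up. Measuring silence across skipped elements as well is also an assumption.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def debounce(events: Iterable[Tuple[float, T]], start_s: float,
             debounce_s: float) -> Iterator[T]:
    """Skip early elements, then stop after a silence longer than debounce_s."""
    last_arrival = None
    for t, value in events:
        # Stop once the source has been silent for longer than debounce_s.
        if last_arrival is not None and t - last_arrival > debounce_s:
            return
        last_arrival = t
        # Assumption: elements arriving strictly before start_s are skipped.
        if t >= start_s:
            yield value


events = [(0.0, "a"), (0.5, "b"), (1.2, "c"), (1.5, "d"), (4.0, "e")]
print(list(debounce(events, start_s=1.0, debounce_s=2.0)))  # ['c', 'd']
```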

`diota(Real pace,Integer l,Integer u)->Stream of Integer`

Stream of natural numbers between `l` and `u` with delays of `pace` seconds between produced stream elements
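
A paced integer stream can be modelled as a Python generator that sleeps between elements. This is a sketch only; inclusive bounds and the absence of a delay before the first element are assumptions.

```python
import time
from typing import Iterator


def diota(pace: float, l: int, u: int) -> Iterator[int]:
    """Yield l, l+1, ..., u, waiting `pace` seconds between elements."""
    for i in range(l, u + 1):  # assumption: both bounds are inclusive
        yield i
        if i < u:
            time.sleep(pace)  # delay only between elements, not after the last


for n in diota(0.01, 1, 3):
    print(n)  # prints 1, 2 and 3 on separate lines
```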

`extract(Stream s)->Bag`

Extract elements in stream `s` one by one as elements in a bag

`first(Stream s)->Object`

The first element in stream `s`

`first_n(Stream s,Number n)->Stream`

The stream of the first `n` elements in stream `s`
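
Taking a prefix is what makes an infinite stream usable in a finite computation. The Python model below shows the idea for both `first` and `first_n`, using a lazy iterator as a stand-in for a stream.

```python
from itertools import count, islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def first(s: Iterable[T]) -> T:
    """Return the first element; raises StopIteration on an empty input."""
    return next(iter(s))


def first_n(s: Iterable[T], n: int) -> Iterator[T]:
    """Lazily yield at most the first n elements."""
    return islice(s, n)


print(first(count(1)))             # 1
print(list(first_n(count(1), 3)))  # [1, 2, 3]
```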

`heartbeat(Real pace)->Stream of Real`

Stream of the seconds elapsed since start, emitted every `pace` seconds

`heartbeat_wrap(Bag b,Number pace)->Stream`

Convert bag `b` into a stream with sampling frequency `pace`

`histogram(Stream s,Vector limits)->Stream of Vector of Integer`

Calculate a stream of histograms over stream `s` with the `limits` vector. `limits` is a vector `[min,max,number of bins]`. The range for the histograms is always $[min,max)$.

`histogram(Stream of Number s,Number min,Number max,Number bins)->Stream of Vector of Integer`

Calculate a stream of histograms over a stream `s` with `min`, `max`, and `bins`. The range for the histograms is always $[min,max)$.
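
The half-open range $[min,max)$ determines which bin a value falls into. The Python sketch below shows one way to compute this with equal-width bins. Several points are assumptions rather than documented behaviour: the histogram is cumulative, one snapshot is emitted per input element, and values outside the range are not counted.

```python
from typing import Iterable, Iterator, List


def histogram(s: Iterable[float], lo: float, hi: float,
              bins: int) -> Iterator[List[int]]:
    """Yield a snapshot of the bin counts after each input element."""
    counts = [0] * bins
    width = (hi - lo) / bins
    for x in s:
        if lo <= x < hi:  # half-open range: hi itself is excluded
            idx = min(int((x - lo) / width), bins - 1)  # guard float rounding
            counts[idx] += 1
        yield list(counts)  # copy, so earlier snapshots stay unchanged


snapshots = list(histogram([0.5, 1.5, 1.7, 4.0, 3.9], 0, 4, 4))
print(snapshots[-1])  # [1, 2, 0, 1]
```

The value `4.0` is not counted because it equals the upper limit.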

`histogram(Stream of Vector s,Vector of Vector limits)->Stream of Vector of Vector`

Calculate a stream of histograms over a stream of vectors `s` with the `limits` vector. `limits` must be a vector of the same dimension as each vector in `s`. The range for the histograms is always $[min,max)$.

`merge(Vector of Stream vs)->Stream`

Merge streams in `vs`

`merge(Stream s1,Stream s2)->Stream`

Merge streams `s1` and `s2`
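
The description does not say how elements are ordered in the merged stream. One plausible reading is that they are interleaved in arrival order, which the Python sketch below models with `(arrival_seconds, value)` pairs. Both the pair representation and the ordering rule are assumptions.

```python
import heapq
from operator import itemgetter
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def merge(*streams: Iterable[Tuple[float, T]]) -> Iterator[T]:
    """Interleave time-ordered inputs by arrival time and yield the values."""
    for _, value in heapq.merge(*streams, key=itemgetter(0)):
        yield value


s1 = [(0.0, "a1"), (2.0, "a2")]
s2 = [(1.0, "b1"), (3.0, "b2")]
print(list(merge(s1, s2)))  # ['a1', 'b1', 'a2', 'b2']
```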

`pivot(Vector of Stream vs,Vector iv)->Stream of Vector`

A stream of the most recently received values in `vs`, having the vector `iv` as the initial element
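
Pivoting keeps one slot per input stream and emits the whole vector whenever any slot is updated. In the Python model below, arrivals are represented as `(stream_index, value)` pairs in arrival order, which is an assumption made for illustration.

```python
from typing import Any, Iterable, Iterator, List, Tuple


def pivot(events: Iterable[Tuple[int, Any]], iv: List[Any]) -> Iterator[List[Any]]:
    """Yield the latest value seen from each input after every arrival."""
    latest = list(iv)
    yield list(latest)  # the initial vector is the first element
    for i, value in events:
        latest[i] = value  # overwrite the slot of the stream that emitted
        yield list(latest)


events = [(0, 1.0), (1, 5.0), (0, 2.0)]
print(list(pivot(events, [0.0, 0.0])))
# [[0.0, 0.0], [1.0, 0.0], [1.0, 5.0], [2.0, 5.0]]
```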

`pivot(Vector of Stream vs)->Stream of Vector`

A stream of the most recently received elements in `vs`

`pivot_events(Vector keys,Stream of Vector bus)->Stream of Timeval of Vector`

Pivot stream `bus` of `[timestamp, key, value]` vectors into a stream of timestamped vectors pivoted on `keys`
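
As an illustration of the pivoting step, the Python sketch below turns a bus of `[timestamp, key, value]` rows into `(timestamp, vector)` pairs, with one vector position per entry in `keys`. Ignoring rows with unknown keys and filling unseen positions with `None` are assumptions, and the sensor names are made up.

```python
from typing import Any, Iterable, Iterator, List, Sequence, Tuple


def pivot_events(keys: Sequence[Any],
                 bus: Iterable[Sequence[Any]]) -> Iterator[Tuple[Any, List[Any]]]:
    """Yield (timestamp, latest values ordered as `keys`) for each bus row."""
    position = {k: i for i, k in enumerate(keys)}
    latest: List[Any] = [None] * len(keys)  # assumption: None until first seen
    for ts, key, value in bus:
        if key not in position:
            continue  # assumption: rows with other keys are ignored
        latest[position[key]] = value
        yield ts, list(latest)


bus = [[1.0, "temp", 20.5], [2.0, "rpm", 900], [3.0, "oil", 1], [4.0, "temp", 21.0]]
for row in pivot_events(["temp", "rpm"], bus):
    print(row)
# (1.0, [20.5, None])
# (2.0, [20.5, 900])
# (4.0, [21.0, 900])
```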

`playback(Stream s)->Stream`

Play back the time stamped stream of vectors `s`, paced according to the time stamp in the first element of each vector in `s`

`randstream(Real l,Real u)->Stream of Real`

Infinite stream of random numbers between `l` and `u`

`ravg(Stream s)->Stream of Real`

Running averages of elements in stream `s`
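
A running average can be maintained with constant memory by keeping only the sum and the count so far. A minimal Python model of the idea:

```python
from typing import Iterable, Iterator


def ravg(s: Iterable[float]) -> Iterator[float]:
    """Yield the mean of all elements seen so far, one value per input."""
    total = 0.0
    n = 0
    for x in s:
        total += x
        n += 1
        yield total / n


print(list(ravg([2, 4, 6, 8])))  # [2.0, 3.0, 4.0, 5.0]
```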

`rcount(Stream s)->Stream of Integer`

Stream of running count of elements in stream `s`

`readlines(Charstring file,Charstring delim,Number chunk)->Stream of Vector of Charstring`

`readlines(Charstring file)->Stream of Charstring`

Stream of lines in `file`

`read_file_chunked(Charstring file,Integer chunk)->Stream`

`remote:function(Charstring peer,Charstring fn,Vector args)->Stream`

`remote:query(Charstring peer,Charstring query)->Stream`

`remote_function_stream(Charstring peer,Charstring fn,Vector args)->Stream of Vector`

Return a stream of tuples from the result of `fn(args)` on `peer`

`rsum(Stream s)->Stream of Real`

Stream of running sums of elements in stream `s`
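
Assuming `rsum` yields running sums, as its name suggests, it and `rcount` can be modelled in Python with standard-library helpers. This is a sketch of the semantics, not the library's code.

```python
from itertools import accumulate
from typing import Iterable, Iterator


def rsum(s: Iterable[float]) -> Iterator[float]:
    """Yield the sum of all elements seen so far."""
    return accumulate(s)


def rcount(s: Iterable) -> Iterator[int]:
    """Yield 1, 2, 3, ... as elements arrive."""
    return (i for i, _ in enumerate(s, start=1))


print(list(rsum([2, 4, 6, 8])))  # [2, 6, 12, 20]
print(list(rcount("abc")))       # [1, 2, 3]
```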

`sample_every(Stream s,Real pace)->Stream`

Run stream `s` and emit values every `pace` seconds since the last value
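
One way to read this rule is as rate limiting: an element is passed on only if at least `pace` seconds have gone by since the previously emitted one. The Python sketch below models that reading with `(arrival_seconds, value)` pairs. Both the representation and the interpretation are assumptions.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def sample_every(events: Iterable[Tuple[float, T]], pace: float) -> Iterator[T]:
    """Emit an element only if `pace` seconds passed since the last emitted one."""
    last_emit = None
    for t, value in events:
        if last_emit is None or t - last_emit >= pace:
            last_emit = t
            yield value


events = [(0.0, "a"), (0.3, "b"), (0.6, "c"), (1.0, "d"), (1.4, "e"), (2.1, "f")]
print(list(sample_every(events, 1.0)))  # ['a', 'd', 'f']
```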

`sample_stream(Bag expression,Real pace)->Stream`

Stream of `expression` evaluated every `pace` seconds

`save_last_element(Stream s,Function f)->Stream`

`section(Stream s,Number start,Number stop)->Stream`

The section of stream `s` starting at position `start` and ending at `stop`.
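
Positional slicing of a lazy stream can be sketched in Python with `itertools.islice`. The model assumes positions are 1-based and that both `start` and `stop` are inclusive; the description above does not state either point.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def section(s: Iterable[T], start: int, stop: int) -> Iterator[T]:
    """Yield the elements at positions start..stop (1-based, inclusive)."""
    return islice(s, start - 1, stop)


print(list(section(range(10, 20), 3, 5)))  # [12, 13, 14]
```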

`simsig(Real x)->Real`

A simulated harmonic stream reading `x` seconds from its start

`simstream(Real pace)->Stream of Real`

A simulated harmonic stream

`sink(Stream s)->Boolean`

Run stream `s` silently without extracting any elements

`siota(Integer l,Integer u)->Stream of Integer`

Stream of natural numbers between `l` and `u`

`skip(Stream s,Number n)->Stream`

Skip the first `n` elements in stream `s`

`skip_s(Stream s,Number sec)->Stream`

Skip all elements arriving in the first `sec` seconds of stream `s`.

`streamof(Bag b)->Stream`

Convert a bag `b` to a stream

`streamof(Stream s)->Stream`

`stream_function_tuples(Charstring peer,Charstring fn,Vector args)->Bag of Vector`

`timeout(Stream s,Number timeout)->Stream`

`timestamps(Number pace)->Stream of Charstring`

Stream of local UTC timestamps, one every `pace` seconds

`timestream(Number pace)->Stream of Timeval`

Stream of time stamps every `pace` seconds

`time_section(Stream s,Number start_s,Number stop_s)->Stream`

Skip elements in `s` arriving in the first `start_s` seconds and stop after `stop_s` seconds. This is similar to `section(stream,start,stop)`, except that the start and stop values are real-time seconds.
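
The difference from positional slicing is that the window is defined by arrival time. The Python sketch below models arrivals as `(arrival_seconds, value)` pairs measured from the start of the stream, which is an assumption made for illustration. Treating both boundaries as inclusive is also an assumption.

```python
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")


def time_section(events: Iterable[Tuple[float, T]], start_s: float,
                 stop_s: float) -> Iterator[T]:
    """Yield values arriving between start_s and stop_s seconds."""
    for t, value in events:
        if t > stop_s:
            return  # the window has closed, so stop consuming the input
        if t >= start_s:
            yield value


events = [(0.2, "a"), (1.1, "b"), (2.5, "c"), (3.7, "d")]
print(list(time_section(events, 1.0, 3.0)))  # ['b', 'c']
```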

`time_spent(Stream s)->Real`

Real time spent in running `s`

`ts_simstream(Real pace)->Stream of Timeval of Real`

A simulated time stamped harmonic stream

`vectorof(Stream b)->Vector v`

Convert finite stream to vector

`vstream(Vector v)->Stream`

Convert a vector `v` to a stream

`writelines(Stream s,Charstring file)->Charstring`

Create `file` from the lines in stream `s`
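
A Python model of writing a stream to a file one line per element. Returning the file name is an assumption based on the `Charstring` result type, and the temporary path is only there to keep the example self-contained.

```python
import os
import tempfile
from typing import Iterable


def writelines(s: Iterable[str], file: str) -> str:
    """Write each element of s as one line of `file` and return the file name."""
    with open(file, "w", encoding="utf-8") as f:
        for line in s:
            f.write(f"{line}\n")
    return file  # assumption: the returned string is the file name


path = os.path.join(tempfile.mkdtemp(), "out.txt")
writelines(["first", "second"], path)
with open(path, encoding="utf-8") as f:
    print(f.read().splitlines())  # ['first', 'second']
```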

`zip(Vector of Stream vs,Vector of Integer indices)->Stream of Vector`

A stream where each received vector in `vs` is "zipped" together on the `indices`. Vector elements not in `indices` will retain the latest seen value.

`zip(Vector of Stream vs)->Stream of Vector`

A stream where the received values in `vs` are "zipped" together one at a time. Should only be used if the streams have the same pace.
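
The one-at-a-time pairing corresponds closely to Python's built-in `zip`, which the sketch below uses as a model. The name `zip_streams` is hypothetical. Because each output vector needs one element from every input, a faster input would have to wait for the slower ones, which is presumably why equal pace is recommended.

```python
from typing import Any, Iterable, Iterator, List, Sequence


def zip_streams(vs: Sequence[Iterable[Any]]) -> Iterator[List[Any]]:
    """Yield one vector per round, taking one element from every input."""
    for values in zip(*vs):  # stops when the shortest input is exhausted
        yield list(values)


print(list(zip_streams([[1, 2, 3], ["a", "b", "c"]])))
# [[1, 'a'], [2, 'b'], [3, 'c']]
```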