Skip to main content

Fusion models

This page uses Studio code blocks so you can run the examples directly in the browser. You only need to sign up for SA Studio (it's free). Once you have done that you can execute the code blocks on this page.

So far we have developed data stream models either in the stream server or in an edge client. This section will outline how to define a fusion model that combines in the stream sever data streams from several edge clients running the detect-shake model.

The tutorial assumes that there are at least two edge clients having accelerometers registered in the stream server. For example, it can be two Android devices or an Android and a MangOH Red device.

A fusion model will be developed that detects when all the devices with an accelerometer are shaking at the same time.

Post-processing the detect-shake edge model​

In order for the stream server to run a fusion query to detect when all the devices are shaking at the same time, a post-processing function for the detect-shake edge model is needed. This function adds to the result stream of shakes() a time-stamp and the peer id where it is running. The post-processing produces a stream of triples:

[timestamp, peer_id, shake_status]

Drill down to the edge device android1 and define the following function:

//peer: android1
create function detect_shake(Number threshold) -> Stream of Vector
as select Stream of [now(),this_peerid(),sh]
from Number sh
where sh in shakes(threshold)
Not connected

To run this code block you must be logged in and your studio instance must be started.

Test it:

//peer: android1
Not connected

To run this code block you must be logged in and your studio instance must be started.


Add the function detect_shake(threshold) to the master file for the model detect-shake.

Deploying detect-shake on edges​

Since detect-shake requires an accelerometer on the device the model should be deployed only on edges with an accelerometer. This can be done with models:deploy() together with edges_with_signal() as follows:

models:deploy(edges_with_signal('accelerometer'), 'detect-shake');
Not connected

To run this code block you must be logged in and your studio instance must be started.

Now that the detect-shake model is installed on all edges with an accelerometer the function detect_shake(threshold) can be run on all of the edges having an accelerometer with:

edge_cq(edges_with_signal("accelerometer"), "detect_shake(5)");
Not connected

To run this code block you must be logged in and your studio instance must be started.


edge_cq(Vector of Charstring edges , Charstring q) -> Stream will send the same edge query to each edge in the vector edges and then merge the result from all of the edges into one stream on the server.

Stream of shake status vectors​

In the fusion model in the stream server we need to compare the latest values from each edge model running detect_shake(5) to see whether they all are shaking at the same time, thus having shake status 1. To simplify this we first construct a shake status vector whose elements are 1 for the shaking units and 0 otherwise:

select pivot_events(edges, merged)
from Vector of Charstring edges, Stream of Vector merged
where edges = edges_with_signal("accelerometer")
and merged = edge_cq(edges, "detect_shake(5)")
Not connected

To run this code block you must be logged in and your studio instance must be started.

The important part of the query is the function pivot_events() that has the signature:

pivot_events(Vector variables,Stream of Vector s) -> Stream of Timeval of Vector

The function transposes a stream s of event triples [timestamp, variable, value] into a stream of Timestamped status vectors Timeval of vector with one value for each variable in vars, [V1,...,Vk]. In our example the variables are edge identifiers.

Package the query into a function same_time_shake_status():

create function same_time_shake_status() -> Stream of Timeval of Vector
as select pivot_events(edges, merged)
from Vector of Charstring edges, Stream of Vector merged
where edges = edges_with_signal("accelerometer")
and merged = edge_cq(edges, "detect_shake(5)")
Not connected

To run this code block you must be logged in and your studio instance must be started.

Test it:

Not connected

To run this code block you must be logged in and your studio instance must be started.

The final fusion model​

The status vectors in the stream same_time_shake_status() contains a 1 or a 0 for each edge device indicating whether it is shaking or not. Therefore all edges are shaking at the same time when the sum of the status vector elements is equal to its dimension. We define same_time_shakes() as:

create function same_time_shakes() -> Stream of Timeval of Charstring
as changed(select Stream of ts(ts,msg)
from Charstring msg, Vector status, Timeval ts
where ts in same_time_shake_status()
and status = value(ts)
and msg = case when sum(status)=dim(status) then "Shaking"
else "Calm" end)
Not connected

To run this code block you must be logged in and your studio instance must be started.

Test it in the stream server:

Not connected

To run this code block you must be logged in and your studio instance must be started.


Save the definitions of same_time_shake_status() and same_time_shakes() in a new model named shake-fusion.