Analytics on detached edge devices
There are situations where connectivity between the edge device and the server cannot be guaranteed at all times, or where there are so many edges that continuous interaction with all of them would overload the communication infrastructure. In these situations it is recommended to run the edges in detached mode (ONLINE), connecting to the server only at regular intervals, or even in fully disconnected mode (AUTONOMOUS).
In this section you will learn how to run edge queries in detached mode using store and forward (SNF). When running in SNF mode, an edge buffers up to a configurable number of elements locally over a configurable time period. When the time period expires, the edge attempts to connect to the server and forward the buffered values through a configurable function on the server.
In addition, store and forward can be configured to never send streams to the server allowing for a fully detached execution of queries.
To understand SNF you will now learn how to configure an edge device to run edge queries on start.
This section assumes that you have at least one edge named edge1 connected.
Configuring store and forward
Store and forward has the following configurable options:
store-and-forward (String, Required) - Name identifying the SNF task. Used to retrieve forwarded data via subscribe('<EDGEID>-SNF-<name>').
store-and-forward-interval (Number) - Seconds between each forward attempt to the server. If not set, the edge will attempt to forward once buffer-size/2 elements are buffered.
buffer-size (Integer, Default 1000) - Maximum number of elements to buffer before the oldest values are dropped.
store-and-forward-storefn (String, Default stream.charstring.charstring.record.snf_publish_storer->object) - Resolvent name of the OSQL function called on the server to process uploaded data. Built-in alternatives: snf_csv_storer, snf_json_storer.
store-and-forward-params (Record) - Additional parameters passed to the storer function. See storer-specific parameters below.
store-and-forward-local (Boolean) - When true, the edge runs the query locally without forwarding to the server.
Edge-side buffer parameters:
store-and-forward-buffer (String, Default "memory") - "memory" or "file". When "file", buffered rows are written to rotating segment files under <sa_home>/snf-buffer/<edgeid>/<flow>/ so they survive an edge restart.
store-and-forward-file-dir (String) - Override for the file buffer directory. Default: <sa_home>/snf-buffer/<edgeid>/<flow>/.
store-and-forward-file-segment-rows (Integer, Default 1000) - Rotate the current segment file after this many rows.
store-and-forward-file-segment-bytes (Integer, Default 1048576) - Rotate when file size reaches this many bytes. Whichever threshold (rows or bytes) is hit first triggers rotation.
store-and-forward-file-max-segments (Integer, Default 64) - Maximum closed segment files on disk. The oldest is deleted when exceeded (FIFO shedding).
These options are set in a record passed as the third argument to
edge_cq. For example, the following statement declares a record for
an SNF task named forward1, which uploads data every third
second and buffers up to 100 elements:
set :forward1 = {
  "store-and-forward": "forward1",
  "store-and-forward-interval": 3,
  "buffer-size": 100
};
To run this code block you must be logged in and your studio instance must be started.
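According to the option list above, leaving out store-and-forward-interval makes the edge forward once buffer-size/2 elements are buffered. As a minimal sketch of that case, the record below sets only the task name and the buffer size. The task name forward-by-size and the variable :forward_by_size are hypothetical and chosen only for illustration. With a buffer size of 200, a forward attempt would be expected at 200/2 = 100 buffered elements.

```osql
/* Hypothetical SNF task: no interval is set, so forwarding is
   triggered by size, at buffer-size/2 = 100 buffered elements. */
set :forward_by_size = {
  "store-and-forward": "forward-by-size",
  "buffer-size": 200
};
```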
Pass :forward1 as the opts argument of edge_cq(edge, cq, opts).
Unlike the regular behavior of edge_cq(), results from a
store-and-forward query are not visible until you subscribe to them.
How to do this is described below. For example, the following SNF
query starts simstream(1) and buffers up to 100 elements, forwarding
them to the server every 3 seconds:
edge_cq("edge1","select simstream(1) limit 1000", :forward1);
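The edge-side buffer parameters listed earlier go into the same options record. The following is a sketch of a file-backed variant of the query above, assuming the same edge1. The task name forward-file, the variable :forward_file, and the values are illustrative. Only the option names come from the parameter list.

```osql
/* Illustrative values: buffer on disk, rotate segments after 500 rows,
   and keep at most 16 closed segment files. */
set :forward_file = {
  "store-and-forward": "forward-file",
  "store-and-forward-interval": 3,
  "store-and-forward-buffer": "file",
  "store-and-forward-file-segment-rows": 500,
  "store-and-forward-file-max-segments": 16
};

edge_cq("edge1", "select simstream(1) limit 1000", :forward_file);
```

With these settings the closed segments would hold at most 16 x 500 = 8000 rows before the oldest segment is deleted, provided the default byte threshold of 1048576 is not reached first.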
Server-side backpressure
Uploads from edges are not processed synchronously on the server.
Instead, uploaded data is pushed onto a server-side pub/sub producer (a
named internal flow SNF-PROC-<flow-name>). A background query on the
server subscribes to this flow and calls the configured storer function
asynchronously.
If the server-side buffer is full (the producer queue is at capacity), the server responds with a backpressure error. The edge automatically retries on its next SNF interval. No data is lost — the edge keeps its local buffer until the upload succeeds.
This decouples edge upload speed from server processing speed, preventing slow downstream systems (databases, file I/O) from blocking edge communication.
The server-side buffer size is configurable via
store-and-forward-params:
server-buffer-size (Integer, Default 1000) - Maximum number of upload batches the server will buffer before signaling backpressure.
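As a sketch of how this could look in an options record, the following raises the server-side buffer from the default 1000 to 5000 upload batches. The task name forward-bp and the variable :forward_bp are hypothetical. The nesting of server-buffer-size under store-and-forward-params follows the description above.

```osql
/* Illustrative: let the server buffer up to 5000 upload batches
   before it signals backpressure to the edge. */
set :forward_bp = {
  "store-and-forward": "forward-bp",
  "store-and-forward-interval": 3,
  "store-and-forward-params": {
    "server-buffer-size": 5000
  }
};
```

A larger server buffer absorbs longer stalls in the storer function at the cost of more memory on the server. A smaller one pushes the buffering back to the edges sooner.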
Access data from store and forward
By default store and forward will simply publish the data on a
server-side subscription with the name
<edgeid>-SNF-<store-and-forward-name> in all uppercase. Let's look
at the data coming from our store and forward query:
subscribe("EDGE1-SNF-FORWARD1");
The following CQ views the stream in real-time from the SAS:
subscribe('edge1','forward1')
The following query returns all queries currently running on edge1:
select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"
You can stop an edge CQ using cancel_edge_cq():
cancel_edge_cq("edge1","forward1");
Now look at the running queries again:
select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"
Changing the function used to forward the data to the server
By default the SNF publisher does not persist the data forwarded to
the server. It is possible to create a CQ that runs a stream, does
some analytics over it, and finally stores the resulting events in a
stored OSQL function. Usually, however, you just want to save the data
on disk in a log file, or forward it to a Kafka or MQTT
topic. To control what happens to the output events from a CQ running
as SNF, specify a storer function via the
store-and-forward-storefn property. A storer function snf must
have the signature snf(Stream s, Charstring edgeid, Charstring
flowname, Record params)->Object. The result is ignored; the storer
function is a sink.
The built-in storer functions are:
snf_publish_storer (default) - Publishes each row to an internal data flow named <EDGEID>-SNF-<name> (or the value of params["flow-name"] if set). Subscribers receive data via subscribe('<EDGEID>-SNF-<name>'). Uses keep-alive: true so the flow persists across uploads.
snf_csv_storer - Writes to rotating CSV files at get_snf_file(edgeid, flow) with automatic rotation. Configurable via store-and-forward-params:
  "segment-rows" (Integer, Default 1000) - Rotate to a new file after this many rows.
  "max-segments" (Integer, Default 1024) - Maximum number of CSV files to keep. Oldest are deleted when exceeded.
snf_json_storer - Writes to rotating JSON files (one JSON object per line). Same rotation parameters as the CSV storer:
  "segment-rows" (Integer, Default 1000)
  "max-segments" (Integer, Default 1024)
For an example of a storer function, look at the source code of
snf_publish_storer.
Example: CSV storer with rotation parameters
set :csv_opts = {
  "store-and-forward": "sensor-data",
  "store-and-forward-interval": 5,
  "store-and-forward-storefn": "stream.charstring.charstring.record.snf_csv_storer->object",
  "store-and-forward-params": {
    "segment-rows": 5000,
    "max-segments": 100
  }
};
edge_cq("edge1", "diota(0.1,1,100)", :csv_opts);
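The JSON storer is described as taking the same rotation parameters as the CSV storer, so a corresponding options record might look like the sketch below. The task name sensor-json and the variable :json_opts are hypothetical. The storer is referenced by the short name snf_json_storer that the option list gives as a built-in alternative, which assumes that the short name is accepted for store-and-forward-storefn.

```osql
/* Assumption: the short name "snf_json_storer" is accepted here.
   Rotation values mirror the CSV example above. */
set :json_opts = {
  "store-and-forward": "sensor-json",
  "store-and-forward-interval": 5,
  "store-and-forward-storefn": "snf_json_storer",
  "store-and-forward-params": {
    "segment-rows": 5000,
    "max-segments": 100
  }
};
edge_cq("edge1", "diota(0.1,1,100)", :json_opts);
```

With these rotation settings the server would keep at most 100 files of 5000 rows each, that is 500000 rows, before the oldest files are deleted.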
Using the built-in CSV storer function
Let's try using the built-in CSV storer function. Start by creating a new option record:
set :forward2 = {
  "store-and-forward": "forward2",
  "store-and-forward-interval": 3,
  "store-and-forward-storefn": "snf_csv_storer",
  "buffer-size": 100
};
When using the CSV storer function, data is written to rotating files
under get_snf_file(edgeid, flow). To get the folder for edge1 run:
get_snf_folder("edge1")
The utility function get_snf_files lists all available files
for an SNF task on an edge:
get_snf_files("edge1","forward1")
Let's start a store and forward with the CSV storer function:
edge_cq("edge1","select simstream(0.3) limit 1000", :forward2);
After a few seconds you should start getting some files from the following call:
get_snf_files("edge1","forward2")
To view the entire store and forward log for forward2 on edge1 run:
select vector of x
from Vector x
where x in get_snf_log("edge1","forward2");
As before you can cancel the store and forward query using
cancel_edge_cq:
cancel_edge_cq("edge1","forward2");
Rotating file writers
The CSV and JSON storers use general-purpose rotating file writer functions that are also available for direct use outside of SNF:
csv:write_rotating(prefix, stream) - Write a stream of vectors to rotating CSV files named <prefix>_NNNNNNNNNN.csv.
csv:write_rotating(prefix, segment_rows, max_segments, stream) - With explicit rotation settings.
json:write_rotating(prefix, stream) - Write a stream of objects to rotating JSON files.
json:write_rotating(prefix, segment_rows, max_segments, stream) - With explicit rotation settings.
Local store and forward
Sometimes it is desirable to run a query that never tries to forward data to a server. You might have a sink on the edge that takes care of the data streams by storing them locally or sending them in a queue to other programs, or the output can simply be discarded if your query updates the local edge database.
Since you can always register a sink function that handles the output
of a CQ within the query itself, a local SNF query does not
need a storer function. However, you do need to add the field
store-and-forward-local to your options record. This option
tells the edge to run the query and simply throw away the result. If
you wish to save the result to disk as a log file, you can use
functions like csv:write_file() or json:write_file() as a sink in
the query.
A local store and forward example
Start by creating a new option-record; now we only set the
store-and-forward identifier and set store-and-forward-local to true:
set :localForward = {
  "store-and-forward": "local-forward1",
  "store-and-forward-local": true
};
As an example, we publish a data flow named my-flow using a publish
sink in the edge query over simstream(0.02), with the SNF options
:localForward:
edge_cq("edge1", "select publish(simstream(0.02),'my-flow') limit 10000",
:localForward);
On the SAS we can now subscribe to the published flow my-flow from edge1
by using subscribe(Charstring edge, Charstring flow):
subscribe("edge1","my-flow");
As usual you can use cancel_edge_cq to stop it:
cancel_edge_cq("edge1","local-forward1");