
Analytics on detached edge devices

This page uses Studio code blocks, so you can run the examples directly in the browser. You only need to sign up for SA Studio (it's free); once you have done that, you can execute the code blocks on this page.

There are situations where the connectivity between an edge device and the server cannot be guaranteed at all times, or where the number of edges is so large that continuous interaction with all of them would overload the communication infrastructure. In these situations it is recommended to run the edges in detached mode (ONLINE) and only connect to the server at regular intervals, or even in fully disconnected mode (AUTONOMOUS).

In this section you will learn how to run edge queries in detached mode using store and forward (SNF). In SNF mode an edge buffers up to a configurable number of elements locally over a configurable time period. When the period expires, the edge connects to the server and forwards the buffered values through a configurable function on the server.

In addition, store and forward can be configured to never send streams to the server, which allows fully detached execution of queries.

To understand SNF, you will now learn how to configure an edge device to run edge queries in store-and-forward mode.

Note

This section assumes that an edge named edge1 is connected.

Configuring store and forward

Store and forward has the following configurable options:

  • store-and-forward (String, Required) - Name identifying the SNF task. Used to retrieve forwarded data via subscribe('<EDGEID>-SNF-<name>').

  • store-and-forward-interval (Number) - Seconds between forward attempts to the server. If not set, the edge attempts to forward once buffer-size/2 elements are buffered.

  • buffer-size (Integer, Default 1000) - Maximum number of elements to buffer before the oldest values are dropped.

  • store-and-forward-storefn (String, Default stream.charstring.charstring.record.snf_publish_storer->object) - Resolvent name of the OSQL function called on the server to process uploaded data. Built-in alternatives: snf_csv_storer, snf_json_storer.

  • store-and-forward-params (Record) - Additional parameters passed to the storer function. See storer-specific parameters below.

  • store-and-forward-local (Boolean) - When true, the edge runs the query locally without forwarding to the server.

Edge-side buffer parameters:

  • store-and-forward-buffer (String, Default "memory") - "memory" or "file". When "file", buffered rows are written to rotating segment files under <sa_home>/snf-buffer/<edgeid>/<flow>/ so they survive an edge restart.

  • store-and-forward-file-dir (String) - Override for the file buffer directory. Default: <sa_home>/snf-buffer/<edgeid>/<flow>/.

  • store-and-forward-file-segment-rows (Integer, Default 1000) - Rotate the current segment file after this many rows.

  • store-and-forward-file-segment-bytes (Integer, Default 1048576) - Rotate when file size reaches this many bytes. Whichever threshold (rows or bytes) is hit first triggers rotation.

  • store-and-forward-file-max-segments (Integer, Default 64) - Maximum closed segment files on disk. The oldest is deleted when exceeded (FIFO shedding).
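As an illustration of the edge-side buffer parameters, the options record below asks the edge to buffer on disk instead of in memory. It is a sketch: the name forward-file and the numbers are arbitrary. With these settings the edge would rotate the current segment after 500 rows or 256 KiB (262144 bytes), whichever comes first, and keep at most 16 closed segments before deleting the oldest one:

```osql
/* File-backed SNF buffer: buffered rows survive an edge restart */
set :forwardFile = {
"store-and-forward": "forward-file",
"store-and-forward-interval": 10,
"store-and-forward-buffer": "file",
"store-and-forward-file-segment-rows": 500,
"store-and-forward-file-segment-bytes": 262144,
"store-and-forward-file-max-segments": 16
};
```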

These options are set in a record passed as the third argument to edge_cq. For example, the following statement declares a record for an SNF named forward1 that uploads data every 3 seconds and buffers up to 100 elements:

set :forward1 = {
"store-and-forward": "forward1",
"store-and-forward-interval": 3,
"buffer-size": 100
}

We can pass :forward1 as the opts argument of edge_cq(edge, cq, opts). For example, the following SNF query runs simstream(1) on edge1, buffers up to 100 elements, and sends them north to the server every 3 seconds:

Note

Unlike the regular behavior of edge_cq(), results from a store-and-forward query will not be visible until you subscribe to it. How to do this is described in the section Access data from store and forward.

edge_cq("edge1","select simstream(1) limit 1000", :forward1);
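The option list above also describes what happens without store-and-forward-interval: the edge attempts to forward once buffer-size/2 elements are buffered. As a sketch (the name forward-half is arbitrary), the record below relies on that fallback. With buffer-size 200, the edge would try to forward each time roughly 100 elements have accumulated rather than on a timer. The record is only declared here, not started:

```osql
/* No store-and-forward-interval: forwarding is triggered when buffer-size/2 elements are buffered */
set :forwardHalf = {
"store-and-forward": "forward-half",
"buffer-size": 200
};
```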

Server-side backpressure

Uploads from edges are not processed synchronously on the server. Instead, uploaded data is pushed onto a server-side pub/sub producer (a named internal flow SNF-PROC-<flow-name>). A background query on the server subscribes to this flow and calls the configured storer function asynchronously.

If the server-side buffer is full (the producer queue is at capacity), the server responds with a backpressure error and the edge automatically retries on its next SNF interval. The rejected data is not lost: the edge keeps its local buffer until the upload succeeds, subject to the buffer-size limit beyond which the oldest elements are dropped.

This decouples edge upload speed from server processing speed, preventing slow downstream systems (databases, file I/O) from blocking edge communication.

The server-side buffer size is configurable via store-and-forward-params:

  • server-buffer-size (Integer, Default 1000) - Maximum number of upload batches the server will buffer before signaling backpressure.
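For example, a record that lowers the server-side buffer to 200 batches might look like the sketch below. The name forward-bp and the value are arbitrary. A smaller value makes the server signal backpressure sooner, so edges would hold data in their local buffers for longer:

```osql
/* Let the server queue at most 200 upload batches before signaling backpressure */
set :forwardBp = {
"store-and-forward": "forward-bp",
"store-and-forward-interval": 3,
"store-and-forward-params": {
"server-buffer-size": 200
}
};
```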

Access data from store and forward

By default, store and forward publishes the data on a server-side flow named <edgeid>-SNF-<store-and-forward-name>, in all uppercase. Let's look at the data coming from our store and forward query:

subscribe("EDGE1-SNF-FORWARD1");
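The default storer, snf_publish_storer, publishes to the value of params["flow-name"] instead of the <EDGEID>-SNF-<name> flow when that parameter is set (see Changing the function used to forward the data to the server). The sketch below shows such a record. The flow name MY-SNF-FLOW and the SNF name forward-named are hypothetical. Once a query has been started with this record, you would presumably subscribe with subscribe("MY-SNF-FLOW") instead. The record is only declared here, not started:

```osql
/* Default publish storer, but publishing to a custom (hypothetical) flow name */
set :forwardNamed = {
"store-and-forward": "forward-named",
"store-and-forward-interval": 3,
"store-and-forward-params": {
"flow-name": "MY-SNF-FLOW"
}
};
```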

The following CQ views the stream in real time from the SAS:

subscribe('edge1','forward1')

The following query returns all queries currently running on edge1:

select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"

You can stop an edge CQ using cancel_edge_cq():

cancel_edge_cq("edge1","forward1");

Now look at the running queries again:

select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"

Changing the function used to forward the data to the server

By default, the SNF publisher does not persist the data forwarded to the server. It is possible to create a CQ that runs a stream, does some analytics over it, and finally stores the resulting events in a stored OSQL function. Usually, however, you just want to save the data on disk in a log file, or forward it on a Kafka or MQTT topic. To control what happens to the output events from a CQ running as SNF, you can specify a storer function via the store-and-forward-storefn property. A storer function snf must have the signature snf(Stream s, Charstring edgeid, Charstring flowname, Record params)->Object. The result is ignored; the storer function is a sink.
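Suppose you have defined your own storer on the server with that signature, for example a hypothetical function my_storer(Stream s, Charstring edgeid, Charstring flowname, Record params) -> Object. You would then refer to it by its resolvent name. The sketch below assumes the name follows the same pattern as the documented default, stream.charstring.charstring.record.snf_publish_storer->object. The function my_storer and the "topic" parameter it would read from params are both hypothetical:

```osql
/* my_storer is a hypothetical user-defined storer with the signature
   my_storer(Stream s, Charstring edgeid, Charstring flowname, Record params) -> Object;
   the "topic" field is a hypothetical parameter passed through to it as params */
set :forwardCustom = {
"store-and-forward": "forward-custom",
"store-and-forward-interval": 3,
"store-and-forward-storefn": "stream.charstring.charstring.record.my_storer->object",
"store-and-forward-params": {
"topic": "sensors"
}
};
```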

The built-in storer functions are:

  • snf_publish_storer (default) - Publishes each row to an internal data flow named <EDGEID>-SNF-<name> (or the value of params["flow-name"] if set). Subscribers receive data via subscribe('<EDGEID>-SNF-<name>'). Uses keep-alive: true so the flow persists across uploads.

  • snf_csv_storer - Writes rows to rotating CSV files at get_snf_file(edgeid, flow). Rotation is configurable via store-and-forward-params:

    • "segment-rows" (Integer, Default 1000) - Rotate to a new file after this many rows.
    • "max-segments" (Integer, Default 1024) - Maximum number of CSV files to keep. The oldest files are deleted when this is exceeded.
  • snf_json_storer - Writes rows to rotating JSON files (one JSON object per line). It uses the same rotation parameters as the CSV storer:

    • "segment-rows" (Integer, Default 1000)
    • "max-segments" (Integer, Default 1024)
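A JSON counterpart that uses the rotation parameters listed above could look like the sketch below. It assumes the JSON storer's resolvent name follows the same stream.charstring.charstring.record.<name>->object pattern as the default storer, since all storers share one signature. The name sensor-json and the numbers are arbitrary:

```osql
/* JSON storer: one JSON object per line, new file every 2000 rows, at most 50 files kept */
set :json_opts = {
"store-and-forward": "sensor-json",
"store-and-forward-interval": 5,
"store-and-forward-storefn": "stream.charstring.charstring.record.snf_json_storer->object",
"store-and-forward-params": {
"segment-rows": 2000,
"max-segments": 50
}
};
```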
Exercise

Look at the source code for the function named snf_publish_storer.

Example: CSV storer with rotation parameters

set :csv_opts = {
"store-and-forward": "sensor-data",
"store-and-forward-interval": 5,
"store-and-forward-storefn": "stream.charstring.charstring.record.snf_csv_storer->object",
"store-and-forward-params": {
"segment-rows": 5000,
"max-segments": 100
}
};
edge_cq("edge1", "diota(0.1,1,100)", :csv_opts);

Using the built-in CSV storer function

Let's try using the built-in CSV storer function. Start by creating a new option record:

set :forward2 = {
"store-and-forward": "forward2",
"store-and-forward-interval": 3,
"store-and-forward-storefn": "snf_csv_storer",
"buffer-size": 100
};

When using the CSV storer function, data is written to rotating files under get_snf_file(edgeid, flow). To get the folder for edge1, run:

get_snf_folder("edge1")

There is also a utility function, get_snf_files, that returns all available files for an edge and a store-and-forward name:

get_snf_files("edge1","forward1")

Let's start a store-and-forward query with the CSV storer function:

edge_cq("edge1","select simstream(0.3) limit 1000", :forward2);

After a few seconds, the following call should start returning files:

get_snf_files("edge1","forward2")

To view the entire store and forward log for forward2 on edge1, run:

select vector of x
from Vector x
where x in get_snf_log("edge1","forward2");

As before, you can cancel the store and forward query using cancel_edge_cq:

cancel_edge_cq("edge1","forward2");

Rotating file writers

The CSV and JSON storers use general-purpose rotating file writer functions that are also available for direct use outside of SNF:

  • csv:write_rotating(prefix, stream) - Write a stream of vectors to rotating CSV files named <prefix>_NNNNNNNNNN.csv.
  • csv:write_rotating(prefix, segment_rows, max_segments, stream) - With explicit rotation settings.
  • json:write_rotating(prefix, stream) - Write a stream of objects to rotating JSON files.
  • json:write_rotating(prefix, segment_rows, max_segments, stream) - With explicit rotation settings.

Local store and forward

Sometimes it is desirable to run a query that never tries to forward its data to a server. You might have a sink on the edge that handles the data streams by storing them locally or sending them in a queue to other programs, or you might simply discard the output because your query updates the local edge database.

Since you can always register a sink function that handles the output of a CQ as the result of the query itself, a local SNF query does not need a storer function. However, you do need to set the field store-and-forward-local to true in your options record. This option tells the edge to run the query and simply throw away the result. If you wish to save the result to disk as a log file, you can use functions like csv:write_file() or json:write_file() as a sink in the query.

A local store and forward example

Start by creating a new option record. This time we only set the store-and-forward identifier and set store-and-forward-local to true:

set :localForward = {
"store-and-forward": "local-forward1",
"store-and-forward-local": true
}

As an example, the following edge query publishes simstream(0.02) as a data flow named my-flow, using the SNF options :localForward:

edge_cq("edge1", "select publish(simstream(0.02),'my-flow') limit 10000",
:localForward);

On the SAS, we can now subscribe to the published flow my-flow from edge1 by using subscribe(Charstring edge, Charstring flow):

subscribe("edge1","my-flow");

As usual you can use cancel_edge_cq to stop it:

cancel_edge_cq("edge1","local-forward1");