Detached analytics on edge devices
In this section you will learn how to run edge queries in store and forward (SNF) mode. When running in SNF mode an edge device doesn't need to be connected to the server at all times. It will buffer up locally to a configurable number of elements over a configurable time period. When the time period expires it will attempt to connect to the server and try to forward these values through a configurable function on the server.
In addition, store and forward can be configured to never send streams to the server allowing for a fully detached execution of queries.
To understand SNF you will now learn how to configure an edge device to run edge queries on start.
This section assumes that you have at least one edge named edge1 connected.
Configuring store and forward
Store and forward has the following configurable options:
store-and-forward
(Required) - Name of the registered store and forward query, which is used for identifying the task for accessing data and for stopping the query.store-and-forward-interval
- Number of Seconds between each forward attempt north to the SAS where the edge is registered. If not set then edge will attempt to forward oncebuffer-size/2
elements are buffered.buffer-size
(Default 1000) - Maximum number of elements to buffer between each forward attempt. If the stream produces more thanbuffer-size
elements with thestore-and-forward-interval
the oldest values will be dropped. Ifstore-and-forward-interval
is not set edge will attempt to forward elements oncebuffer-size/2
elements are buffered.store-and-forward-storefn
(Defaultstream.charstring.charstring.record.snf_publish_storer->boolean
) - The OSQL function to call in the server when requesting pending stored data from the edge.store-and-forward-params
- Optional record with further parameters to thestore-and-forward-storefn
.store-and-forward-local
- true or false, when true the edge will never try to forward streams to the server, i.e. it will only run in the background locally. When set, only thestore-and-forward
option has any effect.
These options are set in a record as the third argument to
edge_cq
. For example, the following query declares a record that
will start an SNF named forward1, which uploads data every third
second and stores up to 100 elements
set :forward1 = {
"store-and-forward": "forward1",
"store-and-forward-interval": 3,
"buffer-size": 100
}
To run this code block you must be logged in and your studio instance must be started.
We can use :forward1
as SNF parameter opts
to
edge_cq(edge,cq,opts)
. For example, the following SNF query will
start simstream(1)
and store up to 100 elements before sending them north
every 3 seconds:
Unlike the regular behavior of edge_cq()
, results
from a store-and-forward query will not be visible until you subscribe to it.
How to do this is described in the next section.
edge_cq("edge1","select simstream(1) limit 1000", :forward1);
To run this code block you must be logged in and your studio instance must be started.
Access data from store and forward.
By default store and forward will simply publish the data on a
server-side subscription with the name
<edgeid>-SNF-<store-and-forward-name>
in all uppercase. Let's look
at the data coming from our store and forward query:
subscribe("EDGE1-SNF-FORWARD1");
To run this code block you must be logged in and your studio instance must be started.
The following CQ views the stream in real-time from the SAS:
subscribe('edge1','forward1')
To run this code block you must be logged in and your studio instance must be started.
The following query returns all queries currently running on edge1
select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"
To run this code block you must be logged in and your studio instance must be started.
You can stop an edge CQ using cancel_edge_cq()
:
cancel_edge_cq("edge1","forward1");
To run this code block you must be logged in and your studio instance must be started.
Now look at the running queries again:
select r["requests"]
from Record r
where r in edge_status()
and r["id"]="EDGE1"
To run this code block you must be logged in and your studio instance must be started.
Changing the function used to forward the data to the server
By default the SNF publisher does not persist the data forwarded to
the server. It is possible to create a CQ that runs a stream, does
some analytics over it, and finally stores the resulted events in an
stored OSQL function. However, usually you just want to save the data
on disk in a log file, or forward it on a Kafka or MQTT
topic. To control what to do with the output events from a CQ running
as SNF, you can specify a storer function via the
store-and-forward-storefn
property. A storer function snf
must
have the signature snf(Stream s,Charstring edgeid, Charstring
flowname, Record params)->Object
. The result is ignored; the storer
function is a sink.
Look at the source code for the function named
snf_publish_storer
Using the built-in CSV storer function
Let's try using the built-in CSV storer function. Start by creating a new option record:
set :forward2 = {
"store-and-forward": "forward2",
"store-and-forward-interval": 3,
"store-and-forward-storefn": "snf_csv_storer",
"buffer-size": 100
};
To run this code block you must be logged in and your studio instance must be started.
When using the CSV storer function each forward from the edge will
create a file under sa_home()+"/logs/snf/<EDGE-ID>
for each forward
from the edge. To get the folder for edge1
run:
get_snf_folder("edge1")
To run this code block you must be logged in and your studio instance must be started.
There is also a utility function where you can get all available files
using get_snf_files
:
get_snf_files("edge1","forward1")
To run this code block you must be logged in and your studio instance must be started.
Let's start a store and forward with the CSV storer function:
edge_cq("edge1","select simstream(0.3) limit 1000", :forward2);
To run this code block you must be logged in and your studio instance must be started.
After a few seconds you should start getting some files from the following call:
get_snf_files("edge1","forward2")
To run this code block you must be logged in and your studio instance must be started.
Each forward is tagged with a time stamp at the end of the filename. To view the entire store and forward log for forward2 on edge1 run:
select vector of x
from Vector x
where x in get_snf_log("edge1","forward2");
To run this code block you must be logged in and your studio instance must be started.
As before you can cancel the store and forward query using
cancel_edge_cq
:
cancel_edge_cq("edge1","forward2");
To run this code block you must be logged in and your studio instance must be started.
Local store and forward
Sometimes it is desirable to run a query that never tries to forward the data to a server. You might have a sink on the edge that will take care of the data streams by storing them locally, sending them in a queue to other programs or simply discards the output if your query updates the local edge database.
Since you can always register a sink function that handles the output
of a CQ as the result of the query itself a local SNF query does not
need a storer function. However, you do need to add a field
store-and-forward-local
to your options record. This option will
tell the edge to run this query and simply throw away the result. If
you wish to save the result to disk as a log file you can use
functions like csv:write_file()
or json:write_file()
as a sink in
the query to store the result.
A local store and forward example
Start by creating a new option-record; now we only set the
store-and-forward
identifier and set store-and-forward-local
to true:
set :localForward = {
"store-and-forward": "local-forward1",
"store-and-forward-local": true
}
To run this code block you must be logged in and your studio instance must be started.
As an example, we produce a data flow named my-flow
by a publish
sink for the edge query simstream(0.02)
with SNF options
:localForward
:
edge_cq("edge1", "select publish(simstream(0.02),'my-flow') limit 10000",
:localForward);
To run this code block you must be logged in and your studio instance must be started.
On the SAS we can now subscribe to the published flow my-flow
from edge1
by using subscribe(Charstring edge, Charstring flow)
:
subscribe("edge1","my-flow");
To run this code block you must be logged in and your studio instance must be started.
As usual you can use cancel_edge_cq
to stop it:
cancel_edge_cq("edge1","local-forward1");
To run this code block you must be logged in and your studio instance must be started.
Persistent edge queries
By now you have learned how to run regular edge queries with SNF. This
section describes how to create persistent edge queries that run
autonomously. Persistent edge queries are defined using the OSQL type
EdgeQuery
. All instances of type EdgeQuery
are always started when an
edge listener is started.
The type EdgeQuery
has the propeties:
create type EdgeQuery;
create function name(EdgeQuery eq) -> Charstring key as stored;
create function statement(EdgeQuery eq) -> Charstring as stored;
create function options(EdgeQuery eq) -> Record as stored;
As the function names name
, statement
, and options
suggest, the
properties of EdgeQuery
are very similar to the arguments of
edge_cq
. The main difference is that the property name
is
essentially the id of the query when it runs autonomously on the edge.
When an edge starts it will go through all instances of EdgeQuery
and start them. If you dynamically create more EdgeQuery
objects
during an interactive session they will be used at the next edge
restart. You can activate a persistent edge query eq
by calling
run_stored_edge_query(eq)
.
Documentation of properties of the type Edgequery
:
select mddoc(f)
from Function f
where f in methods(typenamed("Edgequery"))
To run this code block you must be logged in and your studio instance must be started.
You can cancel a persistent edge query as usual with
cancel_edge_cq()
. However if you do not want the query to run again
on the next restart of the edge you will have to update and save the
local database to remove the instance of EdgeQuery
.