MQTT
Module specification | |
---|---|
SA Engine version: | 4.13.0 (full system only) |
Supported platforms: | Linux(x86), Raspberry Pi, Raspberry Pi Zero |
MQTT is a standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. This plugin provides an API for using the MQTT messaging protocol in SA Engine.
In this documentation we use the publicly available MQTT broker test.mosquitto.org, which supports communication using TLS. We also use the command line tools mosquitto_pub and mosquitto_sub, which come bundled with Eclipse Mosquitto, an open source MQTT broker, to send and receive MQTT messages on the command line.
Load MQTT plugin
To use MQTT in SA Engine you first have to load the MQTT plugin:
loadsystem(startup_dir()+"../extenders/sa.mqtt","mqtt.osql");
Connect to broker
To connect to a broker you first have to specify your connection options:
set :mqtt_connect_opts = {
"qos": 1,
"connection": "tcp://test.mosquitto.org:1883",
"clientid": "client" + sha256(ntoa(rand(10e16)))
};
You then use the connection options to connect to the broker by calling mqtt:register_client:
mqtt:register_client("mqtt",:mqtt_connect_opts);
All available connection options are listed in the docs for mqtt:register_client:
doc("mqtt:register_client");
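As a rough illustration outside SA Engine, the Python sketch below shows the two pieces of information that the connection options above carry: a connection string of the form scheme://host:port and a randomized client ID. MQTT brokers expect every connected client to use a distinct client ID, which is presumably why the options append a random hash. The helper names parse_connection and make_client_id are hypothetical and are not part of the SA Engine API.

```python
import hashlib
import random
from urllib.parse import urlsplit


def parse_connection(uri):
    """Split a connection string such as tcp://host:1883 into its parts."""
    parts = urlsplit(uri)
    return parts.scheme, parts.hostname, parts.port


def make_client_id(prefix="client"):
    """Build a client ID from a prefix and the SHA-256 hex digest of a random number.

    This only mirrors the spirit of the clientid expression in the options;
    it is an assumption, not the plugin's own implementation.
    """
    seed = str(random.random() * 10e16).encode("utf-8")
    return prefix + hashlib.sha256(seed).hexdigest()


if __name__ == "__main__":
    print(parse_connection("tcp://test.mosquitto.org:1883"))
    print(make_client_id())
```

The scheme distinguishes plain TCP (tcp) from TLS (ssl), and the port has to match what the broker exposes for that transport.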
Listen for messages
Now that we are connected to the broker we can use the subscribe function to listen to a topic:
select charstring(b)
from binary b
where b in subscribe("mqtt:my/experimental");
The query runs until it is stopped and shows any messages that are posted to the my/experimental topic. Keep the query above running and post a message to the topic to verify that the SA Engine instance is listening.
You can use any means for publishing a message to the Mosquitto broker on the my/experimental topic. Here we publish the message by executing the mosquitto_pub command in a terminal:
mosquitto_pub -h test.mosquitto.org -t my/experimental -m "Hello, SA Engine!"
The message should appear in the result of the subscribe query above:
"Hello, SA Engine!"
Publish messages
Now we are going to publish messages on the my/experimental topic from SA Engine.
Ensure that you have stopped the subscribe query above, then start listening to the my/experimental topic with an external client. You can use any means to listen to the Mosquitto broker. Here we listen to the broker by executing the mosquitto_sub command in a terminal:
mosquitto_sub -h test.mosquitto.org -t my/experimental
Now that the terminal is listening, you can run the following query in SA Engine:
publish(streamof("My first message!"), "mqtt:my/experimental");
This publishes the message "My first message!", and the terminal listening for messages on the my/experimental topic should display it as received:
My first message!
Send and receive JSON
To send a message as JSON, simply publish a JSON object instead of a string:
publish(streamof({"message": "My first JSON message!"}),
"mqtt:my/experimental");
The terminal listening for messages on the my/experimental topic displays the JSON message as received:
{"message":"My first JSON message!"}
To listen for JSON messages in SA Engine you simply receive the JSON as a string and then unpack the JSON object:
select json:unstringify(charstring(b))
from binary b
where b in subscribe("mqtt:my/experimental");
Start the above query and then publish a JSON message from an external source to the MQTT broker. Here we send a JSON message on the my/experimental topic with the mosquitto_pub command in a terminal. The payload is wrapped in single quotes so that the shell passes the double quotes and the exclamation mark through unchanged:
mosquitto_pub -h test.mosquitto.org -t my/experimental -m '{"message": "Here comes JSON!"}'
The query listening for JSON messages should display the message as received:
{"message":"Here comes JSON!"}
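The receive step above (binary payload to string, then string to JSON) can be sketched in plain Python as follows. This is only an illustration of the same two-step decoding, not SA Engine code; decode_json_payload is a hypothetical helper name, and UTF-8 is an assumed payload encoding. The second payload shows what happens if a stray backslash ends up before the exclamation mark, which some shells leave in place inside double quotes: the text is then no longer valid JSON.

```python
import json


def decode_json_payload(payload: bytes):
    """Decode an MQTT payload as UTF-8 text and parse the text as JSON."""
    return json.loads(payload.decode("utf-8"))


good = b'{"message": "Here comes JSON!"}'
# Same payload with a literal backslash before the exclamation mark.
bad = b'{"message": "Here comes JSON\\!"}'

if __name__ == "__main__":
    print(decode_json_payload(good))
    try:
        decode_json_payload(bad)
    except json.JSONDecodeError as err:
        print("invalid JSON:", err)
```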
Send query result as message
Let's say that we have some data source that generates a stream of data, e.g., a sensor reading. Here we simulate such a sensor with the simstream() function. Let's also say that we are only interested in positive sensor values, and that whenever there is a positive sensor value we want to publish the value and a timestamp as JSON to an MQTT broker.
We start by defining a function that reads the sensor stream (simstream), which emits a value every 0.1 seconds, filters the stream values (v > 0), adds a timestamp (ts), and wraps the result in a stream of JSON strings (json:stringify):
create function my_json_stream() -> Stream of Charstring
as select stream of json:stringify(ts(v))
from number v
where v in simstream(0.1)
and v > 0;
You should prepare an MQTT client that listens to the my/experimental topic so you can verify that the results of the function are published. Here we listen to the broker by running the mosquitto_sub command in a terminal:
mosquitto_sub -h test.mosquitto.org -t my/experimental
Now you can run the my_json_stream function and publish its messages to the MQTT broker by running the following query:
publish(my_json_stream(), "mqtt:my/experimental");
The terminal listening to the broker should show the received messages, looking something like this:
{"sa_time": "2022-01-26T09:46:34.290Z", "sa_value": 0.135167884385362}
{"sa_time": "2022-01-26T09:46:34.490Z", "sa_value": 0.112810435340008}
{"sa_time": "2022-01-26T09:46:34.590Z", "sa_value": 0.81064465289327}
{"sa_time": "2022-01-26T09:46:34.690Z", "sa_value": 1.05985752222502}
{"sa_time": "2022-01-26T09:46:34.790Z", "sa_value": 0.269298018030197}
{"sa_time": "2022-01-26T09:46:34.990Z", "sa_value": 0.838084629438005}
{"sa_time": "2022-01-26T09:46:35.090Z", "sa_value": 1.96788004638127}
{"sa_time": "2022-01-26T09:46:35.190Z", "sa_value": 1.26667751609015}
{"sa_time": "2022-01-26T09:46:35.390Z", "sa_value": 0.0685725375948125}
...
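The filter, timestamp, and stringify steps of this section can be sketched in plain Python to make the data flow explicit. This is not how SA Engine implements the query; simulated_sensor is a hypothetical stand-in for simstream, positive_json_messages is a made-up helper name, and the timestamp formatting is an assumption chosen to resemble the output above.

```python
import json
import math
from datetime import datetime, timezone


def simulated_sensor(n):
    """Hypothetical stand-in for a sensor: n values that swing above and below zero."""
    for i in range(n):
        yield 2.0 * math.sin(i)


def positive_json_messages(values, clock=lambda: datetime.now(timezone.utc)):
    """Keep positive values, attach a UTC timestamp, and emit JSON strings."""
    for v in values:
        if v > 0:
            stamp = clock().isoformat(timespec="milliseconds").replace("+00:00", "Z")
            yield json.dumps({"sa_time": stamp, "sa_value": v})


if __name__ == "__main__":
    for message in positive_json_messages(simulated_sensor(10)):
        print(message)
```

Because non-positive values are dropped before the timestamp is attached, the published timestamps are not evenly spaced, which matches the gaps visible in the sample output.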
Custom message structure
The example in the previous section used json:stringify on a timestamp object. This automatically produces the default data names "sa_time" and "sa_value". If we want to have custom names we can call json:stringify on a JSON object where we define the data names ourselves:
create function my_custom_json_stream() -> Stream of Charstring
as select stream of json:stringify({
"my_value": v, "my_time": utc_time(now()) })
from number v
where v in simstream(0.1)
and v > 0;
If we then call publish on our new function:
publish(my_custom_json_stream(), "mqtt:my/experimental");
Then our listening MQTT client will show the messages with our custom data names:
{"my_value":0.135167884385362,"my_time":"2022-01-27T14:11:42.572Z"}
{"my_value":0.112810435340008,"my_time":"2022-01-27T14:11:42.772Z"}
{"my_value":0.81064465289327,"my_time":"2022-01-27T14:11:42.872Z"}
{"my_value":1.05985752222502,"my_time":"2022-01-27T14:11:42.972Z"}
{"my_value":0.269298018030197,"my_time":"2022-01-27T14:11:43.072Z"}
{"my_value":0.838084629438005,"my_time":"2022-01-27T14:11:43.272Z"}
{"my_value":1.96788004638127,"my_time":"2022-01-27T14:11:43.372Z"}
{"my_value":1.26667751609015,"my_time":"2022-01-27T14:11:43.472Z"}
{"my_value":0.0685725375948125,"my_time":"2022-01-27T14:11:43.672Z"}
{"my_value":1.96960299888449,"my_time":"2022-01-27T14:11:43.772Z"}
...
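On the receiving side, a consumer has to know the custom names to read the messages. A minimal Python sketch of such a consumer is shown below, assuming the "my_value" and "my_time" names and the timestamp format seen in the output above; parse_custom_message is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone


def parse_custom_message(line):
    """Parse one message and return (value, timezone-aware UTC timestamp)."""
    record = json.loads(line)
    # The trailing Z marks UTC; strptime reads it as a literal character.
    when = datetime.strptime(record["my_time"], "%Y-%m-%dT%H:%M:%S.%fZ")
    return record["my_value"], when.replace(tzinfo=timezone.utc)


if __name__ == "__main__":
    sample = '{"my_value":0.135167884385362,"my_time":"2022-01-27T14:11:42.572Z"}'
    print(parse_custom_message(sample))
```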
Secure MQTT
SA Engine offers secure MQTT using transport layer encryption via TLS. For secure communication over MQTT you have to connect to the broker through TLS and supply a certificate.
We use the default CA file mosquitto.org.crt provided by Mosquitto to verify the server connection. The following query downloads it to the temporary folder:
http:download_file(
"http://assets.streamanalyze.com/docs/mqtt/mosquitto.org.crt",
{}, temp_folder() + "mosquitto.org.crt");
Now that we have a certificate we set the connection options. In the connection options we provide the default TLS port 8883 and specify the certificate file mosquitto.org.crt that we downloaded:
set :mqtt_secure_connect_opts = {
"qos": 1,
"connection": "ssl://test.mosquitto.org:8883",
"cafile": temp_folder() + "mosquitto.org.crt",
"clientid": "client" + sha256(ntoa(rand(10e16)))
};
Now we can use the connection options to connect to the broker:
mqtt:register_client("mqtt-secure",:mqtt_secure_connect_opts);
All available connection options are listed in the docs for mqtt:register_client:
doc("mqtt:register_client");
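To illustrate what the CA file is used for, the Python sketch below builds a TLS client context that trusts a given CA file and performs a handshake against a host and port. It is a generic standard-library example, not a description of how the SA Engine plugin handles TLS internally; make_tls_context and tls_handshake_version are hypothetical helper names.

```python
import socket
import ssl


def make_tls_context(cafile=None):
    """Create a client context that verifies the server certificate.

    With cafile set, only that CA file is trusted; with None, the
    system's default trust store is used instead.
    """
    return ssl.create_default_context(cafile=cafile)


def tls_handshake_version(host, port, cafile=None, timeout=10.0):
    """Connect, complete the TLS handshake, and return the protocol version string."""
    context = make_tls_context(cafile)
    with socket.create_connection((host, port), timeout=timeout) as raw:
        with context.wrap_socket(raw, server_hostname=host) as tls:
            return tls.version()
```

Calling tls_handshake_version("test.mosquitto.org", 8883, "mosquitto.org.crt") requires network access and a local copy of the CA file. If the server certificate cannot be verified against that file, the handshake raises an ssl.SSLError instead of returning.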
Publish secure messages
Now that you have registered your client to use TLS you simply publish messages in the same way as for regular unencrypted TCP communication.
Before you publish any messages you should subscribe to the topic so you can verify that the messages get published.
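Here we listen to the my/experimental topic on the broker by running the mosquitto_sub command in a terminal. The command assumes that a copy of mosquitto.org.crt is available in the terminal's current directory: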
mosquitto_sub -h test.mosquitto.org -p 8883 --cafile mosquitto.org.crt -t my/experimental
Now you can publish messages from SA Engine using the following query:
publish(streamof("My first secure message!"), "mqtt-secure:my/experimental");
The message should appear in your command terminal as received:
My first secure message!