Skip to main content

MQTT

Module specification 
SA Engine version:4.13.0 (full system only)
Supported platforms:Linux(x86), Raspberry Pi, Raspberry Pi Zero

MQTT is a standard messaging protocol for the Internet of Things (IoT). It is designed as an extremely lightweight publish/subscribe messaging transport that is ideal for connecting remote devices with a small code footprint and minimal network bandwidth. This plugin provides an API for using the MQTT messaging protocol in SA Engine.

In this documentation we use the publicly available MQTT broker test.mosquitto.org which supports communication using TLS. We also use the command line tools mosquitto_pub and mosquitto_sub that come bundled with Eclipse Mosquitto, an open source MQTT broker, for sending and receiving MQTT messages on the command line.

Load MQTT plugin

To use MQTT in SA Engine you first have to load the MQTT plugin:

loadsystem(startup_dir()+"../extenders/sa.mqtt","mqtt.osql");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Connect to broker

To connect to a broker you first have to specify your connection options:

seed_random_using_secure();
set :mqtt_connect_opts = {
"qos": 1,
"connection": "tcp://test.mosquitto.org:1883",
"clientid": "client" + sha256(ntoa(rand(10e16)))
};
Not connected

To run this code block you must be logged in and your studio instance must be started.

You then use the connection options to connect to the broker by calling mqtt:register_client:

mqtt:register_client("mqtt",:mqtt_connect_opts);
Not connected

To run this code block you must be logged in and your studio instance must be started.

All available connection options are listed in the docs for mqtt:register_client:

doc("mqtt:register_client");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Listen for messages

Now that we are connected to the broker we can use the subscribe function to listen to a topic:

select charstring(b)
from binary b
where b in subscribe("mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

The query will run until turned off and show any messages that are posted in the my/experimental topic. You should keep the query above running and post a message to the topic to see that the SA Engine instance is listening.

You can use any means for publishing a message to the Mosquitto broker on the my/experimental topic. Here publish the message by executing the mosquitto_pub command in a terminal:

mosquitto_pub -h test.mosquitto.org -t my/experimental -m "Hello, SA Engine!"

The message should appear in the result for the subscribe query above:

"Hello, SA Engine!"

Publish messages

Now we are going to publish messages on the my/experimental topic from SA Engine.

Ensure that you have stopped the subscribe query above and start listening to the my/experimental topic. You can use any means to listen to the Mosquitto broker. Here we listen to the broker by executing the mosquitto_sub command in a terminal:

mosquitto_sub -h test.mosquitto.org -t my/experimental

Now that the terminal is listening, you can run the following query in SA Engine:

publish(streamof("My first message!"), "mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

This publishes the message "My first message!" and the terminal listening for messages on the topic my/experimental should display it as received.

My first message!

Send and receive JSON

To send a message as JSON, simply publish a JSON object instead of a string:

publish(streamof({"message": "My first JSON message!"}),
"mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

The terminal listening for messages on the my/experimental topic displays the JSON message as received:

{"message":"My first JSON message!"}

To listen for JSON messages in SA Engine you simply receive the JSON as a string and then unpack the JSON object:

select json:unstringify(charstring(b))
from binary b
where b in subscribe("mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Start the above query and then publish a JSON message from an external source to the MQTT broker. Here we send a JSON message on the my/experimental topic with the mosquitto_pub command in a terminal:

mosquitto_pub -h test.mosquitto.org -t my/experimental -m "{\"message\": \"Here comes JSON\!\"}"

The query listening for JSON messages should display the message as received:

{"message":"Here comes JSON!"}

Send query result as message

Let's say that we have some data source that generates a stream of data, e.g., a sensor reading. Here we will simulate such a sensor with the simstream() function. Let's also say that we are only interested in positive sensor values. And whenever there is a positive sensor value we want to publish the value and a timestamp as JSON to an MQTT broker.

We start by defining the function that extracts the sensor stream (simstream) that emits a value every .1 second, filters the stream values (v > 0), adds a timestamp (ts), and wraps the result in a stream of JSON strings (json:stringify):

create function my_json_stream() -> Stream of Charstring
as select streamof(json:stringify(ts(v)))
from number v
where v in simstream(0.1)
and v > 0;
Not connected

To run this code block you must be logged in and your studio instance must be started.

You should prepare a MQTT client that listens to the my/experimental topic so you can verify that the results of the function are published. Here we listen to the broker by running the mosquitto_sub command in a terminal:

mosquitto_sub -h test.mosquitto.org -t my/experimental

Now you can run the my_json_stream function and publish the messages on the MQTT by running the following query:

publish(my_json_stream(), "mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

The terminal listening to the broker should show the messages as received looking something like this:

{"sa_time": "2022-01-26T09:46:34.290Z", "sa_value": 0.135167884385362}
{"sa_time": "2022-01-26T09:46:34.490Z", "sa_value": 0.112810435340008}
{"sa_time": "2022-01-26T09:46:34.590Z", "sa_value": 0.81064465289327}
{"sa_time": "2022-01-26T09:46:34.690Z", "sa_value": 1.05985752222502}
{"sa_time": "2022-01-26T09:46:34.790Z", "sa_value": 0.269298018030197}
{"sa_time": "2022-01-26T09:46:34.990Z", "sa_value": 0.838084629438005}
{"sa_time": "2022-01-26T09:46:35.090Z", "sa_value": 1.96788004638127}
{"sa_time": "2022-01-26T09:46:35.190Z", "sa_value": 1.26667751609015}
{"sa_time": "2022-01-26T09:46:35.390Z", "sa_value": 0.0685725375948125}
...

Custom message structure

The example in the previous section used json:stringify on a timestamp object. This automatically produces the default data names "sa_time" and "sa_value". If we want to have custom names we can call json:stringify on a JSON object where we define the data names ourselves:

create function my_custom_json_stream() -> Stream of Charstring
as select stream of json:stringify({
"my_value": v, "my_time": utc_time(now()) })
from number v
where v in simstream(0.1)
and v > 0;
Not connected

To run this code block you must be logged in and your studio instance must be started.

If we then call publish on our new function:

publish(my_custom_json_stream(), "mqtt:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Then our listening MQTT client will show the messages with our custom data names:

{"my_value":0.135167884385362,"my_time":"2022-01-27T14:11:42.572Z"}
{"my_value":0.112810435340008,"my_time":"2022-01-27T14:11:42.772Z"}
{"my_value":0.81064465289327,"my_time":"2022-01-27T14:11:42.872Z"}
{"my_value":1.05985752222502,"my_time":"2022-01-27T14:11:42.972Z"}
{"my_value":0.269298018030197,"my_time":"2022-01-27T14:11:43.072Z"}
{"my_value":0.838084629438005,"my_time":"2022-01-27T14:11:43.272Z"}
{"my_value":1.96788004638127,"my_time":"2022-01-27T14:11:43.372Z"}
{"my_value":1.26667751609015,"my_time":"2022-01-27T14:11:43.472Z"}
{"my_value":0.0685725375948125,"my_time":"2022-01-27T14:11:43.672Z"}
{"my_value":1.96960299888449,"my_time":"2022-01-27T14:11:43.772Z"}
...

Secure MQTT

SA Engine offers secure MQTT using transport layer encryption via TLS. For secure communication over MQTT you have to communicate with the broker through TLS and supply a certificate.

To be able to communicate securely with the broker we need a certificate. We use the default CA file mosquitto.org.crt provided by mosquitto to verify the server connection (can be downloaded from here):

http:download_file(
"http://assets.streamanalyze.com/docs/mqtt/mosquitto.org.crt",
{}, temp_folder() + "mosquitto.org.crt");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Now that we have a certificte we set the connection options. In the connection options we provide the default TLS port 8883 and specify the certificate file mosquitto.org.crt we downloaded:

set :mqtt_secure_connect_opts = {
"qos": 1,
"connection": "ssl://test.mosquitto.org:8883",
"cafile": temp_folder() + "mosquitto.org.crt",
"clientid": "client" + sha256(ntoa(rand(10e16)))
};
Not connected

To run this code block you must be logged in and your studio instance must be started.

Now we can use the connection options to connect to the broker:

mqtt:register_client("mqtt-secure",:mqtt_secure_connect_opts);
Not connected

To run this code block you must be logged in and your studio instance must be started.

All available connection options are listed in the docs for mqtt:register_client:

doc("mqtt:register_client");
Not connected

To run this code block you must be logged in and your studio instance must be started.

Publish secure messages

Now that you have registered your client to use TLS you simply publish messages in the same way as for regular unencrypted TCP communication.

Before you publish any messages you should subscribe to the topic so you can verify that the messages get published. Here we listen to the my/experimental topic on the broker by running the mosquitto_sub command in a terminal:

mosquitto_sub -h test.mosquitto.org -p 8883 --cafile mosquitto.org.crt -t my/experimental

Now you can publish messages from SA Engine using the following query:

publish(streamof("My first secure message!"), "mqtt-secure:my/experimental");
Not connected

To run this code block you must be logged in and your studio instance must be started.

The message should appear in your command terminal as received:

My first secure message!