Skip to main content

Azure Event Hub

When interfacing sa.engine with Event Hub there are two main actions: publishing object streams to Even Hubs, and subscribing on object streams coming from Event Hubs.

This introduction assumes that you have some experience on using sa.engine. It is expected that you have done the tutorial in the documentation of the visual analyzer.

Installing the Event Hub plugin

Event Hub is installed by unzipping the file sa_event_hub.zip under the sa.engine home directory. The sa.engine home directory is Documents/SA on windows and OSX and ~/SA on Linux based systems. If you are unsure where to find the sa home directory you can start sa.engine and call the function sa_home().

After unzipping sa_event_hub.zip you should at least have the following file tree in sa.engine home:

  • bin
    • sa_event_hub.jar
    • azure-eventhubs-2.2.0.jar
    • azure-eventhubs-eph-2.4.0.jar
    • azure-keyvault-core-1.0.0.jar
    • azure-storage-8.0.0.jar
    • commons-lang3-3.4.jar
    • gson-2.8.5.jar
    • guava-20.0.jar
    • jackson-core-2.9.4.jar
    • proton-j-0.31.0.jar
    • qpid-proton-j-extensions-1.1.0.jar
    • slf4j-api-1.7.25.jar
  • models
    • event_hub
      • master.osql

Once you've unzipped the contents of sa_event_hub.zip correctly you can start sa.engine and load the Event Hub model with models:load("event_hub").

Event Hub interface

The Event Hub plugin consists of three OSQL functions with signatures:

event_hub_config(Charstring nm) -> Record
event_hub_subscribe(Record conf) -> Stream of Record
event_hub_publish(Stream data, Record props) -> Stream

The function event_hub_config returns the Event Hub configuration named nm.

The function event_hub_subscribe subscribes to an Event Hub specified by the configuration record conf.

The function event_hub_publish publishes a stream an Event Hub specified by the configuration record conf.

Configuration records

When publishing and subscribing to Event Hub, sa.engine needs to know how to connect to the Event Hub service in question. This is done with a configuration record that specifies configuration properties and their corresponding values. The format of Event Hub configuration records is the following:

{
"namespace": "_EVENT-HUB-NAMESPACE_",
"event_hub_name": "_EVENT-HUB-NAME_",
"policy_name": "_POLICY-NAME_",
"primary_sas_key": "_PRIMARY-SAS-KEY_",
"storage_connection_string": _STORAGE-CONNECTION-STRING_",
"storage_container_name": "_STORAGE-CONTAINER-NAME_",
"host_name_prefix":"_HOST-NAME-PREFIX_"
};

Publishing sa.engine streams to Event Hubs

Publishing streams to an Event Hub is done with the function event_hub_publish. To publish an sa.engine stream s on an Event Hub service defined as a config named “test”, call:

event_hub_publish(s, event_hub_config("test"));

The event_hub_publish function returns a stream, which allows you to publish stream elements inside continuous queries, for example:

select x
from Stream of Number x, Stream of Number y, Record conf
where conf = event_hub_config("test")
and y = event_hub_publish(heartbeat(0.5), conf)
and x = event_hub_publish(y+1, conf)

Subscribing to an Event Hub.

Subscribing to an Event Hub service is done with the function event_hub_subscribe. To subscribe to the Event Hub service configured in event hub config “test”, call:

event_hub_subscribe(event_hub_config(test));

The result from such a subscription is a stream of records where each record has the structure: {"value": v}. Here v is actual data element received. If you have published the stream heartbeat(0.5) to the Event Hub the result would be a stream of records:

{
"value": 0
}

{
"value": 0.5
}

{
"value": 1
}

etc...

Customized data encoding

By default sa.engine will publish data as a stream of JSON objects and the subscription will likewise receive a stream of JSON object for a topic. The system supports other formats as well, as described for CSV below and the user can define own (de)serialization OSQL functions for other formats.

Serialization functions

Data element must be converted to a linear format (usually strings) when being published with Event Hub. If you have a different format than the default JSON on the published Event Hub data you can add your own serialization by registering to sa.engine a serializer function s with signature s(Object o)->Charstring that takes an OSQL object o as argument and converts it to a string. Below is an example on how to create and register your own serializer function for CSV and then using it when publishing a stream:

create function my_serializer(Object o) -> Charstring
as stringify_csv(o);

set event_hub_config('test') =
{
...
'serializer': 'my_serializer'
};

event_hub_publish(heartbeat(1), event_hub_config('test'));

Deserialization functions

If you have a different format than JSON for data subscribed to from Event Hub you can add your own deserialization by registering a deserializer function d with signature d(Charstring s) -> Object, which takes a received Event Hub string s and converts it to a corresponding OSQL object, for example:

create function my_deserializer(Charstring c) -> Object
as unstringify_json(c);

set event_hub_config("test") =
{
...
"deserializer": "my_deserializer"
};

event_hub _subscribe(event_hub _config(test'));