How to run OPC UA PubSub over MQTT

 using open source OPC UA stack open62541

Background:

Many people inaccurately limit OPC UA only as a “last mile” communication between the field device and edge applications while mistakenly assuming MQTT is restricted to connectivity between edge and cloud. The OPC Foundation favors MQTT as one (of many) protocols supporting cloud connectivity and underlying infrastructure for “OPC UA over MQTT,” transporting standardized information. MQTT is a payload-agnostic protocol, possessing no definition of the message payload, similar to the earlier years of Modbus – certainly, very simple – but just without a content-standardized description of the data, which is, today, no longer state of the art in this age of digitalization.

All major IoT vendors, such as AWS, Google Cloud, IBM, Microsoft, SAP, and SIEMENS, support this secure, standardized information exchange backing edge-to-cloud applications based on OPC UA – the first plugfest is already actively underway. See the press release from OPC Foundation with more information here: Leading IoT Vendors Commit to OPC UA Adoption – OPC Foundation.

“The OPC Foundation effort has the potential to solve industrial interoperability challenges for ever” – Bhagath, CEO Kalycito.

If all manufacturers of controllers were to define their own manufacturer-specific mapping to MQTT – and if other consortia were to do the same – then there would be a zoo of MQTT mappings! However, no longer does this have anything to do with a simple interoperability plug-and-produce solution; instead, the industry needs to agree on ONE mapping: The OPC Foundation published OPC UA PubSub (over UDP and MQTT) in February 2018 – OPC UA is the IEC 62541 standard.

Scope of our work at Kalycito and this article:

Back in 2018, we at Kalycito joined hands with Fraunhofer IOSB and OSADL to release the world’s first implementation of OPC UA PubSub. In this article, we will be looking at how to send data from an IOT edge device to the cloud leveraging this work. To make it easier, let us imagine a use case where we have to publish the value of motor speed to an external MQTT broker. Let us see how we can use the open-source OPC UA stack open62541 for the same.

The following code snippets will give you an idea of how you can run your own OPC UA server and then use OPC UA PubSub over MQTT to send the data to the cloud.

git clone https://github.com/open62541/open62541.git
cd open62541
git checkout v1.3-rc2-ef2

The server

In this section, we will create an OPC UA server with a standard Information model. The application is created in examples/pubsub/pubsub_mqtt.c.


#include <open62541/plugin/log_stdout.h>
#include <open62541/server.h>
#include <open62541/server_config_default.h>

#include "ua_pubsub.h"
#include "ua_network_pubsub_mqtt.h"

#include <signal.h>
#include <stdlib.h>

static volatile UA_Boolean running = true;
static void stopHandler(int sig) {
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_USERLAND, "received ctrl-c");
    running = false;
}

int main(void) {
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_Server *server = UA_Server_new();
    UA_ServerConfig *config = UA_Server_getConfig(server);
    UA_ServerConfig_setDefault(config);


    - "Add motor speed variable"

    - "Add PubSub transport layer"

    - "Add PubSub connection"

    - "Add publisher dataset"

    - "Add dataset field"

    - "Add writer group"

    - "Add dataset writer"

    UA_StatusCode retval = UA_Server_run(server, &running);

    UA_Server_delete(server);
    return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}

Add motor speed variable

Let’s create a Pump Object with a Component motorRPM Variable.

To add a new variable to the open62541 server, we shall configure the list of the attributes defined for VariableNode in the UA_VariableAttributes structure.


static void
addDataSourceMotorVariable(UA_Server *server) {
    UA_NodeId pumpId;
    UA_ObjectAttributes oAttr = UA_ObjectAttributes_default;
    oAttr.displayName = UA_LOCALIZEDTEXT("en-US", "Pump");
    UA_Server_addObjectNode(server, UA_NODEID_NULL, UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER),
                            UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES), UA_QUALIFIEDNAME(1, "Pump"),
                            UA_NODEID_NUMERIC(0, UA_NS0ID_BASEOBJECTTYPE),
                            oAttr, NULL, &pumpId);

    UA_DataSource scaleTestDataSource;
    UA_VariableAttributes attr = UA_VariableAttributes_default;
    attr.dataType = UA_TYPES[UA_TYPES_DOUBLE].typeId;
    attr.accessLevel = UA_ACCESSLEVELMASK_READ | UA_ACCESSLEVELMASK_WRITE;
    attr.writeMask = UA_WRITEMASK_DISPLAYNAME | UA_WRITEMASK_DESCRIPTION;
    attr.userWriteMask = UA_WRITEMASK_DISPLAYNAME | UA_WRITEMASK_DESCRIPTION;
    attr.valueRank = UA_VALUERANK_SCALAR;

    scaleTestDataSource.read = readRandomDoubleData;
    attr.displayName = UA_LOCALIZEDTEXT("en-US", "MotorRPM");
    UA_QualifiedName qualifiedName = UA_QUALIFIEDNAME(1, "MotorRPM");
    UA_NodeId referenceTypeId = UA_NODEID_NUMERIC(0, UA_NS0ID_HASCOMPONENT);
    UA_NodeId requestedNodeId = UA_NODEID_STRING(1, "MotorRPM");
    UA_Server_addDataSourceVariableNode(server, requestedNodeId, pumpId,
                                        referenceTypeId, qualifiedName,
                                        UA_NODEID_NUMERIC(0, UA_NS0ID_BASEDATAVARIABLETYPE),
                                        attr, scaleTestDataSource, NULL, NULL);
}

Simulate the motor speed

We use random data generation to simulate the motorRPM values, which can be replaced with a physical pump connected to the system.


static UA_StatusCode
readRandomDoubleData (UA_Server *server,
                      const UA_NodeId *sessionId, void *sessionContext,
                      const UA_NodeId *nodeId, void *nodeContext,
                      UA_Boolean sourceTimeStamp,
                      const UA_NumericRange *range, UA_DataValue *value) {
    if(range) {
        value->hasStatus = true;
        value->status = UA_STATUSCODE_BADINDEXRANGEINVALID;
        return UA_STATUSCODE_GOOD;
    }

    UA_Double toggle = (UA_Double)UA_UInt32_random();
    UA_Variant_setScalarCopy(&value->value, &toggle, &UA_TYPES[UA_TYPES_DOUBLE]);
    value->hasValue = true;
    if(sourceTimeStamp) {
        value->hasSourceTimestamp = true;
        value->sourceTimestamp = UA_DateTime_now();
    }

    return UA_STATUSCODE_GOOD;
}

Build open62541

Before building open62541, let us modify the CMakeLists.txt present in the examples/CMakeLists.txt to add the newly created application.

We can add the below line at the end of CMakeLists.txt.

add_example(pubsub_mqtt pubsub/pubsub_mqtt.c)

To build the open62541 application.

mkdir build && cd build
cmake -DUA_BUILD_EXAMPLES=ON -DUA_ENABLE_PUBSUB=ON -DUA_ENABLE_PUBSUB_MQTT=ON -DUA_ENABLE_JSON_ENCODING=ON ..
make -j$(nproc) pubsub_mqtt

Run the server

We can now start the OPC UA server and check the motorRPM Variable value using an OPC UA client tool. ./bin/examples/pubsub_mqtt We can monitor the value by using opcua-commander which is a lightweight OPC UA client that works in a console and does not require any GUI framework. npx opcua-commander -e opc.tcp://localhost:4840
We can now navigate to the variable, /RootFolder/Objects/Pump/MotorRPM and monitor the value change.
open62541 server run

Add PubSub support

Now we have the server component up and running. To include support for the MQTT publisher, a list of objects must be added to the Information Model, which will be done in this section. The list of objects required in the Information Model for a PubSub communication to take place is represented in a diagrammatical form below.


+-----------+
| UA_Server |
+-----------+
 |    |
 |    |
 |    |
 |    |  +----------------------+
 |    +--> UA_PubSubConnection  |  UA_Server_addPubSubConnection
 |       +----------------------+
 |             |
 |             |    +----------------+
 |             +----> UA_WriterGroup |  UA_PubSubConnection_addWriterGroup
 |                  +----------------+
 |                       |
 |                       |    +------------------+
 |                       +----> UA_DataSetWriter |  UA_WriterGroup_addDataSetWriter     +-+
 |                            +------------------+                                        |r
 |                                                                                        |e
 |       +---------------------------+                                                    |f
 +-------> UA_PubSubPublishedDataSet |  UA_Server_addPublishedDataSet                   <-+
         +---------------------------+
               |
               |    +-----------------+
               +----> UA_DataSetField |  UA_PublishedDataSet_addDataSetField
                    +-----------------+

We shall now add the PubSub extension to our server by creating an MQTT JSON publisher and a dataset writer that publishes our motorRPM value.

Add PubSub transport layer

Publishers use standard messaging protocols like AMQP or MQTT to communicate with the broker. Use the following line of code to enable MQTT as PubSub transport layer.

UA_ServerConfig_addPubSubTransportLayer(config, UA_PubSubTransportLayerMQTT());

Add PubSub connection

The PubSubConnection contains the address of the MQTT broker. For our demo, we will use the mosquito broker (installed at the local machine – steps captured further below in this article) at "opc.mqtt://localhost:1883".


static void
addPubSubConnection(UA_Server *server, char *addressUrl) {
    /* Details about the connection configuration and handling are located
     * in the pubsub connection tutorial */
    UA_PubSubConnectionConfig connectionConfig;
    memset(&connectionConfig, 0, sizeof(connectionConfig));
    connectionConfig.name = UA_STRING("MQTT Publisher Connection");
    connectionConfig.transportProfileUri = UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt");
    connectionConfig.enabled = UA_TRUE;

    /* configure address of the mqtt broker (local on default port) */
    UA_NetworkAddressUrlDataType networkAddressUrl = {UA_STRING_NULL , UA_STRING(addressUrl)};
    UA_Variant_setScalar(&connectionConfig.address, &networkAddressUrl,
                         &UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
    /* Changed to static publisherId from random generation to identify
     * the publisher on Subscriber side */
    connectionConfig.publisherId.numeric = 2234;

    /* configure options, set mqtt client id */
    const int connectionOptionsCount = 1;

    UA_KeyValuePair connectionOptions[connectionOptionsCount];

    size_t connectionOptionIndex = 0;
    connectionOptions[connectionOptionIndex].key = UA_QUALIFIEDNAME(0, "mqttClientId");
    UA_String mqttClientId = UA_STRING("TESTCLIENTPUBSUBMQTT");
    UA_Variant_setScalar(&connectionOptions[connectionOptionIndex++].value, &mqttClientId, &UA_TYPES[UA_TYPES_STRING]);

    connectionConfig.connectionProperties = connectionOptions;
    connectionConfig.connectionPropertiesSize = connectionOptionIndex;

    UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}

Add publisher dataset

The PublishedDataSet (PDS) and PubSubConnection are the top-level entities and can exist alone. The PDS contains the collection of the published fields.


static void
addPublishedDataSet(UA_Server *server) {
    /* The PublishedDataSetConfig contains all necessary public
    * information for the creation of a new PublishedDataSet */
    UA_PublishedDataSetConfig publishedDataSetConfig;
    memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
    publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
    publishedDataSetConfig.name = UA_STRING("Demo PDS");
    /* Create new PublishedDataSet based on the PublishedDataSetConfig. */
    UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}

Add dataset field

The DataSetField (DSF) is part of the PDS and describes exactly one published field.

In our case, the nodeId of the variable to monitor is "ns=1;s= MotorRPM".


static void
addDataSetField(UA_Server *server) {
    /* Add a field to the previous created PublishedDataSet */
    UA_DataSetFieldConfig dataSetFieldConfig;
    memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
    dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;

    dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("motor RPM");
    dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
    dataSetFieldConfig.field.variable.publishParameters.publishedVariable = 
            UA_NODEID_STRING(1, "MotorRPM");
    dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
    UA_Server_addDataSetField(server, publishedDataSetIdent, &dataSetFieldConfig, NULL);
}

Add writer group

The WriterGroup (WG) is part of the connection and contains the primary configuration parameters for the message creation.

The writerGroupConfig.publishingInterval indicates the rate at which the data will be published to the broker.


static UA_StatusCode
addWriterGroup(UA_Server *server, char *topic, int interval) {
    UA_StatusCode retval = UA_STATUSCODE_GOOD;
    /* Now we create a new WriterGroupConfig and add the group to the existing PubSubConnection. */
    UA_WriterGroupConfig writerGroupConfig;
    memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
    writerGroupConfig.name = UA_STRING("Demo WriterGroup");
    writerGroupConfig.publishingInterval = interval;
    writerGroupConfig.enabled = UA_FALSE;
    writerGroupConfig.writerGroupId = 100;

#ifndef UA_ENABLE_JSON_ENCODING
    UA_UadpWriterGroupMessageDataType *writerGroupMessage;
#endif

    /* decide whether to use JSON or UADP encoding*/
#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonWriterGroupMessageDataType *Json_writerGroupMessage;

    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_JSON;
    writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;

    writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONWRITERGROUPMESSAGEDATATYPE];
    /* The configuration flags for the messages are encapsulated inside the
     * message- and transport settings extension objects. These extension
     * objects are defined by the standard. e.g.
     * UadpWriterGroupMessageDataType */
    Json_writerGroupMessage = UA_JsonWriterGroupMessageDataType_new();
    /* Change message settings of writerGroup to send PublisherId,
     * DataSetMessageHeader, SingleDataSetMessage and DataSetClassId in PayloadHeader
     * of NetworkMessage */
    Json_writerGroupMessage->networkMessageContentMask =
        (UA_JsonNetworkMessageContentMask)(UA_JSONNETWORKMESSAGECONTENTMASK_NETWORKMESSAGEHEADER |
        (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETMESSAGEHEADER |
        (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_SINGLEDATASETMESSAGE |
        (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_PUBLISHERID |
        (UA_JsonNetworkMessageContentMask)UA_JSONNETWORKMESSAGECONTENTMASK_DATASETCLASSID);
    writerGroupConfig.messageSettings.content.decoded.data = Json_writerGroupMessage;
#else

    writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
    writerGroupConfig.messageSettings.encoding             = UA_EXTENSIONOBJECT_DECODED;
    writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
    /* The configuration flags for the messages are encapsulated inside the
     * message- and transport settings extension objects. These extension
     * objects are defined by the standard. e.g.
     * UadpWriterGroupMessageDataType */
    writerGroupMessage  = UA_UadpWriterGroupMessageDataType_new();
    /* Change message settings of writerGroup to send PublisherId,
     * WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
     * of NetworkMessage */
    writerGroupMessage->networkMessageContentMask =
        (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
        (UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
    writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
#endif

    /* configure the mqtt publish topic */
    UA_BrokerWriterGroupTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerWriterGroupTransportDataType));
    /* Assign the Topic at which MQTT publish should happen */
    /*ToDo: Pass the topic as argument from the writer group */
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;

    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERWRITERGROUPTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    writerGroupConfig.transportSettings = transportSettings;
    retval = UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);

    if (retval == UA_STATUSCODE_GOOD)
        UA_Server_setWriterGroupOperational(server, writerGroupIdent);

#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonWriterGroupMessageDataType_delete(Json_writerGroupMessage);
#endif

#ifndef UA_ENABLE_JSON_ENCODING
    UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
#endif

    return retval;
}

Add dataset writer

The DataSetWriters are the glue between the WriterGroups and the PublishedDataSets. The DataSetWriter contains configuration parameters and flags that influence DataSet messages’ creation.

The queueName parameters contains the MQTT topic we want to publish to customTopic.


static void
addDataSetWriter(UA_Server *server, char *topic) {
    /* We need now a DataSetWriter within the WriterGroup. This means we must
     * create a new DataSetWriterConfig and add call the addWriterGroup function. */
    UA_NodeId dataSetWriterIdent;
    UA_DataSetWriterConfig dataSetWriterConfig;
    memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
    dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
    dataSetWriterConfig.dataSetWriterId = 62541;
    dataSetWriterConfig.keyFrameCount = 10;

#ifdef UA_ENABLE_JSON_ENCODING
    UA_JsonDataSetWriterMessageDataType jsonDswMd;
    UA_ExtensionObject messageSettings;
    /* JSON config for the dataSetWriter */
    jsonDswMd.dataSetMessageContentMask = (UA_JsonDataSetMessageContentMask)
        (UA_JSONDATASETMESSAGECONTENTMASK_DATASETWRITERID |
         UA_JSONDATASETMESSAGECONTENTMASK_SEQUENCENUMBER |
         UA_JSONDATASETMESSAGECONTENTMASK_STATUS |
         UA_JSONDATASETMESSAGECONTENTMASK_METADATAVERSION |
         UA_JSONDATASETMESSAGECONTENTMASK_TIMESTAMP);

    messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_JSONDATASETWRITERMESSAGEDATATYPE];
    messageSettings.content.decoded.data = &jsonDswMd;

    dataSetWriterConfig.messageSettings = messageSettings;
#endif
    /*TODO: Modify MQTT send to add DataSetWriters broker transport settings */
    /*TODO: Pass the topic as argument from the writer group */
    /*TODO: Publish Metadata to metaDataQueueName */
    /* configure the mqtt publish topic */
    UA_BrokerDataSetWriterTransportDataType brokerTransportSettings;
    memset(&brokerTransportSettings, 0, sizeof(UA_BrokerDataSetWriterTransportDataType));

    /* Assign the Topic at which MQTT publish should happen */
    brokerTransportSettings.queueName = UA_STRING(topic);
    brokerTransportSettings.resourceUri = UA_STRING_NULL;
    brokerTransportSettings.authenticationProfileUri = UA_STRING_NULL;
    brokerTransportSettings.metaDataQueueName = UA_STRING("MetaDataTopic");
    brokerTransportSettings.metaDataUpdateTime = 0;

    /* Choose the QOS Level for MQTT */
    brokerTransportSettings.requestedDeliveryGuarantee = UA_BROKERTRANSPORTQUALITYOFSERVICE_BESTEFFORT;

    /* Encapsulate config in transportSettings */
    UA_ExtensionObject transportSettings;
    memset(&transportSettings, 0, sizeof(UA_ExtensionObject));
    transportSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
    transportSettings.content.decoded.type = &UA_TYPES[UA_TYPES_BROKERDATASETWRITERTRANSPORTDATATYPE];
    transportSettings.content.decoded.data = &brokerTransportSettings;

    dataSetWriterConfig.transportSettings = transportSettings;
    UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
                               &dataSetWriterConfig, &dataSetWriterIdent);
}

Starting the server along with MQTT Publisher

We can now compile the Publisher application. cd build
make -j$(nproc) pubsub_mqtt
Before starting the application let us install the mosquito MQTT broker. sudo apt-get install mosquitto mosquitto-client
./bin/examples/pubsub_mqtt

Subscribing to the OPC UA PubSub topic

We can now verify that a MQTT client can subscribe to the published data. mosquitto_sub -t "customTopic"
Subscribing to the OPCUA PubSub topic

Next Step

We have demonstrated how to add MQTT PubSub support to an open62541 server. Also, we have shown how to publish the simulated data of a motorRPM variable to an MQTT broker by using a standardized OPCUA JSON payload. The above-captured demo can be leveraged to transfer data to the cloud in the future. If you are interested in doing the same using NodeOPCUA, follow this link: Node-OPCUA goes PubSub – Episode 1.