Working with Publish/Subscribe¶
Work in progress: This Tutorial will be continuously extended during the next PubSub batches. More details about the PubSub extension and corresponding open62541 API are located here: PubSub.
Publishing Fields¶
The PubSub publish example demonstrates the simplest way to publish information from the information model over UDP multicast using the UADP encoding.
Connection handling
PubSubConnections can be created and deleted on runtime. More details about
the system preconfiguration and connection can be found in
tutorial_pubsub_connection.c
.
#include <open62541/plugin/log_stdout.h>
#include <open62541/server.h>
#include <open62541/server_pubsub.h>
#include <stdio.h>
#include <stdlib.h>
UA_NodeId connectionIdent, publishedDataSetIdent, writerGroupIdent;
static void
addPubSubConnection(UA_Server *server, UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl){
/* Details about the connection configuration and handling are located
* in the pubsub connection tutorial */
UA_PubSubConnectionConfig connectionConfig;
memset(&connectionConfig, 0, sizeof(connectionConfig));
connectionConfig.name = UA_STRING("UADP Connection 1");
connectionConfig.transportProfileUri = *transportProfile;
connectionConfig.enabled = UA_TRUE;
UA_Variant_setScalar(&connectionConfig.address, networkAddressUrl,
&UA_TYPES[UA_TYPES_NETWORKADDRESSURLDATATYPE]);
/* Changed to static publisherId from random generation to identify
* the publisher on Subscriber side */
connectionConfig.publisherId.idType = UA_PUBLISHERIDTYPE_UINT16;
connectionConfig.publisherId.id.uint16 = 2234;
UA_Server_addPubSubConnection(server, &connectionConfig, &connectionIdent);
}
PublishedDataSet handling
The PublishedDataSet (PDS) and PubSubConnection are the toplevel entities and can exist alone. The PDS contains the collection of the published fields. All other PubSub elements are directly or indirectly linked with the PDS or connection.
static void
addPublishedDataSet(UA_Server *server) {
/* The PublishedDataSetConfig contains all necessary public
* information for the creation of a new PublishedDataSet */
UA_PublishedDataSetConfig publishedDataSetConfig;
memset(&publishedDataSetConfig, 0, sizeof(UA_PublishedDataSetConfig));
publishedDataSetConfig.publishedDataSetType = UA_PUBSUB_DATASET_PUBLISHEDITEMS;
publishedDataSetConfig.name = UA_STRING("Demo PDS");
/* Create new PublishedDataSet based on the PublishedDataSetConfig. */
UA_Server_addPublishedDataSet(server, &publishedDataSetConfig, &publishedDataSetIdent);
}
DataSetField handling
The DataSetField (DSF) is part of the PDS and describes exactly one published field.
static void
addDataSetField(UA_Server *server) {
/* Add a field to the previous created PublishedDataSet */
UA_NodeId dataSetFieldIdent;
UA_DataSetFieldConfig dataSetFieldConfig;
memset(&dataSetFieldConfig, 0, sizeof(UA_DataSetFieldConfig));
dataSetFieldConfig.dataSetFieldType = UA_PUBSUB_DATASETFIELD_VARIABLE;
dataSetFieldConfig.field.variable.fieldNameAlias = UA_STRING("Server localtime");
dataSetFieldConfig.field.variable.promotedField = UA_FALSE;
dataSetFieldConfig.field.variable.publishParameters.publishedVariable =
UA_NODEID_NUMERIC(0, UA_NS0ID_SERVER_SERVERSTATUS_CURRENTTIME);
dataSetFieldConfig.field.variable.publishParameters.attributeId = UA_ATTRIBUTEID_VALUE;
UA_Server_addDataSetField(server, publishedDataSetIdent,
&dataSetFieldConfig, &dataSetFieldIdent);
}
WriterGroup handling
The WriterGroup (WG) is part of the connection and contains the primary configuration parameters for the message creation.
static void
addWriterGroup(UA_Server *server) {
/* Now we create a new WriterGroupConfig and add the group to the existing
* PubSubConnection. */
UA_WriterGroupConfig writerGroupConfig;
memset(&writerGroupConfig, 0, sizeof(UA_WriterGroupConfig));
writerGroupConfig.name = UA_STRING("Demo WriterGroup");
writerGroupConfig.publishingInterval = 100;
writerGroupConfig.enabled = UA_FALSE;
writerGroupConfig.writerGroupId = 100;
writerGroupConfig.encodingMimeType = UA_PUBSUB_ENCODING_UADP;
writerGroupConfig.messageSettings.encoding = UA_EXTENSIONOBJECT_DECODED;
writerGroupConfig.messageSettings.content.decoded.type = &UA_TYPES[UA_TYPES_UADPWRITERGROUPMESSAGEDATATYPE];
/* The configuration flags for the messages are encapsulated inside the
* message- and transport settings extension objects. These extension
* objects are defined by the standard. e.g.
* UadpWriterGroupMessageDataType */
UA_UadpWriterGroupMessageDataType *writerGroupMessage = UA_UadpWriterGroupMessageDataType_new();
/* Change message settings of writerGroup to send PublisherId,
* WriterGroupId in GroupHeader and DataSetWriterId in PayloadHeader
* of NetworkMessage */
writerGroupMessage->networkMessageContentMask = (UA_UadpNetworkMessageContentMask)(UA_UADPNETWORKMESSAGECONTENTMASK_PUBLISHERID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_GROUPHEADER |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_WRITERGROUPID |
(UA_UadpNetworkMessageContentMask)UA_UADPNETWORKMESSAGECONTENTMASK_PAYLOADHEADER);
writerGroupConfig.messageSettings.content.decoded.data = writerGroupMessage;
UA_Server_addWriterGroup(server, connectionIdent, &writerGroupConfig, &writerGroupIdent);
UA_Server_enableWriterGroup(server, writerGroupIdent);
UA_UadpWriterGroupMessageDataType_delete(writerGroupMessage);
}
DataSetWriter handling
A DataSetWriter (DSW) is the glue between the WG and the PDS. The DSW is linked to exactly one PDS and contains additional information for the message generation.
static void
addDataSetWriter(UA_Server *server) {
/* We need now a DataSetWriter within the WriterGroup. This means we must
* create a new DataSetWriterConfig and add call the addWriterGroup function. */
UA_NodeId dataSetWriterIdent;
UA_DataSetWriterConfig dataSetWriterConfig;
memset(&dataSetWriterConfig, 0, sizeof(UA_DataSetWriterConfig));
dataSetWriterConfig.name = UA_STRING("Demo DataSetWriter");
dataSetWriterConfig.dataSetWriterId = 62541;
dataSetWriterConfig.keyFrameCount = 10;
UA_Server_addDataSetWriter(server, writerGroupIdent, publishedDataSetIdent,
&dataSetWriterConfig, &dataSetWriterIdent);
}
That’s it! You’re now publishing the selected fields. Open a packet inspection tool of trust e.g. wireshark and take a look on the outgoing packages. The following graphic figures out the packages created by this tutorial.
The open62541 subscriber API will be released later. If you want to process
the the datagrams, take a look on the ua_network_pubsub_networkmessage.c
which already contains the decoding code for UADP messages.
It follows the main server code, making use of the above definitions.
static int run(UA_String *transportProfile,
UA_NetworkAddressUrlDataType *networkAddressUrl) {
UA_Server *server = UA_Server_new();
addPubSubConnection(server, transportProfile, networkAddressUrl);
addPublishedDataSet(server);
addDataSetField(server);
addWriterGroup(server);
addDataSetWriter(server);
UA_StatusCode retval = UA_Server_runUntilInterrupt(server);
UA_Server_delete(server);
return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}
static void
usage(char *progname) {
printf("usage: %s <uri> [device]\n", progname);
}
int main(int argc, char **argv) {
UA_String transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-udp-uadp");
UA_NetworkAddressUrlDataType networkAddressUrl =
{UA_STRING_NULL , UA_STRING("opc.udp://224.0.0.22:4840/")};
if (argc > 1) {
if (strcmp(argv[1], "-h") == 0) {
usage(argv[0]);
return EXIT_SUCCESS;
} else if (strncmp(argv[1], "opc.udp://", 10) == 0) {
networkAddressUrl.url = UA_STRING(argv[1]);
} else if (strncmp(argv[1], "opc.eth://", 10) == 0) {
transportProfile =
UA_STRING("http://opcfoundation.org/UA-Profile/Transport/pubsub-eth-uadp");
if (argc < 3) {
printf("Error: UADP/ETH needs an interface name\n");
return EXIT_FAILURE;
}
networkAddressUrl.url = UA_STRING(argv[1]);
} else {
printf("Error: unknown URI\n");
return EXIT_FAILURE;
}
}
if (argc > 2) {
networkAddressUrl.networkInterface = UA_STRING(argv[2]);
}
return run(&transportProfile, &networkAddressUrl);
}