Bridging Domains: Using the RTI Routing Service Adapter to Connect MSQL Databases
Written by Tim McGuire
March 2, 2020
Large Industrial Internet of Things (IIoT) systems are composed of multiple connectivity technologies. Given that these systems often incorporate new and legacy components, it is not realistic to assume that one connectivity standard can be applied to an entire IIoT system. Gateways are necessary to bridge different connectivity technologies in order for them to co-exist and communicate as part of a larger system. They are common in a layered databus architecture, as referenced in the IIC Industrial Internet Reference Architecture guide. The RTI Routing Service is the enabler of the layered databus. It also provides gateway capabilities to different technologies.
Figure 1: Layered databus reference architecture using gateways
The RTI Routing Service is necessary to allow the Data Distribution Service™ (DDS) architectural framework to communicate with non-DDS subsystems. This blog details how to write a Routing Service Adapter to read from and write to a MSQL database. It also defines a strategy for debugging a Routing Service Adapter.
Figure 2: RTI Routing Service Adapter to MSQL Database
In the following example, the Routing Service reads from and writes to a database. However, it could easily be modified to interface with a file, a socket or something else. A storage plugin for the Recording Service could have been used instead, which would have provided the same flexibility (“provide your own schema”) and the option to store discovery data as well.
The following environment was used with this example.
- Windows 10 (x64)
- Connext DDS Professional 6.0.1 with target x64Win64VS2017. Note: the target must have support for RTI Routing Service and C++11
- CMake version 3.12 or higher
- Microsoft Visual Studio 2017 or 2019. A C++11 compiler is necessary
- Microsoft SQL Server Express 2017
- Microsoft SQL Server Management Studio (SSMS) - database administration
The following information was helpful in building this adapter.
- RTI Routing Service Users Manual
- C++ RTI Routing Service Adapter API
- DynamicType and DynamicData Use Cases
- C++11 Routing Service File Adapter
- The code for this example can be found in GitHub at routing_service_msql_adapter
This example assumes a database called Shapes has been created, along with a login that has access to the database. In our example, we used a login name (rti_shapes_rs), selected SQL Server authentication and entered a password (abc123).
Create Shapes Database Table
Create a table in the Shapes database called Shapes_table to store the shapes data. Design the table so it has the following columns.
Figure 3: Database table to store Shapes data
- Right click on id and select Set Primary Key.
- Select the id Column. In the Column Properties under Identity Specification, set Is Identity to Yes. This will ensure that every entry has a unique database key value and therefore has a mechanism to store all published DDS samples.
Routing Service Adapter
- This example includes two Routing Service configurations.
- RS_DDS_To_MSQL - Reads DDS Shapes on Domain 0 and writes non-DDS data to a Microsoft MSQL Database. It has one DDS input and three non-DDS outputs.
- RS_MSQL_To_DDS - Reads records from the Microsoft MSQL Database and publishes DDS data on Domain 0. It has three non-DDS inputs and one DDS output.
Debugging the RTI Routing Service Adapter
Connext DDS Professional 6.0.1 ships with the release version of the Routing Service executable, which requires the user to build a release version of the dynamic library (DLL) adapter. Mixing release and debug versions can have adverse effects. This means the dynamic library adapter code cannot be debugged directly. There are two strategies for debugging adapter code when building a dynamic library:
- Build a Debug executable that instantiates a routing service. This executable references the same user code that the dynamic library uses and links with debug libraries, including the debug Routing Service library. Once you are confident the application works correctly, you can easily switch to building a Release dynamic library for the adapter code.
- Request the Connext DDS 6.0.1 Debug Services package for your target, which includes the debug Routing Service executable. This will allow you to start a debug Routing Service executable and load the debug version of your dynamic library adapter code.
Routing Service Configuration XML Files
In this example, there are two directories: rs_dll and rs_main. The XML files in rs_dll are used to dynamically load a Routing Service Adapter dynamic library from the Routing Service. The dynamic library XML requires a <plugin_library> tag that specifies the name of the library and the create function.
<!-- Configures a custom, adapter-based connection -->
<connection name="msql_connection" plugin_name="MsqlDataPlugins::MsqlAdapter">
The XML files in rs_main are used to instantiate a Routing Service from a user-defined application. The XML file's plugin_name "myMsqlAdapter" is referenced in the C++ code. There is no <plugin_library> tag.
<!-- Configures a custom, adapter-based connection -->
<connection name="msql_connection" plugin_name="myMsqlAdapter">
Figure 5: XML for Routing Service instantiated from an executable
The C++ code to instantiate the Routing Service is below. The attach_plugin method transfers ownership of the adapter plugin to the Routing Service. Do not delete the adapter plugin pointer. The Routing Service is responsible for this action.
RsAdapterPlugin *myRsAdapterPlugin = new RsAdapterPlugin(properties);
Figure 6: C++ code for executable
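The ownership rule described above can be illustrated without the Connext headers. The following is a minimal sketch, not the RTI API: `FakeService`, `FakePlugin` and `live_plugins_after_service_shutdown()` are hypothetical stand-ins that only mimic a service taking ownership of a plugin passed as a raw pointer.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for an adapter plugin. It counts live instances so
// that destruction can be observed from the outside.
struct FakePlugin {
    static int live_count;
    FakePlugin() { ++live_count; }
    ~FakePlugin() { --live_count; }
};
int FakePlugin::live_count = 0;

// Hypothetical stand-in for a service that takes ownership of a plugin
// handed over as a raw pointer, as the text describes for attach_plugin.
class FakeService {
public:
    void attach_plugin(FakePlugin *plugin, std::string name) {
        plugin_.reset(plugin);  // from here on, the service owns the plugin
        name_ = std::move(name);
    }
    const std::string& plugin_name() const { return name_; }

private:
    std::unique_ptr<FakePlugin> plugin_;  // released when the service is destroyed
    std::string name_;
};

// Attaches a heap-allocated plugin and returns how many plugins are still
// alive once the service has gone out of scope. The caller never deletes.
int live_plugins_after_service_shutdown() {
    {
        FakeService service;
        FakePlugin *plugin = new FakePlugin();
        service.attach_plugin(plugin, "myMsqlAdapter");
        // No delete here: the service would free the same pointer again.
    }
    return FakePlugin::live_count;
}
```

Calling `live_plugins_after_service_shutdown()` in this sketch returns 0, because the stand-in service frees the plugin when it is destroyed. Deleting the pointer in the caller as well would free it twice.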
Each of the two directories, rs_dll and rs_main, contains two XML configuration files. The *_w_types.xml files have the types defined in the XML file. This is useful if the types cannot be discovered dynamically. The *_wo_types.xml files do not define the types in the XML file; the types are discovered dynamically instead.
Whenever there is a non-DDS input (i.e., MSQL_To_DDS), set the configuration’s input creation_mode to ON_DOMAIN_MATCH. This mode enables the RsInputDiscoveryStreamReader to signal both the presence of the stream (which triggers creation) and disposal of the stream (which triggers deletion).
In the following example, I provide sample code that is purposely generic, for illustrative purposes. For example, it is only the implementation of the connector which needs to know the DynamicData members. This should make it easier to create new connectors or to change the type in the connector.
rs_main/rsmain.cxx - Contains the main() function for the executable that instantiates the Routing Service.
- Declares and defines the Adapter Plugin Create Function by specifying the class as the macro parameter.
// declare entry point class
// definition of entry point class
Figure 7: Plugin macros define the entry point class
- create_connection() - Called during Routing Service startup when the DomainRoute is created, it creates the connection passing in input and output DiscoveryStreamReaders if necessary.
- constructor - Uses the StreamReaderListener parameter to create the inputDiscoveryStreamReader.
- create_stream_writer() - This method is called for each Route's output when the associated 'creation mode' condition is met. The operation is called on the Connection as specified through the 'connection' attribute in the <output> tag. The stream name is defined in the <stream_name> XML tag for each output. This is passed to the RsStreamWriter constructor.
- create_stream_reader() - This method is called for each Route's input when the associated 'creation mode' condition is met. The operation is called on the Connection as specified through the 'connection' attribute in the <input> tag. The stream name is defined in the <stream_name> XML tag for each input. This is passed to the RsStreamReader constructor.
- input_stream_discovery_reader() - Returns the inputDiscoveryStreamReader created in the constructor.
- dispose_discovery_stream() - Signals the disposing of a stream to the Routing Service which triggers deletion.
RsStreamWriter - There is one Stream Writer for each <output> tag.
- constructor - Creates the output connector. Since the example has three Stream Writers, there will be three output connectors.
- write() - Writes the Shapes data to the output connector and returns the number of samples written.
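To make the write() contract concrete, here is a small sketch of a writer that forwards each sample to an output and returns how many were written. It is an illustration only: `ShapeSample`, `SketchStreamWriter` and `demo_write_count()` are hypothetical names, the struct stands in for a DynamicData sample, and the field names assume the usual Shapes demo type (color, x, y, shapesize).

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a DynamicData sample of the Shapes type.
struct ShapeSample {
    std::string color;
    int x;
    int y;
    int shapesize;
};

// Hypothetical writer: forwards each sample to an output sink and counts
// how many the sink accepted.
class SketchStreamWriter {
public:
    using Sink = std::function<bool(const ShapeSample&)>;

    explicit SketchStreamWriter(Sink sink) : sink_(std::move(sink)) {}

    // Returns the number of samples successfully written.
    int write(const std::vector<ShapeSample>& samples) {
        int written = 0;
        for (const ShapeSample& sample : samples) {
            if (sink_(sample)) {
                ++written;
            }
        }
        return written;
    }

private:
    Sink sink_;
};

// Example: a sink that rejects shapes with a non-positive size.
int demo_write_count() {
    SketchStreamWriter writer(
        [](const ShapeSample& s) { return s.shapesize > 0; });
    std::vector<ShapeSample> samples = {
        {"BLUE", 10, 20, 30}, {"RED", 5, 5, 0}, {"GREEN", 1, 2, 3}};
    return writer.write(samples);
}
```

In the real adapter the sink would be the output connector's writeData() call. Here a lambda plays that role so the counting logic can be seen in isolation.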
RsStreamReader - There is one Stream Reader for each <input> tag.
- constructor - Creates the input connector. Since the example has three Stream Readers, there will be three input connectors. It sets a sampling period of 50ms and keeps a reference to the RsConnection that created it. It also keeps a reference to the listener parameter, which is used to notify the Routing Service that the RsStreamReader has data available.
- destructor - Signals the readDataThread to shut down.
- readDataThread() - Reads data by calling the readData() method. If the number of records read is greater than zero, "on_data_available(this)" is called to notify the Routing Service that the RsStreamReader has data available to be read.
- take() - This is called when the RsStreamReader data is available to be read. It populates the sample to be published in the DDS domain. The take() method with the selector parameter only calls this take() method, and therefore the selector is ignored.
- return_loan() - Returns a loan on the read or taken data samples and info samples. RTI Routing Service calls this method to indicate that it is done accessing the collection of data samples and info samples obtained by an earlier invocation of any variation of take().
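The polling pattern described for the RsStreamReader can be sketched in plain C++. This is a simplified illustration under stated assumptions, not the adapter's code: `PollingReader` and `notifications_for()` are hypothetical, the two callbacks stand in for readData() and the listener's on_data_available(), and unlike the example adapter this sketch keeps polling after a read returns zero records.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical polling reader: a background thread calls read_data() once
// per period and raises on_data_available() when a record was read.
class PollingReader {
public:
    PollingReader(std::function<int()> read_data,
                  std::function<void()> on_data_available,
                  std::chrono::milliseconds period = std::chrono::milliseconds(50))
        : read_data_(std::move(read_data)),
          on_data_available_(std::move(on_data_available)),
          period_(period),
          running_(true),
          thread_(&PollingReader::run, this) {}

    ~PollingReader() {
        running_ = false;  // signal the thread to stop
        thread_.join();    // wait for it before the members are destroyed
    }

    PollingReader(const PollingReader&) = delete;
    PollingReader& operator=(const PollingReader&) = delete;

private:
    void run() {
        while (running_) {
            if (read_data_() > 0) {
                on_data_available_();
            }
            std::this_thread::sleep_for(period_);
        }
    }

    std::function<int()> read_data_;
    std::function<void()> on_data_available_;
    std::chrono::milliseconds period_;
    std::atomic<bool> running_;
    std::thread thread_;  // declared last so it starts after the other members
};

// Runs a reader over a fake source holding `records` rows and returns how
// many notifications were raised by the time the reader is destroyed.
int notifications_for(int records) {
    std::atomic<int> remaining(records);
    std::atomic<int> notified(0);
    {
        PollingReader reader(
            [&] { return remaining > 0 ? (--remaining, 1) : 0; },
            [&] { ++notified; },
            std::chrono::milliseconds(1));
        while (remaining > 0) {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }  // the destructor joins the thread here
    return notified;
}
```

The thread member is declared last on purpose: members are initialized in declaration order, so the callbacks and the running flag exist before the thread starts using them.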
RsInputDiscoveryStreamReader - A special kind of StreamReader that provides discovery information about the available streams and their types.
- constructor - Sets a reference to the InputStreamDiscoveryListener parameter. Creates discovery information about the Square, Circle and Triangle streams and then notifies the InputStreamDiscoveryListener that discovery data about the streams are available.
- take() - Takes a collection of all available StreamInfo samples to be used by the Routing Service.
- return_loan() - When RTI Routing Service is done with the StreamInfo samples, it will return them to the DiscoveryStreamReader by calling this method.
- dispose() - Creates a new StreamInfo sample and sets disposed to true. It then notifies the InputStreamDiscoveryListener that data is available, which indicates the stream is no longer available.
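The discovery flow above (announce the streams, hand them over on take(), signal disposal later) can be sketched as follows. This is a single-threaded illustration with hypothetical names: `SketchStreamInfo` and `SketchDiscoveryReader` are not RTI classes, and the callback stands in for the InputStreamDiscoveryListener.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a StreamInfo sample.
struct SketchStreamInfo {
    std::string stream_name;
    bool disposed;
};

// Hypothetical discovery reader: queues stream announcements and disposals
// and tells a listener whenever new discovery data is pending.
class SketchDiscoveryReader {
public:
    explicit SketchDiscoveryReader(std::function<void()> on_data_available)
        : notify_(std::move(on_data_available)) {
        const std::vector<std::string> names = {"Square", "Circle", "Triangle"};
        for (const std::string& name : names) {
            pending_.push_back({name, false});
        }
        notify_();  // announce that the three streams are available
    }

    // Hands over every pending sample and leaves the queue empty.
    std::vector<SketchStreamInfo> take() {
        std::vector<SketchStreamInfo> out;
        out.swap(pending_);
        return out;
    }

    // Queues a disposed sample, which signals that the stream is gone.
    void dispose(const std::string& stream_name) {
        pending_.push_back({stream_name, true});
        notify_();
    }

private:
    std::function<void()> notify_;
    std::vector<SketchStreamInfo> pending_;
};
```

A real implementation would also need to protect the pending queue if dispose() and take() can run on different threads. That is left out here to keep the flow visible.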
ConnectorBase - Abstract base class that defines default methods and methods that must be implemented by derived classes. It also stores the topic name. To create a connector implementation, the derived class must minimally implement writeData() and readData().
- connect() - Default implementation. Sets the connected_ attribute to true and returns true.
- disconnect() - Default implementation. Sets the connected_ attribute to false.
- connected() - Default implementation that returns true.
- writeData() - Pure virtual method that uses DynamicData for the sample parameter. This makes the method generic and easier to implement in a derived class for any type.
- readData() - Pure virtual method that uses a reference to DynamicData for the sample parameter. This makes the method generic and easier to implement in a derived class for any type.
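The shape of such a base class can be sketched like this. It is an illustration under assumptions, not the example's source: `SketchSample` stands in for DynamicData, and `SketchConnectorBase` and `InMemoryConnector` are hypothetical names. The in-memory connector shows that a derived class only has to supply writeData() and readData().

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>

// Hypothetical stand-in for a DynamicData sample.
struct SketchSample {
    std::string color;
    int x;
    int y;
    int shapesize;
};

// Hypothetical base class: default connection handling plus two pure
// virtual methods that every connector must implement.
class SketchConnectorBase {
public:
    explicit SketchConnectorBase(std::string topic_name)
        : topic_name_(std::move(topic_name)), connected_(false) {}
    virtual ~SketchConnectorBase() = default;

    virtual bool connect() { connected_ = true; return true; }
    virtual void disconnect() { connected_ = false; }
    // Default as described in the text: always reports true.
    virtual bool connected() const { return true; }

    virtual bool writeData(const SketchSample& sample) = 0;
    virtual int readData(SketchSample& sample) = 0;  // returns records read

    const std::string& topic_name() const { return topic_name_; }

protected:
    std::string topic_name_;
    bool connected_;
};

// Minimal derived connector that stores records in memory instead of a
// database, to show the two methods a new connector has to provide.
class InMemoryConnector : public SketchConnectorBase {
public:
    using SketchConnectorBase::SketchConnectorBase;

    bool writeData(const SketchSample& sample) override {
        records_.push_back(sample);
        return true;
    }

    int readData(SketchSample& sample) override {
        if (records_.empty()) {
            return 0;
        }
        sample = records_.front();
        records_.pop_front();
        return 1;
    }

private:
    std::deque<SketchSample> records_;
};
```

Swapping the deque for file, socket or database calls would give a different connector without touching the reader and writer code that uses the base class.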
ConnectorMsqlDb - This is a MSQL database implementation of a ConnectorBase.
- constructor - Forms the connection string from the properties parameter and then connects to the MSQL database.
- destructor - Disconnects from the MSQL database.
- connect() - Connects to the database. Returns true if successful and sets the connected_ attribute to true.
- disconnect() - Disconnects from the database and sets the connected_ attribute to false.
- connected() - Returns whether the connector is connected.
- writeData() - Converts the DynamicData sample to Shapes data and writes the record to the database. Returns true on success, otherwise false.
- readData() - Reads one database record at a time, where the topic name is either Square, Circle or Triangle. Converts it to a DynamicData sample and returns the number of records read.
- setConnectionInfo() - Sets the properties found in the XML to connect to the database.
- msqlShowError() - Outputs error information when SQL tasks fail.
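As an illustration of forming a connection string from properties, here is a minimal sketch. It assumes an ODBC-style connection string, which the text does not confirm, and the property keys ("server", "database", "uid", "pwd") and the function name `make_connection_string()` are hypothetical. Check the example's XML files for the property names it really uses.

```cpp
#include <cassert>
#include <map>
#include <string>

// Builds an ODBC-style connection string from adapter properties.
// Assumption: the keys below are illustrative, not the example's own names.
std::string make_connection_string(
        const std::map<std::string, std::string>& props) {
    // Returns the property value, or an empty string if the key is missing.
    auto get = [&props](const std::string& key) -> std::string {
        auto it = props.find(key);
        return it != props.end() ? it->second : std::string();
    };
    return "DRIVER={SQL Server};SERVER=" + get("server") +
           ";DATABASE=" + get("database") +
           ";UID=" + get("uid") +
           ";PWD=" + get("pwd") + ";";
}
```

With the Shapes database and the rti_shapes_rs login from this example, and an assumed server name of localhost\SQLEXPRESS, the function yields DRIVER={SQL Server};SERVER=localhost\SQLEXPRESS;DATABASE=Shapes;UID=rti_shapes_rs;PWD=abc123;.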
Running the example
DDS to MSQL Database
Follow the instructions with the source code to build and run either the rs_dll or the rs_main example. Use the Shapes demo to publish Squares, Circles and Triangles on Domain 0 and start the Routing Service with the RS_DDS_To_MSQL configuration. The database table should be populated similar to below.
Figure 8: Shape data written to MSQL database table from Routing Service Adapter
MSQL Database to DDS
Now that the database table has entries, use the Shapes demo to subscribe to Squares, Circles and Triangles on Domain 0, then start the Routing Service with the RS_MSQL_To_DDS configuration. You should see the shapes appear in the Shapes demo.
- This example writes to and reads from one database table. Because of this, the table needs a column for the topic name; ConnectorBase stores this name. The example could easily be modified to use three tables - Square, Circle and Triangle. The topic name column would no longer be needed in the tables and ConnectorBase would not have to store the topicName. Each Connector would interact with one of these tables. This means the SQL command to read/write the database records would change slightly.
- The RsStreamReader reads one sample at a time. It could be modified to read n samples and populate the samples vector with the data. This would be more efficient than reading one at a time.
- The RsStreamReader reads data with a sampling period of 50 milliseconds. This value is hardcoded and could easily be passed in as a property in the XML configuration file.
- The example could have used the session period instead of creating a custom RsStreamReader thread that periodically reads data from the database.
- The username and password appear in the XML to demonstrate how to pass properties to the application. However, this type of information should probably not be in an XML file for security purposes.
- If a write to the database fails, an error is logged. The adapter does not retry the write or reset the connection.
- If a read from the database fails, an error is logged and the number of records returned is 0. This causes the RsStreamReader to shut down its reading thread.
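One of the points above, passing the sampling period in as a property, could look something like the sketch below. The property key "sampling_period_ms" and the function `sampling_period_from()` are hypothetical. The 50 millisecond fallback mirrors the value hardcoded in the example.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <map>
#include <string>

// Reads the sampling period from the adapter properties. Falls back to the
// given default when the key is missing, not a number, or not positive.
// Assumption: "sampling_period_ms" is an illustrative key name.
std::chrono::milliseconds sampling_period_from(
        const std::map<std::string, std::string>& props,
        std::chrono::milliseconds fallback = std::chrono::milliseconds(50)) {
    auto it = props.find("sampling_period_ms");
    if (it == props.end()) {
        return fallback;
    }
    try {
        long value = std::stol(it->second);  // throws on non-numeric input
        return value > 0 ? std::chrono::milliseconds(value) : fallback;
    } catch (const std::exception&) {
        return fallback;
    }
}
```

Falling back rather than failing keeps a typo in the XML from stopping the reader, at the cost of silently ignoring a bad value. Logging a warning in the catch branch would be a reasonable addition.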
The RTI Routing Service is used by many RTI customers. Whether it is bridging DDS domains, bridging DDS domains to non-DDS domains or allowing non-DDS devices to participate in DDS environments through the use of adapters, the Routing Service has become instrumental in IIoT systems. The example above highlighted the use of an adapter and a strategy for debugging adapters in general.
However, the Routing Service can do much more. It can not only convert between DDS and non-DDS data, but it can also transform, split, aggregate and filter the data. The user has complete control of the subset of data that is forwarded to other domains. The Routing Service also can ensure that only entities that need to discover each other do, by bridging different domains. This layered databus architecture has the effect of reducing discovery and user data traffic on the network, which allows the IIoT system to scale massively.
About the author
Tim McGuire is a Senior Field Applications Engineer at RTI. He holds a Bachelor of Computer Science from Carleton University in Ottawa, Canada. Before joining RTI, Tim worked in software development at Zeligsoft, where he designed, developed and managed projects for modeling and code generation tools for the Software Communications Architecture (SCA) domain, as well as tools and middleware for Data Distribution Service (DDS). Prior to transitioning to development, Tim was a Field Applications Engineer responsible for supporting products, training, consulting and collaborating with sales and marketing to identify and secure new business opportunities. Tim brings over 25 years of experience in telecommunications, software design and development, consulting and product support.