Skip to the main content.

Did you know?

 

RTI is the world’s largest DDS supplier and Connext is the most trusted software framework for critical systems.

Success-Plan-Services-DSSuccess-Plan Services

Our Professional Services and Customer Success teams bring extensive experience to train, problem-solve, mentor, and accelerate customer success.

Learn more

Developers

From downloads to Hello World, we've got you covered. Find all of the tutorials, documentation, peer conversations and inspiration you need to get started using Connext today.

Try the Connectivity Selection Tool ⇢

Resources

RTI provides a broad range of technical and high-level resources designed to assist in understanding industry applications, the RTI Connext product line and its underlying data-centric technology.

Company

RTI is the infrastructure software company for smart-world systems. The company’s RTI Connext product is the world's leading software framework for intelligent distributed systems.

Contact Us

News & Events
Cooperation

4 min read

Telegraf Plugin for Connext DDS: Build a Time-Series Monitoring System with DDS and InfluxDB

Telegraf Plugin for Connext DDS: Build a Time-Series Monitoring System with DDS and InfluxDB

A few months ago, I wrote a blog about monitoring your IIoT systems. The blog mainly covered the overview of a monitoring architecture with Connext DDS and a time-series database. This blog is a continuation of that discussion, with new details of a key element of the architecture. 

To demonstrate the monitoring architecture, InfluxDB from InfluxData was used as a time-series database for monitoring. In doing this work, I found that a Telegraf input plugin for Connext DDS is a critical missing element in realizing the architecture. So I developed a prototype of the input plugin for Connext DDS. Today, I’m pleased to share it with you as the latest project of RTI Labs – a free program that provides customers with early access to new RTI technology including software projects, downloads and tools.

In this blog, I will share the internals and examples of the Telegraf plugin. But first, let’s review the basics of Telegraf.

What is Telegraf?

Telegraf, part of InfluxData’s time-series platform, is an agent for collecting, processing, aggregating and writing metrics. It supports a plugin system so developers can easily add a plugin for metrics. It includes four distinct plugin types:

  • Input Plugins collect metrics from the system, services or 3rd party APIs.
  • Processor Plugins transform, enrich and/or filter metrics.
  • Aggregator Plugins create aggregate metrics (e.g. mean, min, max, quantiles, etc.).
  • Output Plugins write metrics to various destinations.

What are the main components of Telegraf?

Telegraf uses the InfluxDB’s data model for metrics as an internal representation. The data model contains four main components:

  • Measurement name is a namespace for metrics.
  • Tags are key/value pairs to identify metrics.
  • Fields are key/value pairs for metric data.
  • Timestamp is the date and time associated with fields.

Telegraf supports several input data formats to parse metrics in different formats to its internal representation. After metrics are parsed, they exist in memory and will be converted to a concrete representation to be transmitted via an output plugin. For that, it supports several serialization formats as well. Currently, the input plugin for Connext DDS only works with the JSON input data format.

Telegraf's configuration file is written using TOML, and it includes configurations of agents (e.g., data collection interval and flushing interval) and plugins. Detailed descriptions can be found here. For the plugin for Connext DDS, it requires an additional configuration file for XML App Creation. It does include configurations of types, topics and DDS entities used by the Telegraf input plugin.

rti-diagram-databus-telegraf

How was the Telegraf input plugin for Connext DDS developed?

As mentioned above, Telegraf provides a plugin system that can allow developers to add plugins for metrics. To develop an input plugin, it requires implementing the following functions defined in the interface for input plugin.

If an input plugin is a service plugin, it requires implementing the following functions defined in the interface for input service plugin. A service plugin differs from a regular plugin in that it operates a background service while Telegraf is running.

The input plugin for Connext DDS is a service plugin because it pushes metrics to Telegraf when it receives DDS data instead of pulling metrics at a configured interval. Therefore, it runs a background thread that checks arrival of DDS data via WaitSets. If you are interested in how it was implemented, please check this out here.

Demonstrations with example configurations

The nicest benefit of working with Telegaf is that it provides a bunch of out-of-the-box plugins. Once you have the input plugin for Connext DDS, you can easily pick up existing plugins to apply them to incoming any DDS data. I will go through example commands and configurations for the input plugin for Connext DDS and other plugins. You can find the example configuration files that I used here at GitHub.

A default config file can be generated by telegraf.

$ telegraf config > telegraf.conf

To generate a file with specific inputs and outputs, you can use the “--input-filter” and “--output-filter” flags. The following command will generate a default config file with the input plugin for Connext DDS and the output plugin for file.

$ telegraf --input-filter dds_consumer --output-filter file config > dds_to_file.conf

After then, you can run Telegraf with the generated config file.

$ telegraf -config dds_to_file.conf

The default configuration of the input plugin for Connext DDS is like the following:

[[inputs.dds_consumer]]
## XML configuration file path
config_path = "example_configs/ShapeExample.xml"

## Configuration name for DDS Participant from a description in XML
participant_config = "MyParticipantLibrary::Zero"

## Configuration name for DDS DataReader from a description in XML
reader_config = "MySubscriber::MySquareReader"

## Tag key is an array of keys that should be added as tags.
tag_keys = ["color"]

## Override the base name of the measurement
name_override = "shapes"

## Data format to consume.
data_format = "json"

The default configuration will use an XML configuration file for Connext DDS located at “example_configs/ShapeExample.xml” and create a participant (MyParticipantLibrary::Zero) and a reader (MySubscriber::MySquareReader) defined in the configuration file. Currently, it only works with JSON format so the “data_format” config should be set to “json”.

The reader defined in XML subscribes to a topic named “Square” and uses the type for RTI Shapes demo. Because the type of Shapes demo uses the color attribute as a key, the color attribute is added as a tag. As a default, the measurement name becomes the name of a service input name (dds_consumer for the plugin). If you want to use a different measurement name, you can set it with the “name_override” config.

To test this configuration, you can run RTI Shapes demo and publish a Square topic data. After publishing a Square topic data, you can see received DDS metrics at “/tmp/metrics.out”, which is the default path for the output plugin for file.

If you want to send the same DDS data to a different output plugin like InfluxDB, you can simply run the following command to generate a config file for InfluxDB: 

$ telegraf --input-filter dds_consumer --output-filter file config > dds_to_influx.conf

$ telegraf -config dds_to_influx.conf

There are several aggregating and processing plugins provided by Telegraf. You can add the following lines in your Telegraf configuration file to aggregate your metrics with basic statistics (e.g., min, max, mean, stdev). It will aggregate metrics with basic statistics every 10 seconds.

[[aggregators.basicstats]]
## The period on which to flush & clear the aggregator.
period = "10s"
## If true, the original metric will be dropped by the aggregator and will not get sent to the output plugins.
drop_original = false

You can rename your fields by using a processor plugin. This example swaps “x” and “y” coordinates of shapes.

[[processors.rename]]
[[processors.rename.replace]]
field = "x"
dest = "y_trans"
[[processors.rename.replace]]
field = "y"
dest = "x_trans"

These are some of the interesting features and internals of Telegraf and the plugin for Connext DDS. I hope you will find them useful to build a time-series monitoring system with DDS and InfluxDB.

Read the official announcement about the first DDS plugin for the Telegraf agent here.

While I introduced some specific plugins as examples, there are lots of other plugins that can work with your DDS systems. To play with the plugin for Connext DDS, please check out the following GitHub. https://github.com/rticommunity/telegraf

Please let me know how you like the latest deliverable from the RTI Labs. Visit the RTI Community forum or create an issue directly at GitHub if you have any comments, questions or issues.