Implementing Simple Introspection with Connext DDS in C++14


When I was first introduced to RTI Connext® DDS, it wasn't long (after seeing its powerful tools) before I wanted to know how difficult it would be to implement domain introspection in its most basic form. Tools such as the Admin Console are obviously complex, but that doesn't mean that the basic principle on which they're based, domain introspection, has to be. So I set about creating the simplest example of domain introspection that would have some demonstrable utility. This blog post covers that journey.

What is introspection?

Wikipedia defines "type introspection" as follows:

"In computing, type introspection is the ability of a program to examine the type or properties of an object at runtime. Some programming languages possess this capability."

This means that our utility will have to be able to examine the properties of an object (type) at runtime. What does this mean in the context of DDS? Let's consider what objects exist in a DDS domain. Primarily there are:

  • Topics
  • Types

In order to examine these objects we need to be able to get an inventory of topics, and for each of those topics, the associated type. In addition, once we have gathered these properties, we need to do something useful with them. For the purpose of this exploration, I decided that "something" would be the ability to build a matching dynamic type for a particular topic and then to subscribe to that single, user-selected topic, and display the data being published to that topic.

This led to the following rough sketch of required actions:

  • Obtain a list of all topics on a particular domain
  • Allow the user to select one of those topics
  • Build a dynamic type that matches the topic
  • Create a subscriber topic dynamically using the above dynamic type
  • Create a datareader for the topic
  • Attach an "on_data_available" callback to the datareader in order to receive samples from the topic
  • Display the received samples in a coherent format using the type information retrieved

Those were the marching orders.

At this point, before reading any further, I would recommend that you download, build and run (against the ubiquitous RTI DDS Shapes Demo) the utility from the source package here (this should build with minor tweaks to the makefile on Mac or Linux). 

Ok, so you have seen how the utility operates. Now let’s dive under the covers.

First, we are going to need a subscriber class to handle all of this. I decided on the name GenericSubscriber. GenericSubscriber has a very simple public interface:

class GenericSubscriber {
public:
   /**
    * Creates the DDS entities for the subscriber.
    * @param domain_id the domain ID.
    * @param verbose provide a higher level of output.
    */
   GenericSubscriber(int domain_id,
                     bool verbose);

   /**
    * Subscriber destructor
    */
   ~GenericSubscriber();

    /**
    * List the topics available in domain
    */
   void list_topics(void);

   /**
    * Start receiving the data.
    */
   void receive(const std::string& topicName);

private:
   class GenericSubscriberImpl;
   std::unique_ptr<GenericSubscriberImpl> impl;
};

Leaving aside command line argument and error handling, this is how GenericSubscriber would be used to satisfy step one – obtain a list of topics – from the sketch above:

   dds::domain::qos::DomainParticipantFactoryQos qos;
   qos << dds::core::policy::EntityFactory::ManuallyEnable();
   dds::domain::DomainParticipant::participant_factory_qos(qos);

   GenericSubscriber subscriber(domain.Get(), verbose.Get());
   subscriber.list_topics();

Wait, what are those first three lines for? They are a key requirement of DDS introspection. DDS performs discovery automatically at startup, and discovery is the phase in which topic and type information is exchanged. Since we need to capture that information, the entities that GenericSubscriber instantiates must be created in "disabled" mode. This gives GenericSubscriber time to attach a "data_available" callback to one of the built-in topics before the participant entity begins the discovery process. The QoS change could have been buried inside GenericSubscriber, had it refrained from implicit construction of the participant entity and instead instantiated the participant explicitly (using operator new) in its constructor. However, I felt that would have gone against the principle that instantiating a class should have no observable global side effects.

It didn't take long for us to get into details about sequencing, so let’s look ahead a bit and take a look at a sequence diagram for the list_topics() operation.

[Figure: sequence diagram for the list_topics() operation (listTopicsSD.svg)]

One of the things I like most about sequence diagrams is the ability to see which particular thread the operations are running on. This is particularly important in understanding what is going on here. As you can see from the diagram, the discovery operation doesn't happen instantly and the callback is invoked in the context of an internal DDS thread (shown in yellow). This is important to keep in mind for a DDS application in general, but it is particularly relevant here since we need to wait for the list of topics to be populated. In this case, a very crude (and undoubtedly generally incorrect) mechanism is employed to detect when the topic list has been populated, and that is to poll the topics vector until the size is non-zero. Given that the purpose of this exercise is to expose the mechanisms for introspection, this simple mechanism was deemed adequate.

In the sequence diagram above, this interlock occurs between the label "Locates built-in topic, adds listener to built-in reader" and "Calls wait_for_discovery". Please take a moment now to review the sequence diagram.

Switching to code, here are the constructor and the list_topics() functions:

GenericSubscriberImpl(int dom_id, bool verbose)
    : m_participant(dom_id),
      m_ear(m_discovered_topics),
      m_builtin_subscriber(builtin_subscriber(m_participant)),
      m_verbose(verbose)
{
   // locate the built-in readers that carry publication (topic) data
   find<DataReader<PublicationBuiltinTopicData>>(
                m_builtin_subscriber,
                dds::topic::publication_topic_name(),
                std::back_inserter(m_rdr_list));

   // attach listener to all readers
   for(auto& rdr: m_rdr_list) {
       rdr.listener(&m_ear, StatusMask::data_available());
   }

   // entities were created disabled; start discovery now that the listener is attached
   m_participant.enable();
}

Here in the constructor, the steps are:

  • Locate all built-in datareaders of type "PublicationBuiltinTopicData"
  • Iterate through that list, adding our listener (ear) for "data_available"
  • Enable the participant

// list all available topics we discovered on the domain
void list_topics(void)
{
   wait_for_discovery();   

   std::cout << "Number of topics discovered: " << m_discovered_topics.size()
             << std::endl;
   for(const auto& topic: m_discovered_topics) {
       std::cout << "Discovered topic: " << std::get<0>(topic) << std::endl;
   }
}

The actual list_topics function is trivial. Essentially, we employ the interlock we talked about previously to ensure the discovered_topics list is populated, and then print out the topic name for each topic. Job done for the first step in our requirements.

The next requirement to fulfill is the ability for a user to select a topic. This is accomplished easily by command-line processing and can be seen clearly in the code (introspect.cpp), making the next challenge to build a dynamic type based on what has been discovered about the topic.

For the next requirement, the logic in both the constructor and wait_for_discovery() is identical to what has already been discussed, so the bulk of the new work is to extract the type information for the topic that the user selects.

The key methods to fulfill this requirement are create_type_from_topic() and get_topic_columns(). The create_type_from_topic() method takes an xtypes::DynamicType and inspects it in order to extract the information needed to build a matching type dynamically.

The first thing we need to do is decide on what data needs to be captured for each element (member/column) of the topic type. While a more complete implementation might need to capture more information in order to replicate any given type, for the purpose of this example, we only need to capture the following attributes:

  • Member (column) name
  • Member (column) type
  • Whether the member (column) is a key
  • Whether the member (column) is optional

This information is captured into a four-tuple declared as:

std::tuple<TopicColumnName, TopicColumnType, TopicColumnKey, TopicColumnOptional>

A vector of these tuples is captured by the method get_topic_columns() called from create_type_from_topic().
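To make the shape of that data concrete, here is a small sketch of how the four-tuple and a vector of columns might be declared and queried. The alias names come from the declaration above, but their underlying types (strings and bools) are assumptions made for this sketch, and TopicColumn, ColumnField and count_key_columns() are hypothetical helpers that do not appear in the utility.

```cpp
#include <cstddef>
#include <string>
#include <tuple>
#include <vector>

// Alias names are from the article; the underlying types are assumptions.
using TopicColumnName     = std::string;
using TopicColumnType     = std::string;  // assumed: a printable type name
using TopicColumnKey      = bool;
using TopicColumnOptional = bool;

using TopicColumn = std::tuple<TopicColumnName, TopicColumnType,
                               TopicColumnKey, TopicColumnOptional>;

// Named indices (hypothetical) avoid bare numbers in std::get<> calls.
enum ColumnField : std::size_t { kName = 0, kType = 1, kKey = 2, kOptional = 3 };

// Counts the members that are part of the type's key.
inline std::size_t count_key_columns(const std::vector<TopicColumn>& columns)
{
    std::size_t keys = 0;
    for (const auto& column : columns) {
        if (std::get<kKey>(column)) {
            ++keys;
        }
    }
    return keys;
}
```

A small struct with named fields would read more clearly than a tuple in larger code, but the tuple keeps the example close to the declaration shown above.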

The get_topic_columns() method first checks that the xtypes::DynamicType is an aggregate type, then checks whether it is an extensible or fixed type. If it is an extensible type, it processes the base type first, followed by the extended members.
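The base-first ordering can be illustrated without any DDS types. The sketch below uses a deliberately simplified, hypothetical ToyStructType (a name, a list of member names, and an optional parent) in place of the real dynamic type, and a hypothetical collect_members() function that recurses into the parent before appending the type's own members. It is a model of the traversal order only, not the utility's implementation.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for a struct type description.
struct ToyStructType {
    std::string name;
    std::vector<std::string> members;             // members declared by this type
    std::shared_ptr<const ToyStructType> parent;  // base type, or nullptr
};

// Appends member names to `out`, with base-type members first.
inline void collect_members(const ToyStructType& type,
                            std::vector<std::string>& out)
{
    if (type.parent) {
        collect_members(*type.parent, out);
    }
    out.insert(out.end(), type.members.begin(), type.members.end());
}
```

For a ShapeTypeExtended modeled as a ShapeType parent (color, x, y, shapesize) plus two members of its own (fillKind, angle), this yields the same column order as the sample output at the end of this post.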

Once get_topic_columns() returns, the m_columns vector is populated. create_type_from_topic() then creates a local instance of xtypes::StructType and iterates over the m_columns vector, adding members to the local dynamic type instance using the data captured in the four-tuple.

The majority of the work is now complete. All that remains is to take the dynamically-generated type and instantiate a GenericTopic and GenericReader (in the receive() method) and add a listener to the reader instance. The listener then iterates over the samples and the m_columns vector in order to output data received on the topic to stdout in a formatted manner (which is the last point in our initial requirements sketch).
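The formatting step is ordinary stream manipulation. As a sketch, assuming each member value of a sample has already been converted to a string, a row can be laid out in fixed-width, right-aligned cells as follows. The function format_row() and its width parameter are hypothetical, and the layout is not meant to reproduce the utility's exact output.

```cpp
#include <iomanip>
#include <sstream>
#include <string>
#include <vector>

// Formats already-stringified values as right-aligned, fixed-width cells,
// each followed by a " |" separator. Longer values are not truncated.
inline std::string format_row(const std::vector<std::string>& values, int width)
{
    std::ostringstream line;
    for (const auto& value : values) {
        line << std::setw(width) << value << " |";
    }
    return line.str();
}
```

The same function can print the header row by passing the column names captured in m_columns instead of sample values.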

Hopefully, you have found this an enlightening introduction to introspection in DDS. If you are using a language other than modern C++, or would like to see a more complete example, then please visit community.rti.com.

The following is a sample of the type of output you should have obtained when running the utility against the Shapes demo.

./introspect --t Square

Enabling subscribing on topic Square which is of type ShapeTypeExtended which is a kind of Aggregation

color     |           x |           y |   shapesize     |    fillKind     |       angle  |

   BLUE             30            112                30                 0            210
   BLUE             27            113                30                 0            216
   BLUE             24            114                30                 0            222
   BLUE             23            115                30                 0            228
   BLUE             26            116                30                 0            230
   BLUE             29            117                30                 0            240
   BLUE             32            118                30                 0            246