It has been almost an year and a half since my last post on reactive stream processing with RTI Connext DDS. A lot has happened since then and I've a lot of interesting updates to share with you.
For starters, here's a quick reason why reactive programming and specifically, stream processing makes a lot of sense in Industrial Internet Systems. As you probably know the Industrial Internet of Things (IoT) has really taken off, and billions and billions of new devices will be connected to the Internet in this decade. Many of them will belong to national critical infrastructure (e.g., smart power grids, smart roads, smart hospitals, smart cities). Most of the data generated by these devices cannot and will not be sent to the cloud due to bandwidth, latency, and exorbitant costs needed to do so. Therefore, much of the data must be correlated, merged, filtered, and analyzed at the edge so that only summary and exceptional data is sent to the cloud. This is Edge Computing (a.k.a. Fog).
Rx4DDS is an elegant solution to this problem that is productive, composable, concurrency-friendly, and scales well. Rx4DDS is a collection of Reactive Extensions (Rx) adapters for RTI Connext DDS. Rx is a collection of open-source libraries for functional-style composable asynchronous data processing. It is available in a number of flavors including RxJava, RxJS, RxCpp, Rx.Net, RxScala, RxPy and many more. The official site of all things Rx is ReactiveX.io.
The original Rx4DDS work started in C#. Since then, Rx4DDS has been implemented in C++ and JavaScript (Node.js). Recently, I presented Rx4DDS and 3 demos at CppCon'15. The session recording, all demo videos, and the slides are now available. And yeah, there's a research paper too.
Connext DDS Modern C++ API in Rx4DDS
The Rx4DDS C++ adapter uses the modern C++ API for DDS (DDS-C++PSM) at its core. DDS-C++PSM has it's own reactive flavor to it. For instance, DDS ReadCondition accepts a lambda to process data. User's thread of control automatically invokes the lambda provided earlier. As a result, the dispatch logic that programmers have to write is greatly simplified (often just one line). See an example here.
Rx4DDS takes DDS reactive programming to a whole new level. The best way to learn what it can do for you is watching a demonstration and understanding the corresponding code.
So, here is one of the demos that shows transformation of keyed data streams using GroupBy and Map.
The demo includes multiple parallel stateful data pipelines. The transformation pipeline is replicated for each key (i.e., shape color). The resulting topics ("Circle" and "Triangle") follow the lifecycle of the original instance in the "Square" topic. I.e., when a new square arrives, a new circle and triangle orbit around it. Similarly, when a square is disposed, circle and triangle are disposed with it.
Lets look at some code.
auto circle_writer = ...
auto triangle_writer = ...
int circle_degree = 0;
int tri_degree = 0;
using KeyedShapeObservable =
rx::grouped_observable<string, LoanedSample>;
rx4dds::TopicSubscription
topic_sub(participant, "Square", waitset, worker);
auto grouped_stream =
topic_sub.create_observable()
>> rx4dds::group_by_instance ([](ShapeType & shape) {
return shape.color();
});
auto subscription =
grouped_stream
.flat_map([circle_writer, triangle_writer]
(KeyedShapeObservable go)
{
rx::observable inner_transformed =
go >> rx4dds::to_unkeyed()
>> rx4dds::complete_on_dispose()
>> rx4dds::error_on_no_alive_writers()
>> filter([](LoanedSample s) {
return s.info().valid();
})
>> map([](LoanedSample valid) {
return valid.data();
})
>> map([circle_degree](ShapeType & square) mutable {
circle_degree = (circle_degree + 3) % 360;
return shape_location(square, circle_degree);
})
>> rx4dds::publish_over_dds(circle_writer, ShapeType(go.key())
>> map([tri_degree](ShapeType & circle) mutable {
tri_degree = (tri_degree + 9) % 360;
return shape_location(circle, tri_degree);
})
>> rx4dds::publish_over_dds(triangle_writer, ShapeType(go.key());
return inner_transformed;
}).subscribe();
Diving Deep
There's a lot going on in here. But the first thing you might have noticed is that there's not a single loop. You don't need them here. Rx4DDS lets you write more declarative code. Think Linux command-line pipes and i/o redirection.
The TopicSubscription
object is the entry point. As you might have suspected, it hides the underlying DDS DataReader. The create_observable
function creates an observable from the TopicSubscription
. This observable is a stream of ShapeType
objects. Note that whether TopicSubscription
uses a waitset or a listener to retrieve DDS data is up to the user. Just use the right constructor. In either case, create_observable
returns an observable. The rest of the program structure is identical in either case. This is important.
"Rx4DDS decouples data retrieval from consumption."
The group_by_instance
is an Rx4DDS "operator" that shards the incoming ShapeType
objects across "inner" keyed observables (KeyedShapeObservable
) which are bound to a specific key. It uses the the viewstate and instance handle information in SampleInfo
for sharding. It also accepts a lambda to retrieve the key for each sample.
The grouped_stream
object is a stream-of-streams--a natural mapping for keyed topics. After all, a keyed topic contains many instances and each instance has its own lifecycle as in Create-->Update-->Dispose. This lifecycle of instances maps naturally to observables as they have the same lifecycle expressed in terms of OnNext*[OnError|OnCompleted]. I.e., OnNext
may be called zero or more times and one of OnError and OnCompleted
if/when the observable ends.
We call flat_map
on the grouped_stream
. The flat_map
operator has many uses. First, when there's a stream-of-streams, it flattens it out to just a stream. It does that by simply merging the individual streams together. Of course, sharding an incoming stream and merging each sharded streams back won't be useful at all. That's equivalent to a no-op. So flat_map
lets you map each incoming stream to some other stream. Hence there's a lambda that returns an inner_transformed
observable, which is a transformation of every instance-specific observable (go).
So what's the transformation? It's just concatenation of operators that simply describe what they do. First, we drop the "keyed" aspect and get a regular key-less ShapeType
observable. We'll use the key shortly.
The complete_on_dispose
and error_on_no_alive_writers
operators constitute the data-centric magic sauce. When an instance is disposed, the complete_on_dispose
operator recognizes it and propagates an OnCompleted
event through the instance-specific observable. Likewise, when an instance runs into some sort of error (e.g., no alive writers), the error_on_no_alive_writers
operator recognizes it and propagates an OnError
event through the instance-specific observable.
"Data-centric interpretation of the data and SampleInfo is wired into the Rx4DDS programming model."
The best part is that it is quite extensible. If not-alive-writers does not mean an "error" in your application, you can either not use it or write your own "interpretations" of SampleInfo
as you please. The error_on_no_alive_writers
operator is very simple and gives you idea of what's possible.
The rest of the data pipeline is quite straight-forward. filter eliminates invalid samples and each valid sample is mapped to its data right after filter. At this point the source observable (go) is transformed to a pure observable<ShapeType>
. What comes next is just business-logic.
Each ShapeType
objects that is a location of the square is transformed to a circle position using basic geometry (not shown). Each circle position is also a ShapeType
. It is then published over DDS via the circle_writer
. Similar transformation occurs for triangles.
The publish_over_dds
is an interesting operator. It's one more ingredient that goes in the making of data-centric magic sauce. Note that it accepts a ShapeType
object created from the key. When OnComplete
or OnError
is propagated, publish_over_dds
disposes the instance selected by the key. For instance, When a blue Square is disposed, the complete_on_dispose
operator recognizes it and propagates an OnCompleted
event. In response to OnCompleted
the the publish_over_dds
operator disposes both blue circle and blue triangle.
In summary,
"The finalization actions are baked into each data pipeline at the time of creation."
This is declarative programming as it completely hides explicit control flow necessary to do cleanup. This is one of the main strengths of Rx4DDS.
Finally, the flat_map
operator simply merges the transformed (inner) streams together. The result of merge is ignored as there's no subsequent user-defined stage. The final subscribe
function call sets the whole machinery going by creating the only user-visible subscription that the user-code has to manage.
The CppCon'15 presentation touches upon several concepts discussed here and more.
DDS and Rx Side-by-Side
I hope you are starting to see that Rx and DDS are a great match. More compatibilities reveal themselves as you start looking deeper. Here's how I think Rx and DDS are related architecturally.
|
DDS |
Rx |
Design Methodology |
Data-Centric |
Reactive, Compositional |
Architecture |
Distributed Publish-subscribe |
In-memory Observable-Observer. Monadic |
Anonymity/loose coupling |
Publisher does not know about subscribers |
Observable does not know about observers |
Data Streaming |
A Topic<T> is a stream of data samples of type T |
An Observable<T> is a stream of object of type T |
Stream lifecycle |
Instances (keys) have lifecycle (New->Alive->Disposed) |
Observables have lifecycle OnNext*[OnCompleted|OnError] |
Data Publication |
Publisher may publish without any subscriber |
"Hot" Observables |
Data Reception |
Subscriber may read the same data over and over |
"Cold" Observables |
Rx4DDS allows you to map many DDS concepts to declarative Rx programming model. Some of my favorite mappings are listed below. This table is based on the C# implementation of Rx4DDS.
DDS Concept |
Rx Concept/Type/Operator |
Topic of type T |
An object that implements IObservable<T>, which internally creates a DataReader<T> |
Communication status, Discovery event streams |
IObservable<SampleLostStatus> IObservable<SubscriptionBuiltinTopicData> IObservable<PublicationBuiltinTopicData> |
Topic of type T with key type=Key |
IObservable<IGroupedObservable<Key, T>> |
Detect a new instance |
Notify Observers about a new IGroupedObservable<Key, T> with key==instance. Notification using IObserver<IGroupedObservable<Key, T>>.OnNext() |
Dispose an instance |
Notify Observers through IObserver<IGroupedObservable<Key,T>>.OnCompleted |
Take an instance update of type T |
Notify Observers about a new value of T using Iobserver<T>.OnNext() |
Read with history=N |
IObservable<T>.Replay(N) |
Query Conditions |
Iobservable<T>.Where(…) OR Iobservable<T>.GroupBy(…) |
SELECT in CFT expression |
IObservable<T>.Select(...) |
FROM in CFT expression |
DDSObservable.FromTopic("Topic1") DDSObservable.FromKeyedTopic("Topic2") |
WHERE in CFT expression |
IObservable<T>.Where(...) |
ORDER BY in CFT expression |
IObservable<T>.OrderBy(...) |
MultiTopic (INNER JOIN) |
IObservable<T>.Join().Where().Select() |
Joins between DDS and non-DDS data |
Join, CombineLatest, Zip |
Phew! This blogpost is already quite longer than expected. Yet, there's so much more to talk about. I would encourage you look at the other demonstration videos available here.
If JavaScript is your thing, you might also want to find out how we used DDS, RxJS, and Node.js Connector to implement Log Analytics and Anomaly Detection in Remote DataCenters. Check out these slides and a research paper.
Finally, we welcome your comments on this blog & we would be interested in discussing how this technology may fit your needs.