The official Rust binding for Eclipse Cyclone DDS.
Install the C library and headers to your system path or use the bundled-sources
via the vendored feature. Then add the package to your Cargo.toml via:
[dependencies]
# This expects you to have an installation of the Cyclone DDS C library available
# which allows you to tailor the underlying C library as you wish.
#
# NOTE: you can also point to Cyclone DDS using the `CYCLONEDDS_HOME` variable if
# performing a full installation is undesirable.
eclipse-cyclonedds = "0.0.4"
# Using the `vendored` feature will result in us compiling and linking in a version
# of Cyclone DDS for you.
eclipse-cyclonedds = { version = "0.0.4", features = ["vendored"] }Then see the example and the docs to get started.
The Data Distribution Service (DDS) is a publish-subscribe middleware standard for real-time, data-centric communication. It is used in a variety of mission critical applications in domains such as aerospace, defense, autonomous systems (e.g. vehicles, robotics), industrial control, smart energy grids, transportation, simulation, and medical devices.
Participants
within a specific
Domain
discover each other automatically via the DDSI/RTPS discovery protocol. Once two
endpoints sharing the same topic name, type information, and compatible
Quality of Service (QoS)
discover each other, the middleware establishes a connection between them.
Publishers
and
Subscribers
allow you to group
Writers
and
Readers
respectively to allow you to set their collective behavior. These
Writers
and
Readers
exchange typed samples via
Topics.
DOMAIN
│
┌──────────────────┴──────────────────┐
│ │
PARTICIPANT PARTICIPANT
│ T ≡ struct Position {x, y} │
┌────┴────┐ ┌────┴────┐
│ │ │ │
PUBLISHER TOPIC<T> ═══════════════════ TOPIC<T> SUBSCRIBER
│ ║ ║ │
│ "Position" "Position" │
│ ║ ║ │
WRITER<T> ═══╝ ╚═══ READER<T>
╰───────── matched via Topic<T> ─────────╯
Node 01 Node 02
───────── ─────────
Data delivery characteristics, such as how samples are buffered, retransmitted,
and received, are controlled via
Quality of Service,
a collection of
QoS policies
that configure characteristics such as:
durability(whether late-joining readers receive historical samples)reliability(best-effort vs reliable delivery)history depth(the number of samples to store in history)deadline(whether a signal should be generated when a sample is not received within a specified period) Policies are set independently on the writer and reader side, and compatibility is checked at discovery time. A writer's offeredQoSmust be compatible with a reader's requestedQoSfor the two endpoints to match.
There are a variety of other elements to the DDS API such as:
WaitSets: to allow you to block until a particular status occurs on a DDS entity.Listeners: to notify applications of a change in the status of a particular entity.GuardConditions,StatusConditions,ReadConditions, andQueryConditions: Mechanisms to trigger the condition associated with a waitset.
See the DDS Specification and the OMG DDS Wiki for these other elements and see the Rust Documentation for what is supported by this API.
use cyclonedds::QoS;
use cyclonedds::qos::policy;
use cyclonedds::sample::View;
use cyclonedds::{Domain, Duration, Participant, Reader, Topic, Writer, Topicable};
#[derive(Topicable, serde::Serialize, serde::Deserialize, Clone, Default, Debug)]
struct Sensor {
#[dds(key)]
id: u32,
temperature: f32,
}
fn main() -> cyclonedds::Result<()> {
let domain = Domain::default();
let participant = Participant::new(&domain)?;
let topic = Topic::<Sensor>::new(&participant, "Sensor")?;
let qos = QoS::new()
.with_durability(policy::Durability::TransientLocal)
.with_history(policy::History::KeepLast { depth: 10 })
.with_reliability(policy::Reliability::Reliable {
max_blocking_time: Duration::from_millis(100),
});
let reader = Reader::builder(&topic).with_qos(&qos).build()?;
let writer = Writer::builder(&topic).with_qos(&qos).build()?;
writer.write(&Sensor {
id: 1,
temperature: 21.5,
})?;
writer.write(&Sensor {
id: 2,
temperature: 35.5,
})?;
// Get available samples from the reader history (draining them from the
// reader).
//
// NOTE: use reader.read() to leave them in place for future reads.
for sample in reader.take()? {
// `sample` is a SampleOrKey<Sensor> which allows you to:
// - distinguish between the sample and key-only cases
// - access fields directly (with non-key fields default-initialized
// for key-only samples)
// - access sample info via `.info()`
// You can precisely match on the result of `.view()` to handle
// either case explicitly. This replaces checking the sample info for the
// `valid_data` field (which does not exist in the Rust API).
match sample.view() {
View::Sample(sample) if sample.temperature > 30.0 => {
println!("sample[{}] is hot: {}°C", sample.id, sample.temperature)
}
View::Sample(sample) => {
println!("sample[{}] is cool: {}°C", sample.id, sample.temperature)
}
View::Key(key) => println!(
"received a key-only sample due to an unregister or dispose: {key:?}"
),
}
// Alternatively, you can directly access the fields with key-only samples
// having their non-key fields default-initialized via
// `Topicable::from_key`.
if sample.temperature > 30.0 {
println!("sample[{}] is hot: {}°C", sample.id, sample.temperature);
}
// Access the sample info.
println!(
"sample[{}] was produced at: {:?}",
sample.id,
sample.info().source_timestamp
);
}
Ok(())
}A Writer and Reader only exchange samples after discovery finds matching
topic names, compatible type information, and compatible QoS policies. If
samples are not arriving, check the effective QoS on both endpoints before
assuming the network or serialization layer is at fault.
QoS compatibility follows an offer/request model: the writer offers a QoS
and the reader requests one. Compatibility is asymmetric. A writer offering
Reliable delivery is compatible with a reader requesting BestEffort, but not
the inverse. The same asymmetry applies to Durability: a writer offering
TransientLocal is compatible with a reader requesting Volatile, but a reader
requesting TransientLocal will not match a writer offering only Volatile.
The default durability does not make DDS behave like a retained-message
broker. A reader that joins after a writer has already published will not
receive historical samples unless both sides are configured with compatible
durability (TransientLocal or stronger) and sufficient history depth.
Volatile durability, the default, discards samples the moment no matched
reader exists to receive them.
Reliable delivery asks the middleware to retransmit missing samples, but does
not provide infinite buffering. The default history policy is KeepLast with a
depth of 1, so only the most recent sample is retained per writer instance.
History depth, resource limits, and slow readers all constrain how much data the
middleware retains. A writer paired with a slow reliable reader may block or
drop samples once its send queue is exhausted, depending on the
max_blocking_time and resource limit settings in effect.
Participants on different domain IDs are completely isolated. Discovery will not cross domain boundaries, so two nodes that are otherwise correctly configured will be invisible to each other if their domain IDs differ.
Partitions introduce a second matching layer on top of topic name and QoS. A
writer and reader on the same topic will not exchange samples unless they share
at least one partition string. Note that the empty string "" is a valid and
distinct partition.
Liveliness configuration determines when the middleware considers a writer
dead. If the lease duration is shorter than the writer's actual publish rate,
the writer may appear to die and recover under load, causing spurious
matched/unmatched transitions on the reader side.
read and take have different cache semantics. read leaves matching samples
in the reader cache, so subsequent reads with overlapping state masks may return
the same samples again. take removes them, making those samples unavailable to
any later call. Use take for queue-like consumption and read when you need
to inspect the current state without draining the sample.
Cyclone DDS also includes a peek call which reads without updating any sample
or view state, so repeated peeks always see the same samples regardless of state
masks. Use it for non-destructive inspection when state transitions are
undesirable.
For now, the MSRV is the latest stable Rust version at the time of release.