Skip to content

Create DDS Transport Protocol#1174

Open
Kaweees wants to merge 19 commits intodevfrom
miguel/dds_transport
Open

Create DDS Transport Protocol#1174
Kaweees wants to merge 19 commits intodevfrom
miguel/dds_transport

Conversation

@Kaweees
Copy link
Member

@Kaweees Kaweees commented Feb 3, 2026

This merge request implements DDS (Data Distribution Service) transport layer using CycloneDDS, providing a new high-performance pub/sub transport option.

Quick Start

Note

We currently use Eclipse Cyclone DDS as our DDS implementation. Its IdlStruct feature lets you define DDS topic types in pure Python, eliminating the need for separate IDL files, with automatic serialization support.

from dataclasses import dataclass
from cyclonedds.idl import IdlStruct
from dimos.protocol.pubsub.ddspubsub import DDS, Topic

@dataclass
class SensorReading(IdlStruct):
    value: float

dds = DDS()
dds.start()

received = []
sensor_topic = Topic(name="sensors/temperature", data_type=SensorReading)

dds.subscribe(sensor_topic, lambda msg, t: received.append(msg))
dds.publish(sensor_topic, SensorReading(value=22.5))

import time
time.sleep(0.1)

print(f"Received: {received}")
dds.stop()
Received: [SensorReading(value=22.5)]

Unit Tests/Benchmarks

uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py --override-ini="addopts=" -k "DDS"
uv run pytest -svm tool dimos/protocol/pubsub/benchmark/test_benchmark.py
image

This builds off of #1144, which was closed due to having too many merge conflicts with dev and #1036, which was closed because the branch was renamed to miguel/dds_transport

@Kaweees Kaweees self-assigned this Feb 3, 2026
@greptile-apps
Copy link

greptile-apps bot commented Feb 3, 2026

Greptile Overview

Greptile Summary

This PR adds DDS (Data Distribution Service) transport support using Eclipse CycloneDDS, providing a high-performance pub/sub transport option for network communication.

Key Changes:

  • Implements DDSTransport class with thread-safe initialization and lifecycle management
  • Adds DDS pub/sub implementation with proper listener pattern for callbacks
  • Implements DDSService with singleton DomainParticipant management per domain ID
  • Includes benchmark test cases comparing DDS with high-throughput and reliable QoS presets
  • Adds comprehensive documentation and installation instructions
  • Makes DDS an optional dependency via the [dds] extra

Implementation Details:

  • Uses RLock for thread-safe start/stop operations in DDSTransport
  • Properly implements the Transport.subscribe interface contract by returning unsubscribe functions
  • Handles optional CycloneDDS imports gracefully with feature flags
  • Follows existing patterns from LCM and ROS transports
  • Includes error handling in message listener callbacks

Confidence Score: 5/5

  • This PR is safe to merge - implementation follows established patterns and includes proper thread safety
  • The code is well-structured, follows existing transport patterns, includes comprehensive error handling, and properly manages resources. All previous review comments have been addressed.
  • No files require special attention

Important Files Changed

Filename Overview
dimos/core/transport.py Adds DDSTransport class with thread-safe start/stop and proper subscribe return type
dimos/protocol/pubsub/impl/ddspubsub.py New DDS pub/sub implementation with proper listener pattern and error handling
dimos/protocol/service/ddsservice.py New DDS service with singleton domain participant management per domain ID
dimos/protocol/pubsub/benchmark/testdata.py Adds DDS benchmark test cases with high-throughput and reliable QoS presets
docs/concepts/transports.md Adds DDS transport documentation with usage example
docs/development/README.md Adds DDS installation instructions for Ubuntu/Debian systems

Sequence Diagram

sequenceDiagram
    participant User
    participant DDSTransport
    participant DDS
    participant DDSService
    participant DomainParticipant
    participant DataWriter
    participant DataReader
    participant Listener

    User->>DDSTransport: __init__(topic, type, **kwargs)
    DDSTransport->>DDS: __init__(**kwargs)
    DDSTransport->>DDSTransport: Create RLock

    User->>DDSTransport: publish(message)
    DDSTransport->>DDSTransport: Check _started (with lock)
    alt Not started
        DDSTransport->>DDS: start()
        DDS->>DDSService: start()
        DDSService->>DDSService: Check _participants dict
        alt Domain not in _participants
            DDSService->>DomainParticipant: Create participant(domain_id)
            DDSService->>DDSService: Store in _participants[domain_id]
        end
    end
    DDSTransport->>DDS: publish(topic, message)
    DDS->>DDS: _get_writer(topic)
    alt Writer doesn't exist
        DDS->>DDSTopic: Create DDSTopic
        DDS->>DataWriter: Create DataWriter(participant, topic, qos)
        DDS->>DDS: Store in _writers[topic]
    end
    DDS->>DataWriter: write(message)

    User->>DDSTransport: subscribe(callback)
    DDSTransport->>DDSTransport: Check _started (with lock)
    alt Not started
        DDSTransport->>DDS: start()
    end
    DDSTransport->>DDS: subscribe(topic, callback)
    DDS->>DDS: _get_listener(topic)
    alt Listener doesn't exist
        DDS->>DDSTopic: Create DDSTopic
        DDS->>Listener: Create _DDSMessageListener(topic)
        DDS->>DataReader: Create DataReader(participant, topic, qos, listener)
        DDS->>DDS: Store reader and listener
    end
    DDS->>Listener: add_callback(callback)
    DDS-->>DDSTransport: Return unsubscribe lambda
    DDSTransport-->>User: Return unsubscribe function

    DataReader->>Listener: on_data_available(reader)
    Listener->>DataReader: take()
    loop For each sample
        loop For each callback
            Listener->>User: callback(sample, topic)
        end
    end

    User->>DDSTransport: stop()
    DDSTransport->>DDS: stop()
    DDS->>DDS: Clear _readers, _listeners, _writers
    DDS->>DDSService: stop()
    Note over DDSService,DomainParticipant: DomainParticipant persists<br/>for potential reuse
Loading

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4 files reviewed, 5 comments

Edit Code Review Agent Settings | Greptile

@Kaweees
Copy link
Member Author

Kaweees commented Feb 3, 2026

@greptile

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3 files reviewed, 4 comments

Edit Code Review Agent Settings | Greptile

@paul-nechifor
Copy link
Contributor

When installing I get this:

uv sync --all-extras
warning: The `extra-build-dependencies` option is experimental and may change without warning. Pass `--preview-features extra-build-dependencies` to disable this warning.
Resolved 452 packages in 0.95ms
  × Failed to build `cyclonedds==0.10.5`
  ├─▶ The build backend returned an error
  ╰─▶ Call to `setuptools.build_meta.build_wheel` failed (exit status: 1)

      [stdout]
      Could not locate cyclonedds. Try to set CYCLONEDDS_HOME or CMAKE_PREFIX_PATH

      [stderr]
      /home/p/.cache/uv/builds-v0/.tmp5R206l/lib/python3.12/site-packages/wheel/bdist_wheel.py:4: FutureWarning: The 'wheel' package is no longer the canonical location of the
      'bdist_wheel' command, and will be removed in a future release. Please update to setuptools v70.1 or later which contains an integrated version of this command.
        warn(

      hint: This usually indicates a problem with the package or the build environment.
  help: `cyclonedds` (v0.10.5) was included because `dimos` (v0.0.9) depends on `cyclonedds`

I think you need to add to the docs how to build it. I've tried sudo apt install cyclonedds-dev, but it doesn't look like it's enough.

@Kaweees
Copy link
Member Author

Kaweees commented Feb 4, 2026

When installing I get this:

uv sync --all-extras
warning: The `extra-build-dependencies` option is experimental and may change without warning. Pass `--preview-features extra-build-dependencies` to disable this warning.
Resolved 452 packages in 0.95ms
  × Failed to build `cyclonedds==0.10.5`
  ├─▶ The build backend returned an error
  ╰─▶ Call to `setuptools.build_meta.build_wheel` failed (exit status: 1)

      [stdout]
      Could not locate cyclonedds. Try to set CYCLONEDDS_HOME or CMAKE_PREFIX_PATH

      [stderr]
      /home/p/.cache/uv/builds-v0/.tmp5R206l/lib/python3.12/site-packages/wheel/bdist_wheel.py:4: FutureWarning: The 'wheel' package is no longer the canonical location of the
      'bdist_wheel' command, and will be removed in a future release. Please update to setuptools v70.1 or later which contains an integrated version of this command.
        warn(

      hint: This usually indicates a problem with the package or the build environment.
  help: `cyclonedds` (v0.10.5) was included because `dimos` (v0.0.9) depends on `cyclonedds`

I think you need to add to the docs how to build it. I've tried sudo apt install cyclonedds-dev, but it doesn't look like it's enough.

This makes sense, because I use Nix to run this. You need to build build CycloneDDS from source otherwise. Where should I add this functionality?

@paul-nechifor
Copy link
Contributor

paul-nechifor commented Feb 4, 2026

This makes sense, because I use Nix to run this. You need to build build CycloneDDS from source otherwise. Where should I add this functionality?

Since "cyclonedds>=0.10.5" is in the root dependencies it should be in README.md and docs/development/README.md.

But, should it be in the root dependencies? That is, I don't think everyone needs it. Currently, running uv sync requires we configure cyclonedds first.

@leshy
Copy link
Contributor

leshy commented Feb 4, 2026

This makes sense, because I use Nix to run this. You need to build build CycloneDDS from source otherwise. Where should I add this functionality?

ugh.. are we sure we can't just install cyclonedds on ubuntu? or ros-ubuntu?
I think assume system level is sorted out for your feature yes, this is good to merge imo and can check this in a follow up
but yeah as paul said, remove from root deps..

this will be annoying because
uv sync --all-extras
will break and tests/imports will crash tho? we don't have a policy yet for this, Paul what do you think?

@Kaweees
Copy link
Member Author

Kaweees commented Feb 4, 2026

Got it, moved to a extra group so now it is installed via
uv sync --extra dds

@paul-nechifor
Copy link
Contributor

@Kaweees @leshy

Now the build is failing because core imports actually depend on the cyclonedds. I'm not sure what the solution is here. Options are:

  1. Keep the package in root deps, but then we have to add to prolong the README because everyone will need cyclone dds to run dimos.

  2. Make sure the package is truly optional. I'm not sure how much work this is.


Separately, these are the instructions for how I got it running on Ubuntu (technically I'm running Linux Mint, but it's based on Ubuntu 24.04). You probably want to add this to the docs.

### Optional: DDS Transport Support

The `dds` extra provides DDS (Data Distribution Service) transport support via Eclipse Cyclone DDS. This requires installing system libraries before the Python package can be built.

**Ubuntu/Debian:**

```bash
# Install the CycloneDDS development library
sudo apt install cyclonedds-dev

# Create a compatibility directory structure
# (required because Ubuntu's multiarch layout doesn't match the expected CMake layout)
sudo mkdir -p /opt/cyclonedds/{lib,bin,include}
sudo ln -sf /usr/lib/x86_64-linux-gnu/libddsc.so* /opt/cyclonedds/lib/
sudo ln -sf /usr/lib/x86_64-linux-gnu/libcycloneddsidl.so* /opt/cyclonedds/lib/
sudo ln -sf /usr/bin/idlc /opt/cyclonedds/bin/
sudo ln -sf /usr/bin/ddsperf /opt/cyclonedds/bin/
sudo ln -sf /usr/include/dds /opt/cyclonedds/include/

# Install with the dds extra
CYCLONEDDS_HOME=/opt/cyclonedds uv pip install -e '.[dds]'
```

To install all extras including DDS:

```bash
CYCLONEDDS_HOME=/opt/cyclonedds uv sync --all-extras
```

@paul-nechifor
Copy link
Contributor

@Kaweees What is Deploy_SimToReal_RL_Go2 ? Seems unrelated.

@paul-nechifor
Copy link
Contributor

My results for reference (very similar):

2026-02-05_01-29

@Kaweees Kaweees force-pushed the miguel/dds_transport branch from fd7fd0c to 0246eae Compare February 4, 2026 23:52
@Kaweees
Copy link
Member Author

Kaweees commented Feb 4, 2026

@Kaweees What is Deploy_SimToReal_RL_Go2 ? Seems unrelated.

I've been trying to figure out how we would deploy locomotion polcies on the Go2 and G1 in preparation of AGI Bot. That was not supposed to be commit, just rebased it out

@Kaweees
Copy link
Member Author

Kaweees commented Feb 5, 2026

  1. Make sure the package is truly optional. I'm not sure how much work this is.

Perhaps in all cases where DDS is imported, we just try except as a hacky way to keep it optional.

@paul-nechifor
Copy link
Contributor

Perhaps in all cases where DDS is imported, we just try except as a hacky way to keep it optional.

👍

@spomichter
Copy link
Contributor

@Kaweees @leshy

Now the build is failing because core imports actually depend on the cyclonedds. I'm not sure what the solution is here. Options are:

  1. Keep the package in root deps, but then we have to add to prolong the README because everyone will need cyclone dds to run dimos.
  2. Make sure the package is truly optional. I'm not sure how much work this is.

Separately, these are the instructions for how I got it running on Ubuntu (technically I'm running Linux Mint, but it's based on Ubuntu 24.04). You probably want to add this to the docs.

### Optional: DDS Transport Support

The `dds` extra provides DDS (Data Distribution Service) transport support via Eclipse Cyclone DDS. This requires installing system libraries before the Python package can be built.

**Ubuntu/Debian:**

```bash
# Install the CycloneDDS development library
sudo apt install cyclonedds-dev

# Create a compatibility directory structure
# (required because Ubuntu's multiarch layout doesn't match the expected CMake layout)
sudo mkdir -p /opt/cyclonedds/{lib,bin,include}
sudo ln -sf /usr/lib/x86_64-linux-gnu/libddsc.so* /opt/cyclonedds/lib/
sudo ln -sf /usr/lib/x86_64-linux-gnu/libcycloneddsidl.so* /opt/cyclonedds/lib/
sudo ln -sf /usr/bin/idlc /opt/cyclonedds/bin/
sudo ln -sf /usr/bin/ddsperf /opt/cyclonedds/bin/
sudo ln -sf /usr/include/dds /opt/cyclonedds/include/

# Install with the dds extra
CYCLONEDDS_HOME=/opt/cyclonedds uv pip install -e '.[dds]'

To install all extras including DDS:

CYCLONEDDS_HOME=/opt/cyclonedds uv sync --all-extras

Yeah definitely can't have DDS in core deps so good removal there. Current failing tests should be fixed by just catching all of the cycloneDDS imports with try/catch importerror.

@Kaweees
Copy link
Member Author

Kaweees commented Feb 6, 2026

@spomichter @paul-nechifor Docs have been updated and DDS has been made an optional transport.

@Kaweees
Copy link
Member Author

Kaweees commented Feb 6, 2026

@greptile

Copy link

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

6 files reviewed, no comments

Edit Code Review Agent Settings | Greptile

paul-nechifor
paul-nechifor previously approved these changes Feb 6, 2026
@Kaweees
Copy link
Member Author

Kaweees commented Feb 6, 2026

It works on my machine, I think its because the tests aren't installing the dds extra

uv sync --extra dds

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants