Conversation

@lionel- (Contributor) commented Feb 9, 2026

Branched from #1027 (DAP integration tests)
Progress towards #1033 (Make DAP protocol tests deterministic)
Progress towards #1026

This PR merges the comm manager into:

  • The IOPub thread, for outgoing comm_msg emission
  • The Shell thread, for incoming comm_msg dispatching and outgoing comm_open/comm_close emission

The main goal is for outgoing comm_msg and other IOPub messages to go through the same crossbeam channel. Previously, these took separate paths to the frontend:

  • IOPub messages: sender > iopub_tx > IOPub thread > ZMQ socket
  • Comm messages: sender > outgoing_tx on CommSocket > CommManager thread > iopub_tx > IOPub thread > ZMQ socket

The comm manager intermediary caused non-deterministic message ordering between comm and IOPub messages. When R code on the same thread emits a stream output followed by a comm message, or a comm message followed by an Idle boundary, the frontend could receive them in either order. After this PR, all comm outgoing messages go directly through iopub_tx, the same channel used by all other IOPub messages. Ordering is now deterministic: messages sent sequentially from the same thread arrive at the frontend in that order.
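A minimal sketch of the guarantee, using `std::sync::mpsc` in place of crossbeam and illustrative message names (`drain_in_order` and the message strings are not real amalthea identifiers): a single FIFO channel means the receiver observes sends from one thread in exactly the order they were made.

```rust
use std::sync::mpsc;
use std::thread;

// Sketch of the unified path: one FIFO channel for all IOPub traffic.
// Uses std::sync::mpsc in place of crossbeam; names are illustrative.
pub fn drain_in_order() -> Vec<String> {
    let (iopub_tx, iopub_rx) = mpsc::channel();

    // One thread emits stream output, then a comm message, then Idle,
    // all through the same channel (as after this PR).
    let sender = thread::spawn(move || {
        for msg in ["Stream", "CommMsg", "Idle"] {
            iopub_tx.send(msg.to_string()).unwrap();
        }
    });
    sender.join().unwrap();

    // The receiver sees the sends in exactly the order they were made.
    iopub_rx.into_iter().collect()
}

fn main() {
    assert_eq!(drain_in_order(), vec!["Stream", "CommMsg", "Idle"]);
}
```

With two channels merged by an intermediary thread, no such ordering holds across channels; collapsing to one channel is what makes the order deterministic by construction.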

Motivation

While working on #1027 I observed non-deterministic ordering between busy/idle status messages (sent via IOPub) and stop_debug/start_debug comm messages (sent via the DAP comm through CommManager). The frontend handled this fine since these messages are consumed by different components, but it caused significant problems writing robust and deterministic protocol tests: tests could not make sequential assertions about message ordering without resorting to fuzzy accumulator-based matching that was fragile and hid bugs.

More generally, any code that depends on the relative ordering of comm messages and other IOPub messages has a potential race. With the unified path, ordering matches programmer expectations by construction, eliminating this class of issues.

The change also reduces overall complexity. The CommManager was a dedicated thread that existed solely to bridge comm channels to IOPub, using crossbeam::Select over a dynamic set of receivers and maintaining a lookup table for RPC headers. Removing it eliminates a thread and ~260 lines. The responsibilities it held are absorbed by Shell and IOPub, where they naturally belong: Shell already handles comm_open, comm_msg, and comm_close from the frontend, so it's the right place to manage the set of open comms. IOPub already serializes all messages to the frontend, so it's the right place to handle comm outgoing messages.

The direct payoff is the follow-up PR (#1033), which leverages the ordering guarantee to replace fuzzy message matching with strict sequential assertions.

Architecture changes

Delete CommManager

The CommManager thread is removed entirely (~260 lines). It previously:

  • Held the list of open comms
  • Used crossbeam::Select over a dynamic set of comm outgoing_rx channels
  • Maintained a pending_rpcs HashMap to match RPC replies to their original Jupyter headers
  • Relayed comm messages to IOPub

All of these responsibilities are absorbed by Shell and IOPub.

CommOutgoingTx wrapper

A new CommOutgoingTx type wraps Sender<IOPubMessage> with a comm_id. When a comm handler calls outgoing_tx.send(msg), it sends IOPubMessage::CommOutgoing(comm_id, msg) through the IOPub channel. This is the key mechanism allowing message determinism.

The outgoing_rx side of CommSocket is removed. Comm handlers no longer have a dedicated receive channel for outgoing messages since there's no CommManager polling them.
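A sketch of the idea, with simplified stand-ins for the real amalthea types (the enum variants and field names here are illustrative): the wrapper pairs a `comm_id` with the shared IOPub sender, so `send()` tags the message and routes it through the same channel as every other IOPub message.

```rust
use std::sync::mpsc::{channel, SendError, Sender};

// Illustrative stand-ins for the real amalthea types.
#[derive(Debug, PartialEq)]
enum CommMsg {
    Data(String),
}

#[derive(Debug, PartialEq)]
enum IOPubMessage {
    CommOutgoing(String, CommMsg),
}

// The wrapper: a comm_id plus the shared IOPub sender. Sending through it
// wraps the payload in CommOutgoing so IOPub knows which comm it belongs to.
#[derive(Clone)]
struct CommOutgoingTx {
    comm_id: String,
    iopub_tx: Sender<IOPubMessage>,
}

impl CommOutgoingTx {
    fn send(&self, msg: CommMsg) -> Result<(), SendError<IOPubMessage>> {
        self.iopub_tx
            .send(IOPubMessage::CommOutgoing(self.comm_id.clone(), msg))
    }
}

fn main() {
    let (iopub_tx, iopub_rx) = channel();
    let tx = CommOutgoingTx { comm_id: "dap-1".into(), iopub_tx };
    tx.send(CommMsg::Data("{}".into())).unwrap();
    assert_eq!(
        iopub_rx.recv().unwrap(),
        IOPubMessage::CommOutgoing("dap-1".into(), CommMsg::Data("{}".into()))
    );
}
```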

CommMsg::Rpc carries the parent header

Previously, when the frontend sent an RPC to a comm, the Shell thread would store the Jupyter header in CommManager's pending_rpcs map. When the comm replied, CommManager would look up the header to create a properly parented IOPub message.

Now CommMsg::Rpc is a named struct that carries the parent_header directly:

CommMsg::Rpc {
    id: String,
    parent_header: JupyterHeader,
    data: Value,
}

The header travels with the message through the comm handler and back, so a lookup table is no longer needed.
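A minimal stand-in shows why no lookup table is needed (here `JupyterHeader` and the payload are simplified to `String`s, and `handle_rpc` is a hypothetical handler, not a real amalthea function): the handler just echoes `id` and `parent_header` back into its reply.

```rust
// Illustrative stand-ins: the real types are JupyterHeader and serde_json::Value.
type JupyterHeader = String;
type Value = String;

#[derive(Debug, Clone, PartialEq)]
enum CommMsg {
    Rpc { id: String, parent_header: JupyterHeader, data: Value },
}

// A comm handler echoes `id` and `parent_header` back into its reply, so
// the response can be parented without consulting any pending_rpcs map.
fn handle_rpc(msg: CommMsg) -> CommMsg {
    match msg {
        CommMsg::Rpc { id, parent_header, data } => CommMsg::Rpc {
            id,
            parent_header,
            data: format!("reply to {data}"),
        },
    }
}

fn main() {
    let reply = handle_rpc(CommMsg::Rpc {
        id: "rpc-1".into(),
        parent_header: "header-abc".into(),
        data: "ping".into(),
    });
    match reply {
        CommMsg::Rpc { parent_header, data, .. } => {
            // The header survived the round trip through the handler.
            assert_eq!(parent_header, "header-abc");
            assert_eq!(data, "reply to ping");
        }
    }
}
```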

Shell manages open comms

Shell now holds open_comms: Vec<CommSocket> and handles comm lifecycle directly:

  • Frontend-initiated comms (comm_open, comm_msg, comm_close on the Shell socket) are handled in-place
  • Backend-initiated comms (from comm_manager_rx) arrive when R code opens a comm (data explorer, variables pane, connections, etc.)

The challenge is that Shell is already in a zmq_poll() loop waiting on its ZMQ socket, but backend comm events arrive on a crossbeam channel, and you can't mix ZMQ poll() and crossbeam Select in a single wait. To solve this, I generalised the solution implemented for StdIn in #58.

The bridge mechanism was cleaned up and generalised to manage several sockets and entities. As part of this work, the "notifier" thread is now called the "channel bridge" thread, and the "forwarding" thread is called the "socket bridge" thread.
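The forwarding pattern can be sketched as follows. This is a simplified illustration, not the real `Forwarder` from kernel.rs: plain `mpsc` channels stand in for both the crossbeam source and the inproc ZMQ PAIR notification socket, and the field names are assumptions.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// Sketch of the bridge pattern: a thread consumes from a channel source,
// forwards each item to a destination, then pokes a notification socket so
// a zmq_poll() loop (which cannot wait on crossbeam channels) wakes up.
struct Forwarder<T> {
    source: Receiver<T>,
    destination: Sender<T>,
    notify: Sender<()>, // stand-in for the one-byte ZMQ notification
}

impl<T: Send + 'static> Forwarder<T> {
    fn spawn(self) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for item in self.source {
                // Forward first so the item is already available when the
                // poll loop wakes, then send the wake-up notification.
                if self.destination.send(item).is_err() { break; }
                if self.notify.send(()).is_err() { break; }
            }
        })
    }
}

fn main() {
    let (src_tx, src_rx) = channel();
    let (dst_tx, dst_rx) = channel();
    let (ntf_tx, ntf_rx) = channel();
    let handle = Forwarder { source: src_rx, destination: dst_tx, notify: ntf_tx }.spawn();

    src_tx.send("comm event").unwrap();
    ntf_rx.recv().unwrap(); // the poll loop "wakes up" on the notification
    assert_eq!(dst_rx.recv().unwrap(), "comm event");

    drop(src_tx); // closing the source ends the bridge thread
    handle.join().unwrap();
}
```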

IOPub processes CommOutgoing

A new IOPubMessage::CommOutgoing(String, CommMsg) variant is handled in the IOPub thread. It flushes any buffered stream output before forwarding the comm message, preserving the ordering from the sender's perspective. This unconditional flush on every comm message is the key to the ordering guarantee. For the expected message rates it has negligible overhead (stream flushing is a no-op when the buffer is empty).
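The flush-before-forward rule can be sketched like this (types and method names are illustrative, not the real iopub.rs API): stream output is buffered for batching, but any comm message flushes the buffer first, so wire order matches send order.

```rust
// Illustrative model of the IOPub thread's output queue.
#[derive(Debug, PartialEq)]
enum Wire {
    Stream(String),
    CommOutgoing(String),
}

struct IoPub {
    stream_buffer: String, // stream output batched here before sending
    sent: Vec<Wire>,       // stand-in for messages written to the ZMQ socket
}

impl IoPub {
    fn write_stream(&mut self, s: &str) {
        self.stream_buffer.push_str(s); // buffered, not yet on the wire
    }

    fn flush_stream(&mut self) {
        // No-op when the buffer is empty, so flushing on every comm
        // message has negligible overhead.
        if !self.stream_buffer.is_empty() {
            let out = std::mem::take(&mut self.stream_buffer);
            self.sent.push(Wire::Stream(out));
        }
    }

    fn process_comm_outgoing(&mut self, msg: &str) {
        self.flush_stream(); // preserve ordering from the sender's view
        self.sent.push(Wire::CommOutgoing(msg.to_string()));
    }
}

fn main() {
    let mut iopub = IoPub { stream_buffer: String::new(), sent: vec![] };
    iopub.write_stream("output");
    iopub.process_comm_outgoing("stop_debug");
    // The buffered stream lands on the wire *before* the comm message.
    assert_eq!(iopub.sent, vec![
        Wire::Stream("output".into()),
        Wire::CommOutgoing("stop_debug".into()),
    ]);
}
```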

What's mechanical

About 70% of the diff is mechanical plumbing:

  • Threading iopub_tx through CommSocket::new() and every start() function that creates comms (data explorer, variables, connections, reticulate, plots, help)
  • Adapting tests to receive comm responses from an IOPub channel instead of outgoing_rx
  • Updating CommMsg::Rpc(id, data) to CommMsg::Rpc { id, parent_header, data } at all call sites

The core logic changes are in:

  • crates/amalthea/src/kernel.rs (channel bridge thread, Forwarder)
  • crates/amalthea/src/socket/shell.rs (comm management, RefCell removal)
  • crates/amalthea/src/socket/iopub.rs (CommOutgoing handling)
  • crates/amalthea/src/socket/comm.rs (CommOutgoingTx)

The header now travels with the message instead of being tracked in a separate `pending_rpcs` HashMap in CommManager:

- CommMsg::Rpc(String, Value) -> CommMsg::Rpc { id, parent_header, data }
- Remove CommManagerEvent::PendingRpc variant
- Remove pending_rpcs HashMap from CommManager
- CommManager reads header directly from CommMsg::Rpc
- Update all construction/matching sites

IOPub gains the ability to handle comm messages:
- Add IOPubMessage::CommOutgoing(String, CommMsg) variant
- Add process_comm_outgoing() method to IOPub
- Handle CommOutgoing in process_outbound_message() (flush stream first)

Nothing sends CommOutgoing yet - this is a pure addition.

Establish the ordering guarantee - comm messages now go through IOPub:

- Introduce CommOutgoingTx wrapper that routes through IOPub
- CommSocket::new takes iopub_tx, remove outgoing_rx field
- Update ServerComm, ServerHandler trait to use CommOutgoingTx
- CommManager no longer polls outgoing_rx (comms route directly)
- Ark-side: use Console::get_iopub_tx() for CommSocket creation
- Add IOPubReceiverExt helper trait to ark_test
- Update all test files to use iopub_rx.recv_comm_msg()

The ordering guarantee is now active: comm messages and other IOPub
messages (like ExecuteResult) go through the same channel.

Eliminate the CommManager thread - Shell now owns comm lifecycle:

- Shell: add open_comms, handle comm events directly via process_comm_event()
- Shell: handle comm_info_request locally (no longer queries CommManager)
- Shell: add comm notification socket + receiver (comm_notif_socket, comm_manager_rx)
- Kernel: unified notifier thread with Forwarder<T> pattern
- Forwarder: consume from source -> forward to destination -> ZMQ notification
- Kernel: remove comm_manager_tx from connect() signature
- Delete comm_manager.rs
- Clean up event.rs (remove CommManagerRequest, CommManagerInfoReply)
- Remove CommMsgReply, CommMsgEvent from IOPubMessage (now dead code)
- IOPub: use Select instead of select! macro for unified event loop

Post-migration cleanup:
- Update all test files to use iopub_rx.recv_comm_msg() pattern
- Simplify r_variables::send_event() - always sends Data (RPCs use handle_request)
- Remove request_id parameter from update() and send_event()
- Clean up test assertions to use new recv_comm_msg helpers
- Update socket_rpc_request calls to include iopub_rx parameter
@lionel- force-pushed the task/iopub-comm-unification branch 3 times, most recently from 326fe59 to b440eac (February 9, 2026 16:40)
When running tests, the `ark` crate is compiled both as a dependency (via `ark_test`) and as the test target. Functions marked `#[no_mangle]` were included in both, causing duplicate symbol linker errors.

Fix by gating `#[no_mangle]` functions with `#[cfg(not(test))]` so they're only compiled into the library. Use `extern "C-unwind"` declarations in `sys/*/console.rs` so the code can still reference those symbols for passing as callbacks to R.

Also fix minor warnings:
- Remove unused `use super::*` in harp
- Fix function pointer casts in traps.rs
@lionel- force-pushed the task/iopub-comm-unification branch from b440eac to 1f2a4a4 (February 9, 2026 17:30)
Blocking send is fine here: for inproc PAIR sockets there's no HWM so send is
just a memory copy once connected. If the peer hasn't fully connected yet
(observed on Windows), blocking simply waits the few microseconds until it does.
@jmcphers (Contributor) left a comment:

This is a nice improvement!

Previous approach caused RA (rust-analyzer) to see dead code
Comment on lines 519 to +520
  Ok(data) => {
-     // If we were given a request ID, send the response as an RPC;
-     // otherwise, send it as an event
-     let comm_msg = match request_id {
-         Some(id) => CommMsg::Rpc(id, data),
-         None => CommMsg::Data(data),
-     };
-
-     self.comm.outgoing_tx.send(comm_msg).unwrap()
+     self.comm.outgoing_tx.send(CommMsg::Data(data)).log_err();
Contributor:

What's happened with request_id here?

  }

- #[no_mangle]
+ #[cfg_attr(not(test), no_mangle)]
Contributor:

What's going on here?

When...not running tests we don't mangle? So in release we don't mangle? But in tests we mangle?

What am I missing?

Contributor Author:

That is fixing a gnarly linker error in tests. I didn't do a deep dive so can't tell more.

Contributor:

It's pretty strange to me that we'd need this because I thought that we'd want no_mangle no matter what

Comment on lines 62 to +63
comm_manager_tx: Sender<CommManagerEvent>,
iopub_tx: Sender<IOPubMessage>,
Contributor:

I'm seeing both comm_manager_tx: Sender<CommManagerEvent> and iopub_tx: Sender<IOPubMessage> side by side in a lot of the function signatures now.

That feels very good!

Something about that seems a lot more correct, but it's hard to describe exactly what I mean.

Comment on lines +333 to +334
console.get_comm_manager_tx().clone(),
console.get_iopub_tx().clone(),
Contributor:

Like this feels more correct to me now, but it's hard to explain why!

  let msg = amalthea::comm_rpc_message!("execute", command = debug_request_command(cmd));

- tx.send(msg).unwrap();
+ tx.send(msg).log_err();
Contributor:

2 places in this file it switched away from unwrap during a send()

but we still have things like self.r_request_tx.send(RRequest::DebugCommand(cmd)).unwrap()

I thought we had said that for a send(), we typically just assume that is going to work, and that's one of the few things we unconditionally unwrap?

Contributor Author:

Now I typically log_err since it's so easy. Especially with these non-core components, no need to bring down entire threads (and potentially the main thread along with it) if the R session might still work.

I'd unwrap/panic if IOPub is down for instance, but not the DAP or one of the UI comms.

@@ -23,29 +21,6 @@ pub enum CommManagerEvent {
/// second value is the message.
Contributor:

I would be fine with CommManagerEvent -> CommEvent if you think that makes sense at this point

  };

- self.outgoing_tx.send(response).unwrap();
+ self.outgoing_tx.send(response).log_err();
Contributor:

I'm noticing more of the unwrap -> log_err transition

I feel like these are probably pretty critical messages?

Contributor:

I notice you put something in AGENTS about this, and that is probably contributing to this. Maybe send() should be an exception that still unwrap()s.

@lionel- (author), Feb 11, 2026:

See comment above

  let mut info = serde_json::Map::new();

- for comm in comms.into_iter() {
+ for comm in open_comms.iter() {
Contributor:

Nice simplification to not have to ask for these from the comm manager

Comment on lines 422 to 427
fn handle_comm_open(
&self,
iopub_tx: &Sender<IOPubMessage>,
shell_handler: &mut Box<dyn ShellHandler>,
server_handlers: &HashMap<String, Arc<Mutex<dyn ServerHandler>>>,
open_comms: &mut Vec<CommSocket>,
msg: &CommOpen,
Contributor:

It feels a bit weird to me that this isn't &mut self

I see you supply open_comms, which is from self, and you modify that directly.

Is this just a weird case where you'd "over borrow" too much if you did &mut self and Rust wouldn't be happy about that?

Contributor:

Like right now it makes the method look "pure" but it's really not, as it modifies self state. And that's somewhat confusing IMO

- self.comm_manager_tx
-     .send(CommManagerEvent::Closed(msg.comm_id.clone()))
-     .unwrap();
+ fn handle_comm_close(open_comms: &mut Vec<CommSocket>, msg: &CommClose) -> crate::Result<()> {
Contributor:

Same here of course about feeling like this should be &mut self

3 participants