From 1e505a9066742098efc3cfa18860b05cae2acdb0 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 10:04:20 -0700 Subject: [PATCH 01/10] add configuration type --- rodbus/src/types.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index 5277a713..c33a100e 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -1,6 +1,7 @@ use crate::decode::AppDecodeLevel; use crate::error::{AduParseError, InvalidRange}; use crate::DecodeLevel; +use std::num::NonZeroUsize; use scursor::ReadCursor; @@ -401,10 +402,13 @@ pub struct ClientOptions { pub(crate) channel_logging: ChannelLoggingMode, pub(crate) max_queued_requests: usize, pub(crate) decode_level: DecodeLevel, + pub(crate) max_failed_requests: Option, } impl ClientOptions { /// Set the channel logging type + /// + /// Note: defaults to [`ChannelLoggingMode::Verbose`] pub fn channel_logging(self, channel_logging: ChannelLoggingMode) -> Self { Self { channel_logging, @@ -413,6 +417,8 @@ impl ClientOptions { } /// Set the maximum number of queued requests + /// + /// Note: defaults to 16 pub fn max_queued_requests(self, max_queued_requests: usize) -> Self { Self { max_queued_requests, @@ -421,12 +427,24 @@ impl ClientOptions { } /// Set the decode level + /// + /// Note: defaults to [`DecodeLevel::default()`] pub fn decode_level(self, decode_level: DecodeLevel) -> Self { Self { decode_level, ..self } } + + /// Set the maximum number of failed requests after which the channel is closed and re-opened + /// + /// Note: defaults to None meaning that there is no limit + pub fn max_failed_requests(self, max_failed_requests: Option) -> Self { + Self { + max_failed_requests, + ..self + } + } } impl Default for ClientOptions { @@ -435,6 +453,7 @@ impl Default for ClientOptions { channel_logging: ChannelLoggingMode::default(), max_queued_requests: 16, decode_level: DecodeLevel::default(), + max_failed_requests: None, } } } From 26fe8c39e950220c10e3fd8a002bb9a5c9de05b1 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 11:01:12 -0700 Subject: [PATCH 02/10] integrate new feature that can fail sessions after a configurable number of failed requests --- rodbus/src/client/task.rs | 82 +++++++++++++++++++++++++++++++++---- rodbus/src/serial/client.rs | 4 +- rodbus/src/tcp/client.rs | 6 ++- 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index f9c5ee00..0141e648 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -1,3 +1,4 @@ +use std::num::NonZeroUsize; use std::time::Duration; use tracing::Instrument; @@ -21,6 +22,8 @@ pub(crate) enum SessionError { BadFrame, /// channel was disabled Disabled, + /// maximum number of sequential failed requests reached + MaxFailedRequests(usize), /// the mpsc is closed (dropped) on the sender side Shutdown, } @@ -58,6 +61,9 @@ impl std::fmt::Display for SessionError { SessionError::Shutdown => { write!(f, "Shutdown was requested") } + SessionError::MaxFailedRequests(max) => { + write!(f, "Maximum number ({max}) of sequential failed requests reached") + } } } } @@ -73,11 +79,58 @@ impl SessionError { } } +enum FailedRequestState { + Disabled, + Enabled { current: usize, max: usize } +} + +struct FailedRequestTracker { + state: FailedRequestState, +} + +impl FailedRequestTracker { + fn new(max_failed_requests: Option) -> Self { + Self { + state: match max_failed_requests { + None => FailedRequestState::Disabled, + Some(max) => FailedRequestState::Enabled { + current: 0, + max: max.get(), + } + } + } + } + + fn reset(&mut self) { + match &mut self.state { + FailedRequestState::Disabled => {} + FailedRequestState::Enabled { current, .. } => { + *current = 0; + } + } + } + + fn increment(&mut self) -> Result<(), SessionError> { + match &mut self.state { + FailedRequestState::Disabled => Ok(()), + FailedRequestState::Enabled { current, max } => { + if current >= max { + Err(SessionError::MaxFailedRequests(*max)) + } else { + *current += 1; + Ok(()) + } + } + } + } +} + pub(crate) struct ClientLoop { rx: crate::channel::Receiver, writer: FrameWriter, reader: FramedReader, tx_id: TxId, + failed_requests: FailedRequestTracker, decode: DecodeLevel, enabled: bool, } @@ -88,12 +141,14 @@ impl ClientLoop { writer: FrameWriter, reader: FramedReader, decode: DecodeLevel, + max_failed_requests: Option, ) -> Self { Self { rx, writer, reader, tx_id: TxId::default(), + failed_requests: FailedRequestTracker::new(max_failed_requests), decode, enabled: false, } @@ -129,6 +184,7 @@ impl ClientLoop { } pub(crate) async fn run(&mut self, io: &mut PhysLayer) -> SessionError { + self.failed_requests.reset(); loop { if let Err(err) = self.poll(io).await { tracing::warn!("ending session: {err}"); @@ -169,16 +225,23 @@ impl ClientLoop { .instrument(tracing::info_span!("Transaction", tx_id = %tx_id)) .await; - if let Err(err) = result { - // Fail the request in ONE place. If the whole future - // gets dropped, then the request gets failed with Shutdown - tracing::warn!("request error: {}", err); - request.details.fail(err); + match result { + Ok(()) => self.failed_requests.reset(), + Err(err) => { + // Fail the request in ONE place. If the whole future + // gets dropped, then the request gets failed with Shutdown + tracing::warn!("request error: {}", err); + request.details.fail(err); + + // some request errors are a session error that will + // bubble up and close the session + if let Some(err) = SessionError::from_request_err(err) { + return Err(err); + } - // some request errors are a session error that will - // bubble up and close the session - if let Some(err) = SessionError::from_request_err(err) { - return Err(err); + // if we reach the maximum number of failed requests, this + // can also terminate the session + self.failed_requests.increment()?; } } @@ -319,6 +382,7 @@ mod tests { FrameWriter::tcp(), FramedReader::tcp(), DecodeLevel::default().application(AppDecodeLevel::DataValues), + None, ); let join_handle = tokio::spawn(async move { let mut phys = PhysLayer::new_mock(mock); diff --git a/rodbus/src/serial/client.rs b/rodbus/src/serial/client.rs index c9226099..8ad8b1fd 100644 --- a/rodbus/src/serial/client.rs +++ b/rodbus/src/serial/client.rs @@ -34,6 +34,7 @@ impl SerialChannelTask { FrameWriter::rtu(), FramedReader::rtu_response(), decode, + None ), listener, } @@ -83,7 +84,8 @@ impl SerialChannelTask { // don't wait, we're disabled SessionError::Disabled => Ok(()), // wait before retrying - SessionError::IoError(_) | SessionError::BadFrame => { + SessionError::IoError(_) | SessionError::BadFrame | SessionError::MaxFailedRequests(_) => { + drop(phys); let delay = self.retry.after_disconnect(); self.listener.update(PortState::Wait(delay)).get().await; tracing::warn!("waiting {} ms to re-open port", delay.as_millis()); diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index d7fd8073..cf1561a7 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -106,6 +106,7 @@ impl TcpChannelTask { FrameWriter::tcp(), FramedReader::tcp(), options.decode_level, + options.max_failed_requests, ), listener, channel_logging: options.channel_logging, @@ -175,8 +176,11 @@ impl TcpChannelTask { match self.client_loop.run(&mut phys).await { // the mpsc was closed, end the task SessionError::Shutdown => Err(StateChange::Shutdown), + // don't wait, we're disabled + SessionError::Disabled => Ok(()), // re-establish the connection - SessionError::Disabled | SessionError::IoError(_) | SessionError::BadFrame => { + SessionError::IoError(_) | SessionError::BadFrame | SessionError::MaxFailedRequests(_) => { + drop(phys); let delay = self.connect_retry.after_disconnect(); log_channel_event!(self.channel_logging, "waiting {:?} to reconnect", delay); self.listener From 573786f74a8491f60158768125727ae3084a76b7 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 11:29:13 -0700 Subject: [PATCH 03/10] fix off by one error --- rodbus/src/client/task.rs | 15 +++++++++------ rodbus/src/serial/client.rs | 6 ++++-- rodbus/src/tcp/client.rs | 4 +++- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index 0141e648..177b1ddb 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -62,7 +62,10 @@ impl std::fmt::Display for SessionError { write!(f, "Shutdown was requested") } SessionError::MaxFailedRequests(max) => { - write!(f, "Maximum number ({max}) of sequential failed requests reached") + write!( + f, + "Maximum number ({max}) of sequential failed requests reached" + ) } } } @@ -81,7 +84,7 @@ impl SessionError { enum FailedRequestState { Disabled, - Enabled { current: usize, max: usize } + Enabled { current: usize, max: usize }, } struct FailedRequestTracker { @@ -96,15 +99,15 @@ impl FailedRequestTracker { Some(max) => FailedRequestState::Enabled { current: 0, max: max.get(), - } - } + }, + }, } } fn reset(&mut self) { match &mut self.state { FailedRequestState::Disabled => {} - FailedRequestState::Enabled { current, .. } => { + FailedRequestState::Enabled { current, .. } => { *current = 0; } } @@ -114,10 +117,10 @@ impl FailedRequestTracker { match &mut self.state { FailedRequestState::Disabled => Ok(()), FailedRequestState::Enabled { current, max } => { + *current = current.wrapping_add(1); if current >= max { Err(SessionError::MaxFailedRequests(*max)) } else { - *current += 1; Ok(()) } } diff --git a/rodbus/src/serial/client.rs b/rodbus/src/serial/client.rs index 8ad8b1fd..b9d08b06 100644 --- a/rodbus/src/serial/client.rs +++ b/rodbus/src/serial/client.rs @@ -34,7 +34,7 @@ impl SerialChannelTask { FrameWriter::rtu(), FramedReader::rtu_response(), decode, - None + None, ), listener, } @@ -84,7 +84,9 @@ impl SerialChannelTask { // don't wait, we're disabled SessionError::Disabled => Ok(()), // wait before retrying - SessionError::IoError(_) | SessionError::BadFrame | SessionError::MaxFailedRequests(_) => { + SessionError::IoError(_) + | SessionError::BadFrame + | SessionError::MaxFailedRequests(_) => { drop(phys); let delay = self.retry.after_disconnect(); self.listener.update(PortState::Wait(delay)).get().await; diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index cf1561a7..c53c5313 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -179,7 +179,9 @@ impl TcpChannelTask { // don't wait, we're disabled SessionError::Disabled => Ok(()), // re-establish the connection - SessionError::IoError(_) | SessionError::BadFrame | SessionError::MaxFailedRequests(_) => { + SessionError::IoError(_) + | SessionError::BadFrame + | SessionError::MaxFailedRequests(_) => { drop(phys); let delay = self.connect_retry.after_disconnect(); log_channel_event!(self.channel_logging, "waiting {:?} to reconnect", delay); From 41ad5efed13c023da6b6e06dec93e932c0555b50 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 11:46:33 -0700 Subject: [PATCH 04/10] clarify that its response timeouts not just any request error --- rodbus/src/client/task.rs | 55 ++++++++++++++++++------------------- rodbus/src/serial/client.rs | 2 +- rodbus/src/tcp/client.rs | 4 +-- rodbus/src/types.rs | 12 ++++---- 4 files changed, 36 insertions(+), 37 deletions(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index 177b1ddb..b84e6e6a 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -22,8 +22,8 @@ pub(crate) enum SessionError { BadFrame, /// channel was disabled Disabled, - /// maximum number of sequential failed requests reached - MaxFailedRequests(usize), + /// maximum number of consecutive response timeouts reached + MaxTimeouts(usize), /// the mpsc is closed (dropped) on the sender side Shutdown, } @@ -61,11 +61,8 @@ impl std::fmt::Display for SessionError { SessionError::Shutdown => { write!(f, "Shutdown was requested") } - SessionError::MaxFailedRequests(max) => { - write!( - f, - "Maximum number ({max}) of sequential failed requests reached" - ) + SessionError::MaxTimeouts(max) => { + write!(f, "Maximum number ({max}) of consecutive timeouts reached") } } } @@ -82,21 +79,21 @@ impl SessionError { } } -enum FailedRequestState { +enum TimeoutCounterState { Disabled, Enabled { current: usize, max: usize }, } -struct FailedRequestTracker { - state: FailedRequestState, +struct TimeoutCounter { + state: TimeoutCounterState, } -impl FailedRequestTracker { - fn new(max_failed_requests: Option) -> Self { +impl TimeoutCounter { + fn new(max_timeouts: Option) -> Self { Self { - state: match max_failed_requests { - None => FailedRequestState::Disabled, - Some(max) => FailedRequestState::Enabled { + state: match max_timeouts { + None => TimeoutCounterState::Disabled, + Some(max) => TimeoutCounterState::Enabled { current: 0, max: max.get(), }, @@ -106,8 +103,8 @@ impl FailedRequestTracker { fn reset(&mut self) { match &mut self.state { - FailedRequestState::Disabled => {} - FailedRequestState::Enabled { current, .. } => { + TimeoutCounterState::Disabled => {} + TimeoutCounterState::Enabled { current, .. } => { *current = 0; } } @@ -115,11 +112,11 @@ impl FailedRequestTracker { fn increment(&mut self) -> Result<(), SessionError> { match &mut self.state { - FailedRequestState::Disabled => Ok(()), - FailedRequestState::Enabled { current, max } => { + TimeoutCounterState::Disabled => Ok(()), + TimeoutCounterState::Enabled { current, max } => { *current = current.wrapping_add(1); if current >= max { - Err(SessionError::MaxFailedRequests(*max)) + Err(SessionError::MaxTimeouts(*max)) } else { Ok(()) } @@ -133,7 +130,7 @@ pub(crate) struct ClientLoop { writer: FrameWriter, reader: FramedReader, tx_id: TxId, - failed_requests: FailedRequestTracker, + timeout_counter: TimeoutCounter, decode: DecodeLevel, enabled: bool, } @@ -144,14 +141,14 @@ impl ClientLoop { writer: FrameWriter, reader: FramedReader, decode: DecodeLevel, - max_failed_requests: Option, + max_timeouts: Option, ) -> Self { Self { rx, writer, reader, tx_id: TxId::default(), - failed_requests: FailedRequestTracker::new(max_failed_requests), + timeout_counter: TimeoutCounter::new(max_timeouts), decode, enabled: false, } @@ -187,7 +184,7 @@ impl ClientLoop { } pub(crate) async fn run(&mut self, io: &mut PhysLayer) -> SessionError { - self.failed_requests.reset(); + self.timeout_counter.reset(); loop { if let Err(err) = self.poll(io).await { tracing::warn!("ending session: {err}"); @@ -229,7 +226,7 @@ impl ClientLoop { .await; match result { - Ok(()) => self.failed_requests.reset(), + Ok(()) => self.timeout_counter.reset(), Err(err) => { // Fail the request in ONE place. If the whole future // gets dropped, then the request gets failed with Shutdown @@ -242,9 +239,11 @@ impl ClientLoop { return Err(err); } - // if we reach the maximum number of failed requests, this - // can also terminate the session - self.failed_requests.increment()?; + // if we reach the maximum number of consecutive timeouts, + // this can also terminate the session + if err == RequestError::ResponseTimeout { + self.timeout_counter.increment()?; + } } } diff --git a/rodbus/src/serial/client.rs b/rodbus/src/serial/client.rs index b9d08b06..b8453e8e 100644 --- a/rodbus/src/serial/client.rs +++ b/rodbus/src/serial/client.rs @@ -86,7 +86,7 @@ impl SerialChannelTask { // wait before retrying SessionError::IoError(_) | SessionError::BadFrame - | SessionError::MaxFailedRequests(_) => { + | SessionError::MaxTimeouts(_) => { drop(phys); let delay = self.retry.after_disconnect(); self.listener.update(PortState::Wait(delay)).get().await; diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index c53c5313..91f3e738 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -106,7 +106,7 @@ impl TcpChannelTask { FrameWriter::tcp(), FramedReader::tcp(), options.decode_level, - options.max_failed_requests, + options.max_timeouts, ), listener, channel_logging: options.channel_logging, @@ -181,7 +181,7 @@ impl TcpChannelTask { // re-establish the connection SessionError::IoError(_) | SessionError::BadFrame - | SessionError::MaxFailedRequests(_) => { + | SessionError::MaxTimeouts(_) => { drop(phys); let delay = self.connect_retry.after_disconnect(); log_channel_event!(self.channel_logging, "waiting {:?} to reconnect", delay); diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index c33a100e..cc984376 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -402,7 +402,7 @@ pub struct ClientOptions { pub(crate) channel_logging: ChannelLoggingMode, pub(crate) max_queued_requests: usize, pub(crate) decode_level: DecodeLevel, - pub(crate) max_failed_requests: Option, + pub(crate) max_timeouts: Option, } impl ClientOptions { @@ -436,12 +436,12 @@ impl ClientOptions { } } - /// Set the maximum number of failed requests after which the channel is closed and re-opened + /// Set the maximum number of consecutive response timeouts before forcing a reconnect /// - /// Note: defaults to None meaning that there is no limit - pub fn max_failed_requests(self, max_failed_requests: Option) -> Self { + /// Defaults to `None` (no limit) + pub fn max_response_timeouts(self, max_timeouts: Option) -> Self { Self { - max_failed_requests, + max_timeouts, ..self } } @@ -453,7 +453,7 @@ impl Default for ClientOptions { channel_logging: ChannelLoggingMode::default(), max_queued_requests: 16, decode_level: DecodeLevel::default(), - max_failed_requests: None, + max_timeouts: None, } } } From e7aa3cec876d6a93b60fd08a40469781b36fdb22 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 11:51:14 -0700 Subject: [PATCH 05/10] tweak docs --- rodbus/src/types.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index cc984376..5961712d 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -438,6 +438,10 @@ impl ClientOptions { /// Set the maximum number of consecutive response timeouts before forcing a reconnect /// + /// Useful for detecting dead TCP connections where the remote device stops responding + /// but doesn't send a proper FIN/RST (e.g., due to network issues or third-party interference). + /// The counter resets on any successful request. + /// /// Defaults to `None` (no limit) pub fn max_response_timeouts(self, max_timeouts: Option) -> Self { Self { From 1143a2f41f45be5b0b4fc5310a5c58e86a562c58 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 12:30:09 -0700 Subject: [PATCH 06/10] test cases --- rodbus/src/client/task.rs | 73 +++++++++++++++++++++++++++++++++++++-- rodbus/src/tcp/client.rs | 4 +-- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index b84e6e6a..5c0037b0 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -372,7 +372,9 @@ mod tests { use sfio_tokio_mock_io::Event; - fn spawn_client_loop() -> ( + fn spawn_client_loop_with_max_timeouts( + max_timeouts: Option, + ) -> ( Channel, tokio::task::JoinHandle, sfio_tokio_mock_io::Handle, @@ -384,7 +386,7 @@ mod tests { FrameWriter::tcp(), FramedReader::tcp(), DecodeLevel::default().application(AppDecodeLevel::DataValues), - None, + max_timeouts, ); let join_handle = tokio::spawn(async move { let mut phys = PhysLayer::new_mock(mock); @@ -394,6 +396,14 @@ mod tests { (channel, join_handle, io_handle) } + fn spawn_client_loop() -> ( + Channel, + tokio::task::JoinHandle, + sfio_tokio_mock_io::Handle, + ) { + spawn_client_loop_with_max_timeouts(None) + } + fn get_framed_adu(function: FunctionCode, payload: &T) -> Vec where T: Serialize + Loggable + Sized, @@ -530,4 +540,63 @@ mod tests { vec![Indexed::new(7, true), Indexed::new(8, false)] ); } + + #[tokio::test] + async fn terminates_after_max_consecutive_timeouts() { + let (channel, task, mut io) = spawn_client_loop_with_max_timeouts(NonZeroUsize::new(3)); + + tokio::time::pause(); + + let range = AddressRange::try_from(7, 2).unwrap(); + + // spawn 3 requests that will all timeout + for _ in 0..3 { + let mut ch = channel.clone(); + tokio::spawn(async move { + ch.read_coils( + RequestParam::new(UnitId::new(1), Duration::from_secs(1)), + range, + ) + .await + }); + + // wait for write, don't care about exact tx_id + match io.next_event().await { + Event::Write(_) => {} + other => panic!("Expected Write, got {:?}", other), + } + } + + // session should terminate with MaxTimeouts(3) + assert_eq!(task.await.unwrap(), SessionError::MaxTimeouts(3)); + } + + #[tokio::test] + async fn disabled_when_none_allows_unlimited_timeouts() { + let (channel, task, mut io) = spawn_client_loop_with_max_timeouts(None); + + tokio::time::pause(); + + let range = AddressRange::try_from(7, 2).unwrap(); + + // send 10 requests that all timeout + for _ in 0..10 { + let mut ch = channel.clone(); + tokio::spawn(async move { + ch.read_coils( + RequestParam::new(UnitId::new(1), Duration::from_secs(1)), + range, + ) + .await + }); + + match io.next_event().await { + Event::Write(_) => {} + other => panic!("Expected Write, got {:?}", other), + } + } + + // task should still be running + assert!(!task.is_finished()); + } } diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 91f3e738..7729d951 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -179,9 +179,7 @@ impl TcpChannelTask { // don't wait, we're disabled SessionError::Disabled => Ok(()), // re-establish the connection - SessionError::IoError(_) - | SessionError::BadFrame - | SessionError::MaxTimeouts(_) => { + SessionError::IoError(_) | SessionError::BadFrame | SessionError::MaxTimeouts(_) => { drop(phys); let delay = self.connect_retry.after_disconnect(); log_channel_event!(self.channel_logging, "waiting {:?} to reconnect", delay); From 6fc07db06bebd3c9aac9ef9de808dadd9d64a8fe Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 12:42:50 -0700 Subject: [PATCH 07/10] test the successful responses reset the counter to zero --- rodbus/src/client/task.rs | 86 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index 5c0037b0..e0d20c6d 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -599,4 +599,90 @@ mod tests { // task should still be running assert!(!task.is_finished()); } + + #[tokio::test] + async fn counter_resets_on_successful_request() { + let (channel, task, mut io) = spawn_client_loop_with_max_timeouts(NonZeroUsize::new(3)); + + tokio::time::pause(); + + let range = AddressRange::try_from(7, 2).unwrap(); + + // Pattern: timeout -> timeout -> success -> timeout -> timeout + // With max=3, this should NOT terminate because the success resets the counter + + // First two timeouts + for _ in 0..2 { + let mut ch = channel.clone(); + tokio::spawn(async move { + ch.read_coils( + RequestParam::new(UnitId::new(1), Duration::from_secs(1)), + range, + ) + .await + }); + match io.next_event().await { + Event::Write(_) => {} + other => panic!("Expected Write, got {:?}", other), + } + } + + // Successful request + let success_task = tokio::spawn({ + let mut ch = channel.clone(); + async move { + ch.read_coils( + RequestParam::new(UnitId::new(1), Duration::from_secs(1)), + range, + ) + .await + } + }); + + // Get the request and respond with matching tx_id + let request_bytes = match io.next_event().await { + Event::Write(bytes) => bytes, + other => panic!("Expected Write, got {:?}", other), + }; + + let mut response = get_framed_adu( + FunctionCode::ReadCoils, + &BitWriter::new(ReadBitsRange { inner: range }, |idx| match idx { + 7 => Ok(true), + 8 => Ok(false), + _ => Err(ExceptionCode::IllegalDataAddress), + }), + ); + response[0] = request_bytes[0]; + response[1] = request_bytes[1]; + + io.read(&response); + + // The response will be read by the client loop + match io.next_event().await { + Event::Read => {} // Expected - client loop reads our response + other => panic!("Expected Read after providing response, got {:?}", other), + } + + assert!(success_task.await.unwrap().is_ok()); + + // Two more timeouts - should NOT terminate since counter was reset + for _ in 0..2 { + let mut ch = channel.clone(); + tokio::spawn(async move { + ch.read_coils( + RequestParam::new(UnitId::new(1), Duration::from_secs(1)), + range, + ) + .await + }); + match io.next_event().await { + Event::Write(_) => {} + other => panic!("Expected Write, got {:?}", other), + } + } + + // Task should still be running (only 2 consecutive timeouts, not 3) + assert!(!task.is_finished()); + } } From 5adc16641b615f598f87e36d4c439c2c4c82d7c7 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 13:02:05 -0700 Subject: [PATCH 08/10] anything other than a response timeout also resets the counter --- rodbus/src/client/task.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index e0d20c6d..3da3c79e 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -243,6 +243,8 @@ impl ClientLoop { // this can also terminate the session if err == RequestError::ResponseTimeout { self.timeout_counter.increment()?; + } else { + self.timeout_counter.reset(); } } } @@ -685,4 +687,5 @@ mod tests { // Task should still be running (only 2 consecutive timeouts, not 3) assert!(!task.is_finished()); } + } From 2eb80898ebe3c7aaf2d14a1c0099ba1e0b9b3985 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 15:07:36 -0700 Subject: [PATCH 09/10] comments / formatting --- rodbus/src/client/task.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index 3da3c79e..f5d90a75 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -239,11 +239,13 @@ impl ClientLoop { return Err(err); } - // if we reach the maximum number of consecutive timeouts, - // this can also terminate the session if err == RequestError::ResponseTimeout { + // if we reach the maximum number of consecutive timeouts, + // this can also terminate the session self.timeout_counter.increment()?; } else { + // all other errors reset the response timeout counter, + // e.g. a Modbus exception self.timeout_counter.reset(); } } @@ -687,5 +689,4 @@ mod tests { // Task should still be running (only 2 consecutive timeouts, not 3) assert!(!task.is_finished()); } - } From 1e229e59625557a0a9d2e5336da164876fdcb5b6 Mon Sep 17 00:00:00 2001 From: jadamcrain Date: Thu, 16 Oct 2025 15:24:21 -0700 Subject: [PATCH 10/10] saturating add --- rodbus/src/client/task.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rodbus/src/client/task.rs b/rodbus/src/client/task.rs index f5d90a75..b3a00447 100644 --- a/rodbus/src/client/task.rs +++ b/rodbus/src/client/task.rs @@ -114,7 +114,7 @@ impl TimeoutCounter { match &mut self.state { TimeoutCounterState::Disabled => Ok(()), TimeoutCounterState::Enabled { current, max } => { - *current = current.wrapping_add(1); + *current = current.saturating_add(1); if current >= max { Err(SessionError::MaxTimeouts(*max)) } else {