From 58ff15013551b5a34b7638696e5a39c2fd0113c4 Mon Sep 17 00:00:00 2001 From: Nathan Lee Date: Fri, 3 Oct 2025 14:51:56 -0700 Subject: [PATCH 1/9] wip --- ffi/rodbus-ffi/src/client.rs | 10 +++++++ rodbus-client/src/main.rs | 4 +++ rodbus/src/client/listener.rs | 7 +++++ rodbus/src/client/mod.rs | 34 ++++++++++++++++++++++++ rodbus/src/tcp/client.rs | 45 +++++++++++++++++++++++++++----- rodbus/src/tcp/tls/client.rs | 3 ++- rodbus/src/types.rs | 15 +++++++++++ rodbus/tests/integration_test.rs | 5 ++++ 8 files changed, 115 insertions(+), 8 deletions(-) diff --git a/ffi/rodbus-ffi/src/client.rs b/ffi/rodbus-ffi/src/client.rs index 58903dce..70c0a7a2 100644 --- a/ffi/rodbus-ffi/src/client.rs +++ b/ffi/rodbus-ffi/src/client.rs @@ -331,6 +331,12 @@ impl Listener for ClientStateListener { self.inner.on_change(value.into()); MaybeAsync::ready(()) } + + fn get_value(&self) -> Option<&ClientState> { + Some(self.inner.into()) + } + + } impl From for Box> { @@ -350,6 +356,10 @@ impl Listener for PortStateListener { self.inner.on_change(value.into()); MaybeAsync::ready(()) } + + fn get_value(&self) -> Option<&rodbus::client::PortState> { + None + } } #[cfg(feature = "serial")] diff --git a/rodbus-client/src/main.rs b/rodbus-client/src/main.rs index dfde4139..bc62a17a 100644 --- a/rodbus-client/src/main.rs +++ b/rodbus-client/src/main.rs @@ -255,6 +255,10 @@ where }; MaybeAsync::asynchronous(future) } + + fn get_value(&self) -> Option<&T> { + None + } } #[tokio::main(flavor = "multi_thread")] diff --git a/rodbus/src/client/listener.rs b/rodbus/src/client/listener.rs index d3f5b7b3..86eb18da 100644 --- a/rodbus/src/client/listener.rs +++ b/rodbus/src/client/listener.rs @@ -6,6 +6,9 @@ pub trait Listener: Send { fn update(&mut self, _value: T) -> MaybeAsync<()> { MaybeAsync::ready(()) } + + /// Get the value of T + fn get_value(&self) -> Option<&T>; } /// Listener that does nothing @@ -23,6 +26,10 @@ impl Listener for NullListener { fn update(&mut self, _value: T) -> MaybeAsync<()> { MaybeAsync::ready(()) } + + fn get_value(&self) -> Option<&T> { + None + } } /// State of TCP/TLS client connection diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index 32f1d8c5..4c40f683 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -17,6 +17,7 @@ pub use crate::client::channel::*; pub use crate::client::listener::*; pub use crate::client::requests::write_multiple::WriteMultiple; pub use crate::retry::*; +use crate::LoggingStrategy; #[cfg(feature = "ffi")] pub use ffi_channel::*; @@ -112,6 +113,39 @@ pub fn spawn_tcp_client_task( retry, decode, listener.unwrap_or_else(|| NullListener::create()), + LoggingStrategy::All + ) +} + + +/// Spawns a channel task onto the runtime that maintains a TCP connection and processes +/// requests. The task completes when the returned channel handle is dropped. +/// +/// The channel uses the provided [`RetryStrategy`] to pause between failed connection attempts +/// +/// * `host` - Address/port of the remote server. Can be a IP address or name on which to perform DNS resolution. +/// * `max_queued_requests` - The maximum size of the request queue +/// * `retry` - A boxed trait object that controls when the connection is retried on failure +/// * `decode` - Decode log level +/// * `listener` - Optional callback to monitor the TCP connection state +/// * `logging_strategy` - An optional +/// +/// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic. +pub fn spawn_tcp_client_task_2( + host: HostAddr, + max_queued_requests: usize, + retry: Box, + decode: DecodeLevel, + listener: Option>>, + logging_strategy: LoggingStrategy, +) -> Channel { + crate::tcp::client::spawn_tcp_channel( + host, + max_queued_requests, + retry, + decode, + listener.unwrap_or_else(|| NullListener::create()), + logging_strategy, ) } diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 762ca38d..64921498 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -9,6 +9,7 @@ use crate::client::task::{ClientLoop, SessionError, StateChange}; use crate::common::frame::{FrameWriter, FramedReader}; use crate::error::Shutdown; use crate::retry::RetryStrategy; +use crate::LoggingStrategy; use tokio::net::TcpStream; @@ -18,9 +19,10 @@ pub(crate) fn spawn_tcp_channel( connect_retry: Box, decode: DecodeLevel, listener: Box>, + logging_strategy: LoggingStrategy, ) -> Channel { let (handle, task) = - create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener); + create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener, logging_strategy); tokio::spawn(task); handle } @@ -31,6 +33,7 @@ pub(crate) fn create_tcp_channel( connect_retry: Box, decode: DecodeLevel, listener: Box>, + logging_strategy: LoggingStrategy ) -> (Channel, impl std::future::Future) { let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests); let task = async move { @@ -41,6 +44,7 @@ pub(crate) fn create_tcp_channel( connect_retry, decode, listener, + logging_strategy ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) @@ -75,6 +79,7 @@ pub(crate) struct TcpChannelTask { connection_handler: TcpTaskConnectionHandler, client_loop: ClientLoop, listener: Box>, + logging_strategy: LoggingStrategy, } impl TcpChannelTask { @@ -85,6 +90,7 @@ impl TcpChannelTask { connect_retry: Box, decode: DecodeLevel, listener: Box>, + logging_strategy: LoggingStrategy, ) -> Self { Self { host, @@ -92,6 +98,7 @@ impl TcpChannelTask { connection_handler, client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode), listener, + logging_strategy } } @@ -131,16 +138,36 @@ impl TcpChannelTask { } async fn try_connect_and_run(&mut self) -> Result<(), StateChange> { + // let state = self.listener.get_value().clone(); + self.listener.update(ClientState::Connecting).get().await; match self.connect().await? { Err(err) => { let delay = self.connect_retry.after_failed_connect(); - tracing::warn!( - "failed to connect to {}: {} - waiting {} ms before next attempt", - self.host, - err, - delay.as_millis() - ); + // * we could do a listener.state == connected, then we log "move from connected to disconnected" + match self.logging_strategy { + LoggingStrategy::All => { + tracing::warn!( + "failed to connect to {}: {} - waiting {} ms before next attempt", + self.host, + err, + delay.as_millis() + ); + }, + LoggingStrategy::StateDriven => { + // match state { + // Some(x) => match x { + // ClientState::Disabled => todo!(), + // ClientState::Connecting => todo!(), + // ClientState::Connected => todo!(), + // ClientState::WaitAfterFailedConnect(_) => todo!(), + // ClientState::WaitAfterDisconnect(_) => todo!(), + // ClientState::Shutdown => todo!(), + // }, + // None => todo!(), + // } + }, + } self.listener .update(ClientState::WaitAfterFailedConnect(delay)) .get() @@ -149,6 +176,7 @@ impl TcpChannelTask { } Ok(socket) => { if let Ok(addr) = socket.peer_addr() { + // * if disconnected before, then state "connected" tracing::info!("connected to: {}", addr); } if let Err(err) = socket.set_nodelay(true) { @@ -157,6 +185,9 @@ impl TcpChannelTask { match self.connection_handler.handle(socket, &self.host).await { Err(err) => { let delay = self.connect_retry.after_failed_connect(); + + // match state {} + tracing::warn!( "{} - waiting {} ms before next attempt", err, diff --git a/rodbus/src/tcp/tls/client.rs b/rodbus/src/tcp/tls/client.rs index 963d2024..85c380e3 100644 --- a/rodbus/src/tcp/tls/client.rs +++ b/rodbus/src/tcp/tls/client.rs @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer; use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler}; use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError}; -use crate::DecodeLevel; +use crate::{DecodeLevel, LoggingStrategy}; /// TLS configuration pub struct TlsClientConfig { @@ -60,6 +60,7 @@ pub(crate) fn create_tls_channel( connect_retry, decode, listener, + LoggingStrategy::All ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index e35a2489..2e09be45 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -375,6 +375,21 @@ impl Default for UnitId { } } +/// How verbose to make the logging for connects & disconnects +#[derive(Clone, Copy)] +pub enum LoggingStrategy { + /// Log it all + All, + /// Only log State changes + StateDriven, +} + +impl Default for LoggingStrategy { + fn default() -> Self { + Self::All + } +} + #[cfg(test)] mod tests { use crate::error::*; diff --git a/rodbus/tests/integration_test.rs b/rodbus/tests/integration_test.rs index 5ee6af1e..7828b7d9 100644 --- a/rodbus/tests/integration_test.rs +++ b/rodbus/tests/integration_test.rs @@ -41,6 +41,11 @@ impl Listener for ClientStateListener { }; MaybeAsync::asynchronous(update) } + + fn get_value(&self) -> Option<&ClientState> { + self.as_ref() + } + } impl RequestHandler for Handler { From cd0b99961cc2d7a0b6f11e13d38e1916ad19cc96 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 11:32:56 -0700 Subject: [PATCH 2/9] fmt, reverting Listener changes and clippy suggestions --- ffi/rodbus-ffi/src/client.rs | 10 ------- ffi/rodbus-ffi/src/iterator.rs | 4 +-- rodbus-client/src/main.rs | 4 --- rodbus/src/client/listener.rs | 7 ----- rodbus/src/client/mod.rs | 3 +- rodbus/src/tcp/client.rs | 51 +++++++++++++------------------- rodbus/src/tcp/tls/client.rs | 2 +- rodbus/tests/integration_test.rs | 5 ---- 8 files changed, 24 insertions(+), 62 deletions(-) diff --git a/ffi/rodbus-ffi/src/client.rs b/ffi/rodbus-ffi/src/client.rs index 70c0a7a2..58903dce 100644 --- a/ffi/rodbus-ffi/src/client.rs +++ b/ffi/rodbus-ffi/src/client.rs @@ -331,12 +331,6 @@ impl Listener for ClientStateListener { self.inner.on_change(value.into()); MaybeAsync::ready(()) } - - fn get_value(&self) -> Option<&ClientState> { - Some(self.inner.into()) - } - - } impl From for Box> { @@ -356,10 +350,6 @@ impl Listener for PortStateListener { self.inner.on_change(value.into()); MaybeAsync::ready(()) } - - fn get_value(&self) -> Option<&rodbus::client::PortState> { - None - } } #[cfg(feature = "serial")] diff --git a/ffi/rodbus-ffi/src/iterator.rs b/ffi/rodbus-ffi/src/iterator.rs index daf0f697..4a406336 100644 --- a/ffi/rodbus-ffi/src/iterator.rs +++ b/ffi/rodbus-ffi/src/iterator.rs @@ -30,7 +30,7 @@ impl<'a> RegisterValueIterator<'a> { } pub(crate) unsafe fn bit_value_iterator_next( - it: *mut crate::BitValueIterator, + it: *mut crate::BitValueIterator<'_>, ) -> Option<&crate::ffi::BitValue> { match it.as_mut() { Some(it) => match it.inner.next() { @@ -46,7 +46,7 @@ pub(crate) unsafe fn bit_value_iterator_next( } pub(crate) unsafe fn register_value_iterator_next( - it: *mut crate::RegisterValueIterator, + it: *mut crate::RegisterValueIterator<'_>, ) -> Option<&crate::ffi::RegisterValue> { match it.as_mut() { Some(it) => match it.inner.next() { diff --git a/rodbus-client/src/main.rs b/rodbus-client/src/main.rs index bc62a17a..dfde4139 100644 --- a/rodbus-client/src/main.rs +++ b/rodbus-client/src/main.rs @@ -255,10 +255,6 @@ where }; MaybeAsync::asynchronous(future) } - - fn get_value(&self) -> Option<&T> { - None - } } #[tokio::main(flavor = "multi_thread")] diff --git a/rodbus/src/client/listener.rs b/rodbus/src/client/listener.rs index 86eb18da..d3f5b7b3 100644 --- a/rodbus/src/client/listener.rs +++ b/rodbus/src/client/listener.rs @@ -6,9 +6,6 @@ pub trait Listener: Send { fn update(&mut self, _value: T) -> MaybeAsync<()> { MaybeAsync::ready(()) } - - /// Get the value of T - fn get_value(&self) -> Option<&T>; } /// Listener that does nothing @@ -26,10 +23,6 @@ impl Listener for NullListener { fn update(&mut self, _value: T) -> MaybeAsync<()> { MaybeAsync::ready(()) } - - fn get_value(&self) -> Option<&T> { - None - } } /// State of TCP/TLS client connection diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index 4c40f683..ffc3c6bf 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -113,11 +113,10 @@ pub fn spawn_tcp_client_task( retry, decode, listener.unwrap_or_else(|| NullListener::create()), - LoggingStrategy::All + LoggingStrategy::All, ) } - /// Spawns a channel task onto the runtime that maintains a TCP connection and processes /// requests. The task completes when the returned channel handle is dropped. /// diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 64921498..461e03d3 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -21,8 +21,14 @@ pub(crate) fn spawn_tcp_channel( listener: Box>, logging_strategy: LoggingStrategy, ) -> Channel { - let (handle, task) = - create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener, logging_strategy); + let (handle, task) = create_tcp_channel( + host, + max_queued_requests, + connect_retry, + decode, + listener, + logging_strategy, + ); tokio::spawn(task); handle } @@ -33,7 +39,7 @@ pub(crate) fn create_tcp_channel( connect_retry: Box, decode: DecodeLevel, listener: Box>, - logging_strategy: LoggingStrategy + logging_strategy: LoggingStrategy, ) -> (Channel, impl std::future::Future) { let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests); let task = async move { @@ -44,7 +50,7 @@ pub(crate) fn create_tcp_channel( connect_retry, decode, listener, - logging_strategy + logging_strategy, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) @@ -79,7 +85,7 @@ pub(crate) struct TcpChannelTask { connection_handler: TcpTaskConnectionHandler, client_loop: ClientLoop, listener: Box>, - logging_strategy: LoggingStrategy, + _logging_strategy: LoggingStrategy, } impl TcpChannelTask { @@ -90,7 +96,7 @@ impl TcpChannelTask { connect_retry: Box, decode: DecodeLevel, listener: Box>, - logging_strategy: LoggingStrategy, + _logging_strategy: LoggingStrategy, ) -> Self { Self { host, @@ -98,7 +104,7 @@ impl TcpChannelTask { connection_handler, client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode), listener, - logging_strategy + _logging_strategy, } } @@ -144,30 +150,13 @@ impl TcpChannelTask { match self.connect().await? { Err(err) => { let delay = self.connect_retry.after_failed_connect(); - // * we could do a listener.state == connected, then we log "move from connected to disconnected" - match self.logging_strategy { - LoggingStrategy::All => { - tracing::warn!( - "failed to connect to {}: {} - waiting {} ms before next attempt", - self.host, - err, - delay.as_millis() - ); - }, - LoggingStrategy::StateDriven => { - // match state { - // Some(x) => match x { - // ClientState::Disabled => todo!(), - // ClientState::Connecting => todo!(), - // ClientState::Connected => todo!(), - // ClientState::WaitAfterFailedConnect(_) => todo!(), - // ClientState::WaitAfterDisconnect(_) => todo!(), - // ClientState::Shutdown => todo!(), - // }, - // None => todo!(), - // } - }, - } + tracing::warn!( + "failed to connect to {}: {} - waiting {} ms before next attempt", + self.host, + err, + delay.as_millis() + ); + self.listener .update(ClientState::WaitAfterFailedConnect(delay)) .get() diff --git a/rodbus/src/tcp/tls/client.rs b/rodbus/src/tcp/tls/client.rs index 85c380e3..124170b3 100644 --- a/rodbus/src/tcp/tls/client.rs +++ b/rodbus/src/tcp/tls/client.rs @@ -60,7 +60,7 @@ pub(crate) fn create_tls_channel( connect_retry, decode, listener, - LoggingStrategy::All + LoggingStrategy::All, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) diff --git a/rodbus/tests/integration_test.rs b/rodbus/tests/integration_test.rs index 7828b7d9..5ee6af1e 100644 --- a/rodbus/tests/integration_test.rs +++ b/rodbus/tests/integration_test.rs @@ -41,11 +41,6 @@ impl Listener for ClientStateListener { }; MaybeAsync::asynchronous(update) } - - fn get_value(&self) -> Option<&ClientState> { - self.as_ref() - } - } impl RequestHandler for Handler { From aeb22ce631b30f8aa3a8fc5511c7f95d0f01fd22 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 11:46:46 -0700 Subject: [PATCH 3/9] wip --- rodbus/src/tcp/client.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 461e03d3..44cd176d 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -144,8 +144,6 @@ impl TcpChannelTask { } async fn try_connect_and_run(&mut self) -> Result<(), StateChange> { - // let state = self.listener.get_value().clone(); - self.listener.update(ClientState::Connecting).get().await; match self.connect().await? { Err(err) => { @@ -156,7 +154,6 @@ impl TcpChannelTask { err, delay.as_millis() ); - self.listener .update(ClientState::WaitAfterFailedConnect(delay)) .get() @@ -165,7 +162,6 @@ impl TcpChannelTask { } Ok(socket) => { if let Ok(addr) = socket.peer_addr() { - // * if disconnected before, then state "connected" tracing::info!("connected to: {}", addr); } if let Err(err) = socket.set_nodelay(true) { @@ -174,9 +170,6 @@ impl TcpChannelTask { match self.connection_handler.handle(socket, &self.host).await { Err(err) => { let delay = self.connect_retry.after_failed_connect(); - - // match state {} - tracing::warn!( "{} - waiting {} ms before next attempt", err, From 122f88e2b6f2ece9eb599ee651c1144b916be7ac Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 11:54:40 -0700 Subject: [PATCH 4/9] comment --- rodbus/src/client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index ffc3c6bf..680647ff 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -127,7 +127,7 @@ pub fn spawn_tcp_client_task( /// * `retry` - A boxed trait object that controls when the connection is retried on failure /// * `decode` - Decode log level /// * `listener` - Optional callback to monitor the TCP connection state -/// * `logging_strategy` - An optional +/// * `logging_strategy` - An optional parameter for logging verbosity, `All` is default behavior, `StateDriven` is used for logging when changes in state occurs, like `Connected` to `WaitAfterDisconnect` /// /// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic. pub fn spawn_tcp_client_task_2( From 7e1a380eabb413ce71d20e4277f441b591d08fea Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 15:10:08 -0700 Subject: [PATCH 5/9] wip --- rodbus/src/client/mod.rs | 19 +++++--------- rodbus/src/tcp/client.rs | 25 ++++++++---------- rodbus/src/tcp/tls/client.rs | 4 +-- rodbus/src/types.rs | 49 ++++++++++++++++++++++++++++++++++-- 4 files changed, 65 insertions(+), 32 deletions(-) diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index 680647ff..81a47ebf 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -17,7 +17,7 @@ pub use crate::client::channel::*; pub use crate::client::listener::*; pub use crate::client::requests::write_multiple::WriteMultiple; pub use crate::retry::*; -use crate::LoggingStrategy; +use crate::ClientOptions; #[cfg(feature = "ffi")] pub use ffi_channel::*; @@ -107,13 +107,12 @@ pub fn spawn_tcp_client_task( decode: DecodeLevel, listener: Option>>, ) -> Channel { + let options = ClientOptions::default().decode(decode).max_queued_requests(max_queued_requests); crate::tcp::client::spawn_tcp_channel( host, - max_queued_requests, retry, - decode, listener.unwrap_or_else(|| NullListener::create()), - LoggingStrategy::All, + options, ) } @@ -123,28 +122,22 @@ pub fn spawn_tcp_client_task( /// The channel uses the provided [`RetryStrategy`] to pause between failed connection attempts /// /// * `host` - Address/port of the remote server. Can be a IP address or name on which to perform DNS resolution. -/// * `max_queued_requests` - The maximum size of the request queue /// * `retry` - A boxed trait object that controls when the connection is retried on failure -/// * `decode` - Decode log level /// * `listener` - Optional callback to monitor the TCP connection state -/// * `logging_strategy` - An optional parameter for logging verbosity, `All` is default behavior, `StateDriven` is used for logging when changes in state occurs, like `Connected` to `WaitAfterDisconnect` +/// * `client_options` - A builder that contains various client options. /// /// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic. pub fn spawn_tcp_client_task_2( host: HostAddr, - max_queued_requests: usize, retry: Box, - decode: DecodeLevel, listener: Option>>, - logging_strategy: LoggingStrategy, + client_options: ClientOptions, ) -> Channel { crate::tcp::client::spawn_tcp_channel( host, - max_queued_requests, retry, - decode, listener.unwrap_or_else(|| NullListener::create()), - logging_strategy, + client_options, ) } diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 44cd176d..dc8eec49 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -9,25 +9,21 @@ use crate::client::task::{ClientLoop, SessionError, StateChange}; use crate::common::frame::{FrameWriter, FramedReader}; use crate::error::Shutdown; use crate::retry::RetryStrategy; -use crate::LoggingStrategy; +use crate::{ClientOptions, ConnectionLoggingStrategy}; use tokio::net::TcpStream; pub(crate) fn spawn_tcp_channel( host: HostAddr, - max_queued_requests: usize, connect_retry: Box, - decode: DecodeLevel, listener: Box>, - logging_strategy: LoggingStrategy, + client_options: ClientOptions, ) -> Channel { let (handle, task) = create_tcp_channel( host, - max_queued_requests, connect_retry, - decode, listener, - logging_strategy, + client_options, ); tokio::spawn(task); handle @@ -35,22 +31,20 @@ pub(crate) fn spawn_tcp_channel( pub(crate) fn create_tcp_channel( host: HostAddr, - max_queued_requests: usize, connect_retry: Box, - decode: DecodeLevel, listener: Box>, - logging_strategy: LoggingStrategy, + options: ClientOptions, ) -> (Channel, impl std::future::Future) { - let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests); + let (tx, rx) = tokio::sync::mpsc::channel(options.max_queued_requests); let task = async move { TcpChannelTask::new( host.clone(), rx.into(), TcpTaskConnectionHandler::Tcp, connect_retry, - decode, + options.decode, listener, - logging_strategy, + options.connection_logging_strategy, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) @@ -85,9 +79,10 @@ pub(crate) struct TcpChannelTask { connection_handler: TcpTaskConnectionHandler, client_loop: ClientLoop, listener: Box>, - _logging_strategy: LoggingStrategy, + _logging_strategy: ConnectionLoggingStrategy, } + impl TcpChannelTask { pub(crate) fn new( host: HostAddr, @@ -96,7 +91,7 @@ impl TcpChannelTask { connect_retry: Box, decode: DecodeLevel, listener: Box>, - _logging_strategy: LoggingStrategy, + _logging_strategy: ConnectionLoggingStrategy, ) -> Self { Self { host, diff --git a/rodbus/src/tcp/tls/client.rs b/rodbus/src/tcp/tls/client.rs index 124170b3..0c47d98d 100644 --- a/rodbus/src/tcp/tls/client.rs +++ b/rodbus/src/tcp/tls/client.rs @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer; use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler}; use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError}; -use crate::{DecodeLevel, LoggingStrategy}; +use crate::{DecodeLevel, ConnectionLoggingStrategy}; /// TLS configuration pub struct TlsClientConfig { @@ -60,7 +60,7 @@ pub(crate) fn create_tls_channel( connect_retry, decode, listener, - LoggingStrategy::All, + ConnectionLoggingStrategy::All, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index 2e09be45..e8816b71 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -1,5 +1,6 @@ use crate::decode::AppDecodeLevel; use crate::error::{AduParseError, InvalidRange}; +use crate::DecodeLevel; use scursor::ReadCursor; @@ -377,19 +378,63 @@ impl Default for UnitId { /// How verbose to make the logging for connects & disconnects #[derive(Clone, Copy)] -pub enum LoggingStrategy { +pub enum ConnectionLoggingStrategy { /// Log it all All, /// Only log State changes StateDriven, } -impl Default for LoggingStrategy { +impl Default for ConnectionLoggingStrategy { fn default() -> Self { Self::All } } +/// A ClientOptions builder +#[derive(Copy, Clone)] +pub struct ClientOptions { + pub(crate) connection_logging_strategy: ConnectionLoggingStrategy, + pub(crate) max_queued_requests: usize, + pub(crate) decode: DecodeLevel, +} + +impl ClientOptions { + /// Builder function for the connection_logging_strategy field. + pub fn connection_logging(self, connection_logging_strategy: ConnectionLoggingStrategy) -> Self { + Self { + connection_logging_strategy, + ..self + } + } + + /// builder function for the max queued requests field + pub fn max_queued_requests(self, max_queued_requests: usize) -> Self { + Self { + max_queued_requests, + ..self + } + } + + /// builder function for the decode level field + pub fn decode(self, decode: DecodeLevel) -> Self { + Self { + decode, + ..self + } + } +} + +impl Default for ClientOptions { + fn default() -> Self { + Self { + connection_logging_strategy: ConnectionLoggingStrategy::default(), + max_queued_requests: 16, + decode: DecodeLevel::default(), + } + } +} + #[cfg(test)] mod tests { use crate::error::*; From a4c3303e0bfe22c315c91dc9ff177819bc89c543 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 15:14:35 -0700 Subject: [PATCH 6/9] clippy fix & fmt --- rodbus/src/client/mod.rs | 4 +++- rodbus/src/tcp/client.rs | 8 +------- rodbus/src/tcp/tls/client.rs | 2 +- rodbus/src/types.rs | 14 +++++++------- 4 files changed, 12 insertions(+), 16 deletions(-) diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index 81a47ebf..d6f77e1e 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -107,7 +107,9 @@ pub fn spawn_tcp_client_task( decode: DecodeLevel, listener: Option>>, ) -> Channel { - let options = ClientOptions::default().decode(decode).max_queued_requests(max_queued_requests); + let options = ClientOptions::default() + .decode(decode) + .max_queued_requests(max_queued_requests); crate::tcp::client::spawn_tcp_channel( host, retry, diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index dc8eec49..72e1e00e 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -19,12 +19,7 @@ pub(crate) fn spawn_tcp_channel( listener: Box>, client_options: ClientOptions, ) -> Channel { - let (handle, task) = create_tcp_channel( - host, - connect_retry, - listener, - client_options, - ); + let (handle, task) = create_tcp_channel(host, connect_retry, listener, client_options); tokio::spawn(task); handle } @@ -82,7 +77,6 @@ pub(crate) struct TcpChannelTask { _logging_strategy: ConnectionLoggingStrategy, } - impl TcpChannelTask { pub(crate) fn new( host: HostAddr, diff --git a/rodbus/src/tcp/tls/client.rs b/rodbus/src/tcp/tls/client.rs index 0c47d98d..5d6a0d79 100644 --- a/rodbus/src/tcp/tls/client.rs +++ b/rodbus/src/tcp/tls/client.rs @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer; use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler}; use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError}; -use crate::{DecodeLevel, ConnectionLoggingStrategy}; +use crate::{ConnectionLoggingStrategy, DecodeLevel}; /// TLS configuration pub struct TlsClientConfig { diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index e8816b71..a0d10cbe 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -401,7 +401,10 @@ pub struct ClientOptions { impl ClientOptions { /// Builder function for the connection_logging_strategy field. - pub fn connection_logging(self, connection_logging_strategy: ConnectionLoggingStrategy) -> Self { + pub fn connection_logging( + self, + connection_logging_strategy: ConnectionLoggingStrategy, + ) -> Self { Self { connection_logging_strategy, ..self @@ -410,7 +413,7 @@ impl ClientOptions { /// builder function for the max queued requests field pub fn max_queued_requests(self, max_queued_requests: usize) -> Self { - Self { + Self { max_queued_requests, ..self } @@ -418,10 +421,7 @@ impl ClientOptions { /// builder function for the decode level field pub fn decode(self, decode: DecodeLevel) -> Self { - Self { - decode, - ..self - } + Self { decode, ..self } } } @@ -443,7 +443,7 @@ mod tests { #[test] fn address_start_max_count_of_one_is_allowed() { - AddressRange::try_from(std::u16::MAX, 1).unwrap(); + AddressRange::try_from(u16::MAX, 1).unwrap(); } #[test] From f803fbc74ebc82d4e3293bd2ac354efa4a2c6587 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 16:50:49 -0700 Subject: [PATCH 7/9] failed tcp connection --- rodbus/src/tcp/client.rs | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 72e1e00e..54e14c70 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -158,17 +158,7 @@ impl TcpChannelTask { } match self.connection_handler.handle(socket, &self.host).await { Err(err) => { - let delay = self.connect_retry.after_failed_connect(); - tracing::warn!( - "{} - waiting {} ms before next attempt", - err, - delay.as_millis() - ); - self.listener - .update(ClientState::WaitAfterFailedConnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await + self.failed_tcp_connection(err).await } Ok(mut phys) => { self.listener.update(ClientState::Connected).get().await; @@ -197,4 +187,18 @@ impl TcpChannelTask { } } } + + async fn failed_tcp_connection(&mut self, err: String) -> Result<(), StateChange> { + let delay = self.connect_retry.after_failed_connect(); + tracing::warn!( + "{} - waiting {} ms before next attempt", + err, + delay.as_millis() + ); + self.listener + .update(ClientState::WaitAfterFailedConnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } } From 3a2f7319d3662456d66651202b819581df902540 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 16:52:34 -0700 Subject: [PATCH 8/9] failed tcp stream connection --- rodbus/src/tcp/client.rs | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 54e14c70..3167a900 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -1,3 +1,5 @@ +use std::error::Error; + use tracing::Instrument; use crate::client::{Channel, ClientState, HostAddr, Listener}; @@ -136,18 +138,7 @@ impl TcpChannelTask { self.listener.update(ClientState::Connecting).get().await; match self.connect().await? { Err(err) => { - let delay = self.connect_retry.after_failed_connect(); - tracing::warn!( - "failed to connect to {}: {} - waiting {} ms before next attempt", - self.host, - err, - delay.as_millis() - ); - self.listener - .update(ClientState::WaitAfterFailedConnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await + self.failed_tcp_stream_connection(err).await } Ok(socket) => { if let Ok(addr) = socket.peer_addr() { @@ -201,4 +192,19 @@ impl TcpChannelTask { .await; self.client_loop.fail_requests_for(delay).await } + + async fn failed_tcp_stream_connection(&mut self, err: T) -> Result<(), Error> { + let delay = self.connect_retry.after_failed_connect(); + tracing::warn!( + "failed to connect to {}: {} - waiting {} ms before next attempt", + self.host, + err, + delay.as_millis() + ); + self.listener + .update(ClientState::WaitAfterFailedConnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } } From 7e963909b6cd25be805bc68f6f0ba81c65ce2f89 Mon Sep 17 00:00:00 2001 From: nslee333 Date: Mon, 6 Oct 2025 16:55:43 -0700 Subject: [PATCH 9/9] more refactoring --- rodbus/src/tcp/client.rs | 56 +++++++++++++++++++--------------------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 3167a900..d5e797ab 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -137,9 +137,7 @@ impl TcpChannelTask { async fn try_connect_and_run(&mut self) -> Result<(), StateChange> { self.listener.update(ClientState::Connecting).get().await; match self.connect().await? { - Err(err) => { - self.failed_tcp_stream_connection(err).await - } + Err(err) => self.failed_tcp_stream_connection(err).await, Ok(socket) => { if let Ok(addr) = socket.peer_addr() { tracing::info!("connected to: {}", addr); @@ -148,37 +146,35 @@ impl TcpChannelTask { tracing::warn!("unable to enable TCP_NODELAY: {}", err); } match self.connection_handler.handle(socket, &self.host).await { - Err(err) => { - self.failed_tcp_connection(err).await - } - Ok(mut phys) => { - self.listener.update(ClientState::Connected).get().await; - // reset the retry strategy now that we have a successful connection - // we do this here so that the reset happens after a TLS handshake - self.connect_retry.reset(); - // run the physical layer independent processing loop - match self.client_loop.run(&mut phys).await { - // the mpsc was closed, end the task - SessionError::Shutdown => Err(StateChange::Shutdown), - // re-establish the connection - SessionError::Disabled - | SessionError::IoError(_) - | SessionError::BadFrame => { - let delay = self.connect_retry.after_disconnect(); - tracing::warn!("waiting {:?} to reconnect", delay); - self.listener - .update(ClientState::WaitAfterDisconnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await - } - } - } + Err(err) => self.failed_tcp_connection(err).await, + Ok(phys) => self.connected(phys).await, } } } } + async fn connected(&mut self, mut phys: PhysLayer) -> Result<(), StateChange> { + self.listener.update(ClientState::Connected).get().await; + // reset the retry strategy now that we have a successful connection + // we do this here so that the reset happens after a TLS handshake + self.connect_retry.reset(); + // run the physical layer independent processing loop + match self.client_loop.run(&mut phys).await { + // the mpsc was closed, end the task + SessionError::Shutdown => Err(StateChange::Shutdown), + // re-establish the connection + SessionError::Disabled | SessionError::IoError(_) | SessionError::BadFrame => { + let delay = self.connect_retry.after_disconnect(); + tracing::warn!("waiting {:?} to reconnect", delay); + self.listener + .update(ClientState::WaitAfterDisconnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } + } + } + async fn failed_tcp_connection(&mut self, err: String) -> Result<(), StateChange> { let delay = self.connect_retry.after_failed_connect(); tracing::warn!( @@ -193,7 +189,7 @@ impl TcpChannelTask { self.client_loop.fail_requests_for(delay).await } - async fn failed_tcp_stream_connection(&mut self, err: T) -> Result<(), Error> { + async fn failed_tcp_stream_connection(&mut self, err: T) -> Result<(), StateChange> { let delay = self.connect_retry.after_failed_connect(); tracing::warn!( "failed to connect to {}: {} - waiting {} ms before next attempt",