diff --git a/ffi/rodbus-ffi/src/iterator.rs b/ffi/rodbus-ffi/src/iterator.rs index daf0f697..4a406336 100644 --- a/ffi/rodbus-ffi/src/iterator.rs +++ b/ffi/rodbus-ffi/src/iterator.rs @@ -30,7 +30,7 @@ impl<'a> RegisterValueIterator<'a> { } pub(crate) unsafe fn bit_value_iterator_next( - it: *mut crate::BitValueIterator, + it: *mut crate::BitValueIterator<'_>, ) -> Option<&crate::ffi::BitValue> { match it.as_mut() { Some(it) => match it.inner.next() { @@ -46,7 +46,7 @@ pub(crate) unsafe fn bit_value_iterator_next( } pub(crate) unsafe fn register_value_iterator_next( - it: *mut crate::RegisterValueIterator, + it: *mut crate::RegisterValueIterator<'_>, ) -> Option<&crate::ffi::RegisterValue> { match it.as_mut() { Some(it) => match it.inner.next() { diff --git a/rodbus/src/client/mod.rs b/rodbus/src/client/mod.rs index 32f1d8c5..d6f77e1e 100644 --- a/rodbus/src/client/mod.rs +++ b/rodbus/src/client/mod.rs @@ -17,6 +17,7 @@ pub use crate::client::channel::*; pub use crate::client::listener::*; pub use crate::client::requests::write_multiple::WriteMultiple; pub use crate::retry::*; +use crate::ClientOptions; #[cfg(feature = "ffi")] pub use ffi_channel::*; @@ -105,13 +106,40 @@ pub fn spawn_tcp_client_task( retry: Box, decode: DecodeLevel, listener: Option>>, +) -> Channel { + let options = ClientOptions::default() + .decode(decode) + .max_queued_requests(max_queued_requests); + crate::tcp::client::spawn_tcp_channel( + host, + retry, + listener.unwrap_or_else(|| NullListener::create()), + options, + ) +} + +/// Spawns a channel task onto the runtime that maintains a TCP connection and processes +/// requests. The task completes when the returned channel handle is dropped. +/// +/// The channel uses the provided [`RetryStrategy`] to pause between failed connection attempts +/// +/// * `host` - Address/port of the remote server. Can be a IP address or name on which to perform DNS resolution. +/// * `retry` - A boxed trait object that controls when the connection is retried on failure +/// * `listener` - Optional callback to monitor the TCP connection state +/// * `client_options` - A builder that contains various client options. +/// +/// `WARNING`: This function must be called from with the context of the Tokio runtime or it will panic. +pub fn spawn_tcp_client_task_2( + host: HostAddr, + retry: Box, + listener: Option>>, + client_options: ClientOptions, ) -> Channel { crate::tcp::client::spawn_tcp_channel( host, - max_queued_requests, retry, - decode, listener.unwrap_or_else(|| NullListener::create()), + client_options, ) } diff --git a/rodbus/src/tcp/client.rs b/rodbus/src/tcp/client.rs index 762ca38d..d5e797ab 100644 --- a/rodbus/src/tcp/client.rs +++ b/rodbus/src/tcp/client.rs @@ -1,3 +1,5 @@ +use std::error::Error; + use tracing::Instrument; use crate::client::{Channel, ClientState, HostAddr, Listener}; @@ -9,38 +11,37 @@ use crate::client::task::{ClientLoop, SessionError, StateChange}; use crate::common::frame::{FrameWriter, FramedReader}; use crate::error::Shutdown; use crate::retry::RetryStrategy; +use crate::{ClientOptions, ConnectionLoggingStrategy}; use tokio::net::TcpStream; pub(crate) fn spawn_tcp_channel( host: HostAddr, - max_queued_requests: usize, connect_retry: Box, - decode: DecodeLevel, listener: Box>, + client_options: ClientOptions, ) -> Channel { - let (handle, task) = - create_tcp_channel(host, max_queued_requests, connect_retry, decode, listener); + let (handle, task) = create_tcp_channel(host, connect_retry, listener, client_options); tokio::spawn(task); handle } pub(crate) fn create_tcp_channel( host: HostAddr, - max_queued_requests: usize, connect_retry: Box, - decode: DecodeLevel, listener: Box>, + options: ClientOptions, ) -> (Channel, impl std::future::Future) { - let (tx, rx) = tokio::sync::mpsc::channel(max_queued_requests); + let (tx, rx) = tokio::sync::mpsc::channel(options.max_queued_requests); let task = async move { TcpChannelTask::new( host.clone(), rx.into(), TcpTaskConnectionHandler::Tcp, connect_retry, - decode, + options.decode, listener, + options.connection_logging_strategy, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) @@ -75,6 +76,7 @@ pub(crate) struct TcpChannelTask { connection_handler: TcpTaskConnectionHandler, client_loop: ClientLoop, listener: Box>, + _logging_strategy: ConnectionLoggingStrategy, } impl TcpChannelTask { @@ -85,6 +87,7 @@ impl TcpChannelTask { connect_retry: Box, decode: DecodeLevel, listener: Box>, + _logging_strategy: ConnectionLoggingStrategy, ) -> Self { Self { host, @@ -92,6 +95,7 @@ impl TcpChannelTask { connection_handler, client_loop: ClientLoop::new(rx, FrameWriter::tcp(), FramedReader::tcp(), decode), listener, + _logging_strategy, } } @@ -133,20 +137,7 @@ impl TcpChannelTask { async fn try_connect_and_run(&mut self) -> Result<(), StateChange> { self.listener.update(ClientState::Connecting).get().await; match self.connect().await? { - Err(err) => { - let delay = self.connect_retry.after_failed_connect(); - tracing::warn!( - "failed to connect to {}: {} - waiting {} ms before next attempt", - self.host, - err, - delay.as_millis() - ); - self.listener - .update(ClientState::WaitAfterFailedConnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await - } + Err(err) => self.failed_tcp_stream_connection(err).await, Ok(socket) => { if let Ok(addr) = socket.peer_addr() { tracing::info!("connected to: {}", addr); @@ -155,44 +146,61 @@ impl TcpChannelTask { tracing::warn!("unable to enable TCP_NODELAY: {}", err); } match self.connection_handler.handle(socket, &self.host).await { - Err(err) => { - let delay = self.connect_retry.after_failed_connect(); - tracing::warn!( - "{} - waiting {} ms before next attempt", - err, - delay.as_millis() - ); - self.listener - .update(ClientState::WaitAfterFailedConnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await - } - Ok(mut phys) => { - self.listener.update(ClientState::Connected).get().await; - // reset the retry strategy now that we have a successful connection - // we do this here so that the reset happens after a TLS handshake - self.connect_retry.reset(); - // run the physical layer independent processing loop - match self.client_loop.run(&mut phys).await { - // the mpsc was closed, end the task - SessionError::Shutdown => Err(StateChange::Shutdown), - // re-establish the connection - SessionError::Disabled - | SessionError::IoError(_) - | SessionError::BadFrame => { - let delay = self.connect_retry.after_disconnect(); - tracing::warn!("waiting {:?} to reconnect", delay); - self.listener - .update(ClientState::WaitAfterDisconnect(delay)) - .get() - .await; - self.client_loop.fail_requests_for(delay).await - } - } - } + Err(err) => self.failed_tcp_connection(err).await, + Ok(phys) => self.connected(phys).await, } } } } + + async fn connected(&mut self, mut phys: PhysLayer) -> Result<(), StateChange> { + self.listener.update(ClientState::Connected).get().await; + // reset the retry strategy now that we have a successful connection + // we do this here so that the reset happens after a TLS handshake + self.connect_retry.reset(); + // run the physical layer independent processing loop + match self.client_loop.run(&mut phys).await { + // the mpsc was closed, end the task + SessionError::Shutdown => Err(StateChange::Shutdown), + // re-establish the connection + SessionError::Disabled | SessionError::IoError(_) | SessionError::BadFrame => { + let delay = self.connect_retry.after_disconnect(); + tracing::warn!("waiting {:?} to reconnect", delay); + self.listener + .update(ClientState::WaitAfterDisconnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } + } + } + + async fn failed_tcp_connection(&mut self, err: String) -> Result<(), StateChange> { + let delay = self.connect_retry.after_failed_connect(); + tracing::warn!( + "{} - waiting {} ms before next attempt", + err, + delay.as_millis() + ); + self.listener + .update(ClientState::WaitAfterFailedConnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } + + async fn failed_tcp_stream_connection(&mut self, err: T) -> Result<(), StateChange> { + let delay = self.connect_retry.after_failed_connect(); + tracing::warn!( + "failed to connect to {}: {} - waiting {} ms before next attempt", + self.host, + err, + delay.as_millis() + ); + self.listener + .update(ClientState::WaitAfterFailedConnect(delay)) + .get() + .await; + self.client_loop.fail_requests_for(delay).await + } } diff --git a/rodbus/src/tcp/tls/client.rs b/rodbus/src/tcp/tls/client.rs index 963d2024..5d6a0d79 100644 --- a/rodbus/src/tcp/tls/client.rs +++ b/rodbus/src/tcp/tls/client.rs @@ -15,7 +15,7 @@ use crate::common::phys::PhysLayer; use crate::tcp::client::{TcpChannelTask, TcpTaskConnectionHandler}; use crate::tcp::tls::{CertificateMode, MinTlsVersion, TlsError}; -use crate::DecodeLevel; +use crate::{ConnectionLoggingStrategy, DecodeLevel}; /// TLS configuration pub struct TlsClientConfig { @@ -60,6 +60,7 @@ pub(crate) fn create_tls_channel( connect_retry, decode, listener, + ConnectionLoggingStrategy::All, ) .run() .instrument(tracing::info_span!("Modbus-Client-TCP", endpoint = ?host)) diff --git a/rodbus/src/types.rs b/rodbus/src/types.rs index e35a2489..a0d10cbe 100644 --- a/rodbus/src/types.rs +++ b/rodbus/src/types.rs @@ -1,5 +1,6 @@ use crate::decode::AppDecodeLevel; use crate::error::{AduParseError, InvalidRange}; +use crate::DecodeLevel; use scursor::ReadCursor; @@ -375,6 +376,65 @@ impl Default for UnitId { } } +/// How verbose to make the logging for connects & disconnects +#[derive(Clone, Copy)] +pub enum ConnectionLoggingStrategy { + /// Log it all + All, + /// Only log State changes + StateDriven, +} + +impl Default for ConnectionLoggingStrategy { + fn default() -> Self { + Self::All + } +} + +/// A ClientOptions builder +#[derive(Copy, Clone)] +pub struct ClientOptions { + pub(crate) connection_logging_strategy: ConnectionLoggingStrategy, + pub(crate) max_queued_requests: usize, + pub(crate) decode: DecodeLevel, +} + +impl ClientOptions { + /// Builder function for the connection_logging_strategy field. + pub fn connection_logging( + self, + connection_logging_strategy: ConnectionLoggingStrategy, + ) -> Self { + Self { + connection_logging_strategy, + ..self + } + } + + /// builder function for the max queued requests field + pub fn max_queued_requests(self, max_queued_requests: usize) -> Self { + Self { + max_queued_requests, + ..self + } + } + + /// builder function for the decode level field + pub fn decode(self, decode: DecodeLevel) -> Self { + Self { decode, ..self } + } +} + +impl Default for ClientOptions { + fn default() -> Self { + Self { + connection_logging_strategy: ConnectionLoggingStrategy::default(), + max_queued_requests: 16, + decode: DecodeLevel::default(), + } + } +} + #[cfg(test)] mod tests { use crate::error::*; @@ -383,7 +443,7 @@ mod tests { #[test] fn address_start_max_count_of_one_is_allowed() { - AddressRange::try_from(std::u16::MAX, 1).unwrap(); + AddressRange::try_from(u16::MAX, 1).unwrap(); } #[test]