From 2ed4ea586e94e303a7758e5c29ffe77a45401e1f Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 11 Feb 2026 13:48:16 +0545 Subject: [PATCH 1/4] redis tls support --- Cargo.toml | 2 +- README.md | 4 ++++ server/DOCKER.md | 6 ++++++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index b8fc7cd..fcff06d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,7 +98,7 @@ config = "0.15.11" aws-arn = "0.3.1" # Redis -redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] } +redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] } # Dev dependencies criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/README.md b/README.md index 41631d5..8884e48 100644 --- a/README.md +++ b/README.md @@ -147,6 +147,8 @@ thirdweb: redis: url: "redis://localhost:6379" +# For Redis over TLS, use the `rediss://` scheme: +# url: "rediss://localhost:6379" queue: webhook_workers: 50 @@ -166,6 +168,8 @@ export APP__QUEUE__LOCAL_CONCURRENCY=500 # Custom Redis configuration export APP__REDIS__URL="redis://redis-cluster:6379" +# For Redis over TLS, use the `rediss://` scheme: +# export APP__REDIS__URL="rediss://redis-cluster:6379" # Debug logging for development export RUST_LOG="thirdweb_engine=debug,twmq=debug" diff --git a/server/DOCKER.md b/server/DOCKER.md index e50a245..6f6e066 100644 --- a/server/DOCKER.md +++ b/server/DOCKER.md @@ -25,6 +25,8 @@ The following environment variables must be set when running the container: ```bash # Redis Configuration APP__REDIS__URL=redis://localhost:6379 +# For Redis over TLS, use the `rediss://` scheme: +# APP__REDIS__URL=rediss://localhost:6379 # Thirdweb Configuration APP__THIRDWEB__SECRET=your_secret_key_here @@ -68,6 +70,8 @@ Create a `.env` file with your configuration: ```bash # .env file APP__REDIS__URL=redis://localhost:6379 +# For Redis over TLS, use the `rediss://` scheme: +# APP__REDIS__URL=rediss://localhost:6379 APP__THIRDWEB__SECRET=your_secret_key_here APP__THIRDWEB__CLIENT_ID=your_client_id_here APP__THIRDWEB__URLS__RPC=https://your-rpc-url.com @@ -128,6 +132,8 @@ services: - "8080:8080" environment: - APP__REDIS__URL=redis://redis:6379 +# For Redis over TLS, use the `rediss://` scheme: +# - APP__REDIS__URL=rediss://redis:6379 - APP__THIRDWEB__SECRET=${APP__THIRDWEB__SECRET} - APP__THIRDWEB__CLIENT_ID=${APP__THIRDWEB__CLIENT_ID} - APP__THIRDWEB__URLS__RPC=${APP__THIRDWEB__URLS__RPC} From 11ff4dc824df5ed969bce09a2d41757a77616fd3 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 11 Feb 2026 14:25:12 +0545 Subject: [PATCH 2/4] minor change --- Cargo.toml | 4 ++++ twmq/Cargo.toml | 1 + 2 files changed, 5 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index fcff06d..bf3a2e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -100,5 +100,9 @@ aws-arn = "0.3.1" # Redis redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] } +# Rustls (required for TLS crypto provider selection). +# Rustls 0.23 requires exactly one crypto provider feature (ring or aws-lc-rs). +rustls = { version = "0.23.32", default-features = false, features = ["ring"] } + # Dev dependencies criterion = { version = "0.6", features = ["html_reports", "async_tokio"] } \ No newline at end of file diff --git a/twmq/Cargo.toml b/twmq/Cargo.toml index 9d40caa..3f6227f 100644 --- a/twmq/Cargo.toml +++ b/twmq/Cargo.toml @@ -14,6 +14,7 @@ thiserror = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } futures = { workspace = true } +rustls = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["full"] } From 26914f3379dc13651c48bc11b4663776184a471f Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 11 Feb 2026 14:50:03 +0545 Subject: [PATCH 3/4] tls update --- server/Cargo.toml | 3 ++- server/src/main.rs | 8 ++++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index d568ffb..f368839 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -50,4 +50,5 @@ aws-arn = { workspace = true } moka = { workspace = true } engine-eip7702-core = { path = "../eip7702-core" } prometheus = { workspace = true } -thiserror = { workspace = true } \ No newline at end of file +thiserror = { workspace = true } +rustls = { workspace = true } \ No newline at end of file diff --git a/server/src/main.rs b/server/src/main.rs index 4376aec..e6dd8a7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> { }); let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone())); let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client)); + + // Rustls 0.23 requires selecting a process-level CryptoProvider (ring or aws-lc-rs) + // before any TLS client configuration is created (e.g. when using `rediss://`). + // If another crate already installed a provider, this will be a no-op error. + if let Err(e) = rustls::crypto::ring::default_provider().install_default() { + tracing::debug!(error = ?e, "Rustls CryptoProvider already installed"); + } + let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?; let authorization_cache = EoaAuthorizationCache::new( From cb2a91cad4796c2f0e5c1fe63a1c4ccaed7f76a3 Mon Sep 17 00:00:00 2001 From: nischit Date: Wed, 11 Feb 2026 15:24:49 +0545 Subject: [PATCH 4/4] valkey hashtag --- twmq/src/lib.rs | 47 ++++++++++++++------- twmq/src/multilane.rs | 65 ++++++++++++++++++++++------- twmq/tests/basic.rs | 3 +- twmq/tests/basic_hook.rs | 3 +- twmq/tests/delay.rs | 3 +- twmq/tests/idempotency_modes.rs | 3 +- twmq/tests/lease_expiry.rs | 3 +- twmq/tests/multilane_batch_pop.rs | 5 ++- twmq/tests/nack.rs | 3 +- twmq/tests/prune_race_condition.rs | 3 +- twmq/tests/prune_race_random_ids.rs | 5 ++- 11 files changed, 100 insertions(+), 43 deletions(-) diff --git a/twmq/src/lib.rs b/twmq/src/lib.rs index 164d234..da022ba 100644 --- a/twmq/src/lib.rs +++ b/twmq/src/lib.rs @@ -175,52 +175,63 @@ impl Queue { &self.name } + /// Redis Cluster hash tag used to keep all queue keys in the same slot. + /// See: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags + fn redis_hash_tag(&self) -> String { + format!("{{{}}}", self.name()) + } + pub fn pending_list_name(&self) -> String { - format!("twmq:{}:pending", self.name()) + format!("twmq:{}:pending", self.redis_hash_tag()) } pub fn active_hash_name(&self) -> String { - format!("twmq:{}:active", self.name) + format!("twmq:{}:active", self.redis_hash_tag()) } pub fn delayed_zset_name(&self) -> String { - format!("twmq:{}:delayed", self.name) + format!("twmq:{}:delayed", self.redis_hash_tag()) } pub fn success_list_name(&self) -> String { - format!("twmq:{}:success", self.name) + format!("twmq:{}:success", self.redis_hash_tag()) } pub fn failed_list_name(&self) -> String { - format!("twmq:{}:failed", self.name) + format!("twmq:{}:failed", self.redis_hash_tag()) } pub fn job_data_hash_name(&self) -> String { - format!("twmq:{}:jobs:data", self.name) + format!("twmq:{}:jobs:data", self.redis_hash_tag()) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:meta", self.name, job_id) + format!("twmq:{}:job:{}:meta", self.redis_hash_tag(), job_id) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq:{}:job:{}:errors", self.name, job_id) + format!("twmq:{}:job:{}:errors", self.redis_hash_tag(), job_id) } pub fn job_result_hash_name(&self) -> String { - format!("twmq:{}:jobs:result", self.name) + format!("twmq:{}:jobs:result", self.redis_hash_tag()) } pub fn dedupe_set_name(&self) -> String { - format!("twmq:{}:dedup", self.name) + format!("twmq:{}:dedup", self.redis_hash_tag()) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq:{}:pending_cancellations", self.name) + format!("twmq:{}:pending_cancellations", self.redis_hash_tag()) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { - format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token) + format!( + "twmq:{}:job:{}:lease:{}", + self.redis_hash_tag(), + job_id, + lease_token + ) } pub async fn push( @@ -301,7 +312,8 @@ impl Queue { let position_string = delay.position.to_string(); let _result: (i32, String) = script - .key(&self.name) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.job_data_hash_name()) @@ -742,7 +754,8 @@ impl Queue { Vec, Vec, ) = script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.delayed_zset_name()) .key(self.pending_list_name()) .key(self.active_hash_name()) @@ -990,7 +1003,8 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.success_list_name()) .key(self.job_data_hash_name()) .key(self.job_result_hash_name()) // results_hash @@ -1168,7 +1182,8 @@ impl Queue { ); let trimmed_count: usize = trim_script - .key(self.name()) + // Redis Cluster: all KEYS must be in the same slot + .key(self.redis_hash_tag()) .key(self.failed_list_name()) .key(self.job_data_hash_name()) .key(self.dedupe_set_name()) diff --git a/twmq/src/multilane.rs b/twmq/src/multilane.rs index 5921e0a..a965e5d 100644 --- a/twmq/src/multilane.rs +++ b/twmq/src/multilane.rs @@ -84,59 +84,89 @@ impl MultilaneQueue { &self.queue_id } + /// Redis Cluster hash tag used to keep all multilane keys in the same slot. + fn redis_hash_tag(&self) -> String { + format!("{{{}}}", self.queue_id()) + } + // Redis key naming methods with proper multilane namespacing pub fn lanes_zset_name(&self) -> String { - format!("twmq_multilane:{}:lanes", self.queue_id) + format!("twmq_multilane:{}:lanes", self.redis_hash_tag()) } pub fn lane_pending_list_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:pending", + self.redis_hash_tag(), + lane_id + ) } pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:delayed", + self.redis_hash_tag(), + lane_id + ) } pub fn lane_active_hash_name(&self, lane_id: &str) -> String { - format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id) + format!( + "twmq_multilane:{}:lane:{}:active", + self.redis_hash_tag(), + lane_id + ) } pub fn success_list_name(&self) -> String { - format!("twmq_multilane:{}:success", self.queue_id) + format!("twmq_multilane:{}:success", self.redis_hash_tag()) } pub fn failed_list_name(&self) -> String { - format!("twmq_multilane:{}:failed", self.queue_id) + format!("twmq_multilane:{}:failed", self.redis_hash_tag()) } pub fn job_data_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:data", self.queue_id) + format!("twmq_multilane:{}:jobs:data", self.redis_hash_tag()) } pub fn job_meta_hash_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id) + format!( + "twmq_multilane:{}:job:{}:meta", + self.redis_hash_tag(), + job_id + ) } pub fn job_errors_list_name(&self, job_id: &str) -> String { - format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id) + format!( + "twmq_multilane:{}:job:{}:errors", + self.redis_hash_tag(), + job_id + ) } pub fn job_result_hash_name(&self) -> String { - format!("twmq_multilane:{}:jobs:result", self.queue_id) + format!("twmq_multilane:{}:jobs:result", self.redis_hash_tag()) } pub fn dedupe_set_name(&self) -> String { - format!("twmq_multilane:{}:dedup", self.queue_id) + format!("twmq_multilane:{}:dedup", self.redis_hash_tag()) } pub fn pending_cancellation_set_name(&self) -> String { - format!("twmq_multilane:{}:pending_cancellations", self.queue_id) + format!( + "twmq_multilane:{}:pending_cancellations", + self.redis_hash_tag() + ) } pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String { format!( "twmq_multilane:{}:job:{}:lease:{}", - self.queue_id, job_id, lease_token + self.redis_hash_tag(), + job_id, + lease_token ) } @@ -229,7 +259,8 @@ impl MultilaneQueue { .key(self.job_data_hash_name()) .key(self.job_meta_hash_name(&job.id)) .key(self.dedupe_set_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(lane_id) .arg(&job_options.id) .arg(job_data) @@ -414,7 +445,8 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.job_meta_hash_name(job_id)) .key(self.job_data_hash_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(job_id) .arg(now) .invoke_async(&mut self.redis.clone()) @@ -760,7 +792,8 @@ impl MultilaneQueue { .key(self.pending_cancellation_set_name()) .key(self.failed_list_name()) .key(self.success_list_name()) - .arg(&self.queue_id) + // Redis Cluster: ensure constructed keys match hash-tagged names + .arg(self.redis_hash_tag()) .arg(now) .arg(batch_size) .arg(self.options.lease_duration.as_secs()) diff --git a/twmq/tests/basic.rs b/twmq/tests/basic.rs index 0c5d2f6..66eae0b 100644 --- a/twmq/tests/basic.rs +++ b/twmq/tests/basic.rs @@ -18,7 +18,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/basic_hook.rs b/twmq/tests/basic_hook.rs index 0306ac2..73435ce 100644 --- a/twmq/tests/basic_hook.rs +++ b/twmq/tests/basic_hook.rs @@ -23,7 +23,8 @@ use twmq::{ // Helper to clean up Redis keys for a given queue name pattern async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/delay.rs b/twmq/tests/delay.rs index 5245188..1c74ac1 100644 --- a/twmq/tests/delay.rs +++ b/twmq/tests/delay.rs @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/idempotency_modes.rs b/twmq/tests/idempotency_modes.rs index ce1d0f3..7799e6f 100644 --- a/twmq/tests/idempotency_modes.rs +++ b/twmq/tests/idempotency_modes.rs @@ -68,7 +68,8 @@ impl DurableExecution for TestJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/lease_expiry.rs b/twmq/tests/lease_expiry.rs index 773c100..5822c66 100644 --- a/twmq/tests/lease_expiry.rs +++ b/twmq/tests/lease_expiry.rs @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/multilane_batch_pop.rs b/twmq/tests/multilane_batch_pop.rs index 3f8c398..189ce21 100644 --- a/twmq/tests/multilane_batch_pop.rs +++ b/twmq/tests/multilane_batch_pop.rs @@ -90,7 +90,8 @@ impl MultilaneTestHarness { /// Clean up all Redis keys for this test async fn cleanup(&self) { let mut conn = self.queue.redis.clone(); - let keys_pattern = format!("twmq_multilane:{}:*", self.queue_id); + // twmq multilane keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq_multilane:{{{}}}:*", self.queue_id); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -165,7 +166,7 @@ impl Drop for MultilaneTestHarness { tokio::spawn(async move { let mut conn = redis; - let keys_pattern = format!("twmq_multilane:{queue_id}:*"); + let keys_pattern = format!("twmq_multilane:{{{queue_id}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/nack.rs b/twmq/tests/nack.rs index f66cd9f..8b0fd39 100644 --- a/twmq/tests/nack.rs +++ b/twmq/tests/nack.rs @@ -26,7 +26,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/"; // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) .query_async(&mut conn) diff --git a/twmq/tests/prune_race_condition.rs b/twmq/tests/prune_race_condition.rs index c9898f9..3245cc8 100644 --- a/twmq/tests/prune_race_condition.rs +++ b/twmq/tests/prune_race_condition.rs @@ -148,7 +148,8 @@ impl DurableExecution for EoaSimulatorJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) diff --git a/twmq/tests/prune_race_random_ids.rs b/twmq/tests/prune_race_random_ids.rs index fd5e9ea..b09ad89 100644 --- a/twmq/tests/prune_race_random_ids.rs +++ b/twmq/tests/prune_race_random_ids.rs @@ -123,7 +123,8 @@ impl DurableExecution for RandomJobHandler { // Helper to clean up Redis keys async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) { let mut conn = conn_manager.clone(); - let keys_pattern = format!("twmq:{queue_name}:*"); + // twmq queue keys are hash-tagged for Redis Cluster compatibility + let keys_pattern = format!("twmq:{{{queue_name}}}:*"); let keys: Vec = redis::cmd("KEYS") .arg(&keys_pattern) @@ -239,7 +240,7 @@ async fn test_prune_with_random_ids() { let success_job_ids: Vec = conn.lrange(queue.success_list_name(), 0, -1).await.unwrap(); // Count how many job metadata hashes still exist (should match success list length if pruning works) - let meta_pattern = format!("twmq:{}:job:*:meta", queue.name()); + let meta_pattern = format!("twmq:{{{}}}:job:*:meta", queue.name()); let meta_keys: Vec = redis::cmd("KEYS") .arg(&meta_pattern) .query_async(&mut conn)