Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ config = "0.15.11"
aws-arn = "0.3.1"

# Redis
redis = { version = "0.31.0", features = ["tokio-comp", "connection-manager"] }
redis = { version = "0.31.0", features = ["connection-manager", "tls-rustls", "tls-rustls-webpki-roots", "tokio-rustls-comp"] }

# Rustls (required for TLS crypto provider selection).
# Rustls 0.23 requires exactly one crypto provider feature (ring or aws-lc-rs).
rustls = { version = "0.23.32", default-features = false, features = ["ring"] }

# Dev dependencies
criterion = { version = "0.6", features = ["html_reports", "async_tokio"] }
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ thirdweb:

redis:
url: "redis://localhost:6379"
# For Redis over TLS, use the `rediss://` scheme:
# url: "rediss://localhost:6379"

queue:
webhook_workers: 50
Expand All @@ -166,6 +168,8 @@ export APP__QUEUE__LOCAL_CONCURRENCY=500

# Custom Redis configuration
export APP__REDIS__URL="redis://redis-cluster:6379"
# For Redis over TLS, use the `rediss://` scheme:
# export APP__REDIS__URL="rediss://redis-cluster:6379"

# Debug logging for development
export RUST_LOG="thirdweb_engine=debug,twmq=debug"
Expand Down
3 changes: 2 additions & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ aws-arn = { workspace = true }
moka = { workspace = true }
engine-eip7702-core = { path = "../eip7702-core" }
prometheus = { workspace = true }
thiserror = { workspace = true }
thiserror = { workspace = true }
rustls = { workspace = true }
6 changes: 6 additions & 0 deletions server/DOCKER.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ The following environment variables must be set when running the container:
```bash
# Redis Configuration
APP__REDIS__URL=redis://localhost:6379
# For Redis over TLS, use the `rediss://` scheme:
# APP__REDIS__URL=rediss://localhost:6379

# Thirdweb Configuration
APP__THIRDWEB__SECRET=your_secret_key_here
Expand Down Expand Up @@ -68,6 +70,8 @@ Create a `.env` file with your configuration:
```bash
# .env file
APP__REDIS__URL=redis://localhost:6379
# For Redis over TLS, use the `rediss://` scheme:
# APP__REDIS__URL=rediss://localhost:6379
APP__THIRDWEB__SECRET=your_secret_key_here
APP__THIRDWEB__CLIENT_ID=your_client_id_here
APP__THIRDWEB__URLS__RPC=https://your-rpc-url.com
Expand Down Expand Up @@ -128,6 +132,8 @@ services:
- "8080:8080"
environment:
- APP__REDIS__URL=redis://redis:6379
# For Redis over TLS, use the `rediss://` scheme:
# - APP__REDIS__URL=rediss://redis:6379
- APP__THIRDWEB__SECRET=${APP__THIRDWEB__SECRET}
- APP__THIRDWEB__CLIENT_ID=${APP__THIRDWEB__CLIENT_ID}
- APP__THIRDWEB__URLS__RPC=${APP__THIRDWEB__URLS__RPC}
Expand Down
8 changes: 8 additions & 0 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ async fn main() -> anyhow::Result<()> {
});
let eoa_signer = Arc::new(EoaSigner::new(vault_client.clone(), iaw_client.clone()));
let solana_signer = Arc::new(SolanaSigner::new(vault_client.clone(), iaw_client));

// Rustls 0.23 requires selecting a process-level CryptoProvider (ring or aws-lc-rs)
// before any TLS client configuration is created (e.g. when using `rediss://`).
// If another crate already installed a provider, this will be a no-op error.
if let Err(e) = rustls::crypto::ring::default_provider().install_default() {
tracing::debug!(error = ?e, "Rustls CryptoProvider already installed");
}

let redis_client = twmq::redis::Client::open(config.redis.url.as_str())?;

let authorization_cache = EoaAuthorizationCache::new(
Expand Down
1 change: 1 addition & 0 deletions twmq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ thiserror = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
futures = { workspace = true }
rustls = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
Expand Down
47 changes: 31 additions & 16 deletions twmq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,52 +175,63 @@ impl<H: DurableExecution> Queue<H> {
&self.name
}

/// Redis Cluster hash tag used to keep all queue keys in the same slot.
/// See: https://redis.io/docs/latest/operate/oss_and_stack/reference/cluster-spec/#hash-tags
fn redis_hash_tag(&self) -> String {
format!("{{{}}}", self.name())
}

pub fn pending_list_name(&self) -> String {
format!("twmq:{}:pending", self.name())
format!("twmq:{}:pending", self.redis_hash_tag())
}

pub fn active_hash_name(&self) -> String {
format!("twmq:{}:active", self.name)
format!("twmq:{}:active", self.redis_hash_tag())
}

pub fn delayed_zset_name(&self) -> String {
format!("twmq:{}:delayed", self.name)
format!("twmq:{}:delayed", self.redis_hash_tag())
}

pub fn success_list_name(&self) -> String {
format!("twmq:{}:success", self.name)
format!("twmq:{}:success", self.redis_hash_tag())
}

pub fn failed_list_name(&self) -> String {
format!("twmq:{}:failed", self.name)
format!("twmq:{}:failed", self.redis_hash_tag())
}

pub fn job_data_hash_name(&self) -> String {
format!("twmq:{}:jobs:data", self.name)
format!("twmq:{}:jobs:data", self.redis_hash_tag())
}

pub fn job_meta_hash_name(&self, job_id: &str) -> String {
format!("twmq:{}:job:{}:meta", self.name, job_id)
format!("twmq:{}:job:{}:meta", self.redis_hash_tag(), job_id)
}

pub fn job_errors_list_name(&self, job_id: &str) -> String {
format!("twmq:{}:job:{}:errors", self.name, job_id)
format!("twmq:{}:job:{}:errors", self.redis_hash_tag(), job_id)
}

pub fn job_result_hash_name(&self) -> String {
format!("twmq:{}:jobs:result", self.name)
format!("twmq:{}:jobs:result", self.redis_hash_tag())
}

pub fn dedupe_set_name(&self) -> String {
format!("twmq:{}:dedup", self.name)
format!("twmq:{}:dedup", self.redis_hash_tag())
}

pub fn pending_cancellation_set_name(&self) -> String {
format!("twmq:{}:pending_cancellations", self.name)
format!("twmq:{}:pending_cancellations", self.redis_hash_tag())
}

pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String {
format!("twmq:{}:job:{}:lease:{}", self.name, job_id, lease_token)
format!(
"twmq:{}:job:{}:lease:{}",
self.redis_hash_tag(),
job_id,
lease_token
)
}

pub async fn push(
Expand Down Expand Up @@ -301,7 +312,8 @@ impl<H: DurableExecution> Queue<H> {
let position_string = delay.position.to_string();

let _result: (i32, String) = script
.key(&self.name)
// Redis Cluster: all KEYS must be in the same slot
.key(self.redis_hash_tag())
.key(self.delayed_zset_name())
.key(self.pending_list_name())
.key(self.job_data_hash_name())
Expand Down Expand Up @@ -742,7 +754,8 @@ impl<H: DurableExecution> Queue<H> {
Vec<String>,
Vec<String>,
) = script
.key(self.name())
// Redis Cluster: all KEYS must be in the same slot
.key(self.redis_hash_tag())
.key(self.delayed_zset_name())
.key(self.pending_list_name())
.key(self.active_hash_name())
Expand Down Expand Up @@ -990,7 +1003,8 @@ impl<H: DurableExecution> Queue<H> {
);

let trimmed_count: usize = trim_script
.key(self.name())
// Redis Cluster: all KEYS must be in the same slot
.key(self.redis_hash_tag())
.key(self.success_list_name())
.key(self.job_data_hash_name())
.key(self.job_result_hash_name()) // results_hash
Expand Down Expand Up @@ -1168,7 +1182,8 @@ impl<H: DurableExecution> Queue<H> {
);

let trimmed_count: usize = trim_script
.key(self.name())
// Redis Cluster: all KEYS must be in the same slot
.key(self.redis_hash_tag())
.key(self.failed_list_name())
.key(self.job_data_hash_name())
.key(self.dedupe_set_name())
Expand Down
65 changes: 49 additions & 16 deletions twmq/src/multilane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,59 +84,89 @@ impl<H: DurableExecution> MultilaneQueue<H> {
&self.queue_id
}

/// Redis Cluster hash tag used to keep all multilane keys in the same slot.
fn redis_hash_tag(&self) -> String {
format!("{{{}}}", self.queue_id())
}

// Redis key naming methods with proper multilane namespacing
pub fn lanes_zset_name(&self) -> String {
format!("twmq_multilane:{}:lanes", self.queue_id)
format!("twmq_multilane:{}:lanes", self.redis_hash_tag())
}

pub fn lane_pending_list_name(&self, lane_id: &str) -> String {
format!("twmq_multilane:{}:lane:{}:pending", self.queue_id, lane_id)
format!(
"twmq_multilane:{}:lane:{}:pending",
self.redis_hash_tag(),
lane_id
)
}

pub fn lane_delayed_zset_name(&self, lane_id: &str) -> String {
format!("twmq_multilane:{}:lane:{}:delayed", self.queue_id, lane_id)
format!(
"twmq_multilane:{}:lane:{}:delayed",
self.redis_hash_tag(),
lane_id
)
}

pub fn lane_active_hash_name(&self, lane_id: &str) -> String {
format!("twmq_multilane:{}:lane:{}:active", self.queue_id, lane_id)
format!(
"twmq_multilane:{}:lane:{}:active",
self.redis_hash_tag(),
lane_id
)
}

pub fn success_list_name(&self) -> String {
format!("twmq_multilane:{}:success", self.queue_id)
format!("twmq_multilane:{}:success", self.redis_hash_tag())
}

pub fn failed_list_name(&self) -> String {
format!("twmq_multilane:{}:failed", self.queue_id)
format!("twmq_multilane:{}:failed", self.redis_hash_tag())
}

pub fn job_data_hash_name(&self) -> String {
format!("twmq_multilane:{}:jobs:data", self.queue_id)
format!("twmq_multilane:{}:jobs:data", self.redis_hash_tag())
}

pub fn job_meta_hash_name(&self, job_id: &str) -> String {
format!("twmq_multilane:{}:job:{}:meta", self.queue_id, job_id)
format!(
"twmq_multilane:{}:job:{}:meta",
self.redis_hash_tag(),
job_id
)
}

pub fn job_errors_list_name(&self, job_id: &str) -> String {
format!("twmq_multilane:{}:job:{}:errors", self.queue_id, job_id)
format!(
"twmq_multilane:{}:job:{}:errors",
self.redis_hash_tag(),
job_id
)
}

pub fn job_result_hash_name(&self) -> String {
format!("twmq_multilane:{}:jobs:result", self.queue_id)
format!("twmq_multilane:{}:jobs:result", self.redis_hash_tag())
}

pub fn dedupe_set_name(&self) -> String {
format!("twmq_multilane:{}:dedup", self.queue_id)
format!("twmq_multilane:{}:dedup", self.redis_hash_tag())
}

pub fn pending_cancellation_set_name(&self) -> String {
format!("twmq_multilane:{}:pending_cancellations", self.queue_id)
format!(
"twmq_multilane:{}:pending_cancellations",
self.redis_hash_tag()
)
}

pub fn lease_key_name(&self, job_id: &str, lease_token: &str) -> String {
format!(
"twmq_multilane:{}:job:{}:lease:{}",
self.queue_id, job_id, lease_token
self.redis_hash_tag(),
job_id,
lease_token
)
}

Expand Down Expand Up @@ -229,7 +259,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
.key(self.job_data_hash_name())
.key(self.job_meta_hash_name(&job.id))
.key(self.dedupe_set_name())
.arg(&self.queue_id)
// Redis Cluster: ensure constructed keys match hash-tagged names
.arg(self.redis_hash_tag())
.arg(lane_id)
.arg(&job_options.id)
.arg(job_data)
Expand Down Expand Up @@ -414,7 +445,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
.key(self.pending_cancellation_set_name())
.key(self.job_meta_hash_name(job_id))
.key(self.job_data_hash_name())
.arg(&self.queue_id)
// Redis Cluster: ensure constructed keys match hash-tagged names
.arg(self.redis_hash_tag())
.arg(job_id)
.arg(now)
.invoke_async(&mut self.redis.clone())
Expand Down Expand Up @@ -760,7 +792,8 @@ impl<H: DurableExecution> MultilaneQueue<H> {
.key(self.pending_cancellation_set_name())
.key(self.failed_list_name())
.key(self.success_list_name())
.arg(&self.queue_id)
// Redis Cluster: ensure constructed keys match hash-tagged names
.arg(self.redis_hash_tag())
.arg(now)
.arg(batch_size)
.arg(self.options.lease_duration.as_secs())
Expand Down
3 changes: 2 additions & 1 deletion twmq/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/";
// Helper to clean up Redis keys for a given queue name pattern
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
let mut conn = conn_manager.clone();
let keys_pattern = format!("twmq:{queue_name}:*");
// twmq queue keys are hash-tagged for Redis Cluster compatibility
let keys_pattern = format!("twmq:{{{queue_name}}}:*");

let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
Expand Down
3 changes: 2 additions & 1 deletion twmq/tests/basic_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use twmq::{
// Helper to clean up Redis keys for a given queue name pattern
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
let mut conn = conn_manager.clone();
let keys_pattern = format!("twmq:{queue_name}:*");
// twmq queue keys are hash-tagged for Redis Cluster compatibility
let keys_pattern = format!("twmq:{{{queue_name}}}:*");

let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
Expand Down
3 changes: 2 additions & 1 deletion twmq/tests/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/";
// Helper to clean up Redis keys
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
let mut conn = conn_manager.clone();
let keys_pattern = format!("twmq:{queue_name}:*");
// twmq queue keys are hash-tagged for Redis Cluster compatibility
let keys_pattern = format!("twmq:{{{queue_name}}}:*");
let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
.query_async(&mut conn)
Expand Down
3 changes: 2 additions & 1 deletion twmq/tests/idempotency_modes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ impl DurableExecution for TestJobHandler {
// Helper to clean up Redis keys
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
let mut conn = conn_manager.clone();
let keys_pattern = format!("twmq:{queue_name}:*");
// twmq queue keys are hash-tagged for Redis Cluster compatibility
let keys_pattern = format!("twmq:{{{queue_name}}}:*");

let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
Expand Down
3 changes: 2 additions & 1 deletion twmq/tests/lease_expiry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ const REDIS_URL: &str = "redis://127.0.0.1:6379/";
// Helper to clean up Redis keys
async fn cleanup_redis_keys(conn_manager: &ConnectionManager, queue_name: &str) {
let mut conn = conn_manager.clone();
let keys_pattern = format!("twmq:{queue_name}:*");
// twmq queue keys are hash-tagged for Redis Cluster compatibility
let keys_pattern = format!("twmq:{{{queue_name}}}:*");
let keys: Vec<String> = redis::cmd("KEYS")
.arg(&keys_pattern)
.query_async(&mut conn)
Expand Down
Loading
Loading