Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions deploy/docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ redis:
port: 6379
db: 0
password: ""
task_ttl_seconds: 3600 # TTL for task data (1 hour default, 0 to disable)
# ... other redis options ...

# Rate Limiting Configuration
Expand Down Expand Up @@ -989,6 +990,8 @@ You can override the default `config.yml`.
- Adjust memory_threshold_percent based on available RAM
- Set timeouts according to your content size and network conditions
- Use Redis for rate limiting in multi-container setups
- Configure `task_ttl_seconds` to control Redis memory usage (default: 3600s/1 hour)
- Set `REDIS_TASK_TTL` environment variable to override TTL at runtime

3. **Monitoring** 📊
- Enable Prometheus if you need metrics
Expand Down
46 changes: 31 additions & 15 deletions deploy/docker/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
get_llm_api_key,
validate_llm_provider,
get_llm_temperature,
get_llm_base_url
get_llm_base_url,
get_redis_task_ttl
)
from webhook import WebhookDeliveryService

Expand All @@ -61,6 +62,21 @@ def _get_memory_mb():
return None


async def hset_with_ttl(redis, key: str, mapping: dict, config: dict):
    """Store a Redis hash and apply the configured task TTL to it.

    Args:
        redis: Redis client instance
        key: Redis key (e.g., "task:abc123")
        mapping: Hash field-value mapping
        config: Application config containing redis.task_ttl_seconds
    """
    await redis.hset(key, mapping=mapping)
    expiry_seconds = get_redis_task_ttl(config)
    if expiry_seconds <= 0:
        # A TTL of zero means expiry is disabled — leave the key persistent.
        return
    await redis.expire(key, expiry_seconds)


async def handle_llm_qa(
url: str,
query: str,
Expand Down Expand Up @@ -143,10 +159,10 @@ async def process_llm_extraction(
# Validate provider
is_valid, error_msg = validate_llm_provider(config, provider)
if not is_valid:
await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.FAILED,
"error": error_msg
})
}, config)

# Send webhook notification on failure
await webhook_service.notify_job_completion(
Expand Down Expand Up @@ -183,10 +199,10 @@ async def process_llm_extraction(
)

if not result.success:
await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.FAILED,
"error": result.error_message
})
}, config)

# Send webhook notification on failure
await webhook_service.notify_job_completion(
Expand All @@ -206,10 +222,10 @@ async def process_llm_extraction(

result_data = {"extracted_content": content}

await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.COMPLETED,
"result": json.dumps(content)
})
}, config)

# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
Expand All @@ -223,10 +239,10 @@ async def process_llm_extraction(

except Exception as e:
logger.error(f"LLM extraction error: {str(e)}", exc_info=True)
await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.FAILED,
"error": str(e)
})
}, config)

# Send webhook notification on failure
await webhook_service.notify_job_completion(
Expand Down Expand Up @@ -430,7 +446,7 @@ async def create_new_task(
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)

await redis.hset(f"task:{task_id}", mapping=task_data)
await hset_with_ttl(redis, f"task:{task_id}", task_data, config)

background_tasks.add_task(
process_llm_extraction,
Expand Down Expand Up @@ -806,7 +822,7 @@ async def handle_crawl_job(
if webhook_config:
task_data["webhook_config"] = json.dumps(webhook_config)

await redis.hset(f"task:{task_id}", mapping=task_data)
await hset_with_ttl(redis, f"task:{task_id}", task_data, config)

# Initialize webhook service
webhook_service = WebhookDeliveryService(config)
Expand All @@ -819,10 +835,10 @@ async def _runner():
crawler_config=crawler_config,
config=config,
)
await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.COMPLETED,
"result": json.dumps(result),
})
}, config)

# Send webhook notification on successful completion
await webhook_service.notify_job_completion(
Expand All @@ -836,10 +852,10 @@ async def _runner():

await asyncio.sleep(5) # Give Redis time to process the update
except Exception as exc:
await redis.hset(f"task:{task_id}", mapping={
await hset_with_ttl(redis, f"task:{task_id}", {
"status": TaskStatus.FAILED,
"error": str(exc),
})
}, config)

# Send webhook notification on failure
await webhook_service.notify_job_completion(
Expand Down
9 changes: 5 additions & 4 deletions deploy/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ llm:
# api_key: sk-... # If you pass the API key directly (not recommended)

# Redis Configuration
# Set task_ttl_seconds to automatically expire task data in Redis.
# This prevents unbounded memory growth from accumulated task results.
# Default: 3600 (1 hour). Set to 0 to disable TTL (not recommended).
# Can be overridden with REDIS_TASK_TTL environment variable.
redis:
host: "localhost"
port: 6379
db: 0
password: ""
task_ttl_seconds: 3600 # TTL for task data (1 hour default)
ssl: False
ssl_cert_reqs: None
ssl_ca_certs: None
ssl_certfile: None
ssl_keyfile: None
ssl_cert_reqs: None
ssl_ca_certs: None
ssl_certfile: None
ssl_keyfile: None

# Rate Limiting Configuration
rate_limiting:
Expand Down
23 changes: 22 additions & 1 deletion deploy/docker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,16 @@ def load_config() -> Dict:
if llm_api_key and "api_key" not in config["llm"]:
config["llm"]["api_key"] = llm_api_key
logging.info("LLM API key loaded from LLM_API_KEY environment variable")


# Override Redis task TTL from environment if set
redis_task_ttl = os.environ.get("REDIS_TASK_TTL")
if redis_task_ttl:
try:
config["redis"]["task_ttl_seconds"] = int(redis_task_ttl)
logging.info(f"Redis task TTL overridden from REDIS_TASK_TTL: {redis_task_ttl}s")
except ValueError:
logging.warning(f"Invalid REDIS_TASK_TTL value: {redis_task_ttl}, using default")

return config

def setup_logging(config: Dict) -> None:
Expand Down Expand Up @@ -70,6 +79,18 @@ def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]:
return {k.decode('utf-8'): v.decode('utf-8') for k, v in hash_data.items()}


def get_redis_task_ttl(config: Dict) -> int:
    """Get Redis task TTL in seconds from config.

    The value normally comes from ``redis.task_ttl_seconds`` in config.yml,
    possibly overridden via the ``REDIS_TASK_TTL`` environment variable, so
    it may arrive as a string. Coerce to ``int`` and fall back to the
    default on malformed or missing values rather than letting a bad
    config value raise later in ``redis.expire``.

    Args:
        config: The application configuration dictionary

    Returns:
        TTL in seconds (default 3600). Returns 0 if TTL is disabled.
    """
    default_ttl = 3600
    value = config.get("redis", {}).get("task_ttl_seconds", default_ttl)
    try:
        return int(value)
    except (TypeError, ValueError):
        # None or a non-numeric string: treat as "not configured".
        return default_ttl



def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]:
"""Get the appropriate API key based on the LLM provider.
Expand Down
Loading