diff --git a/deploy/docker/README.md b/deploy/docker/README.md index c3c968f4..d0bed5e3 100644 --- a/deploy/docker/README.md +++ b/deploy/docker/README.md @@ -887,6 +887,7 @@ redis: port: 6379 db: 0 password: "" + task_ttl_seconds: 3600 # TTL for task data (1 hour default, 0 to disable) # ... other redis options ... # Rate Limiting Configuration @@ -989,6 +990,8 @@ You can override the default `config.yml`. - Adjust memory_threshold_percent based on available RAM - Set timeouts according to your content size and network conditions - Use Redis for rate limiting in multi-container setups + - Configure `task_ttl_seconds` to control Redis memory usage (default: 3600s/1 hour) + - Set `REDIS_TASK_TTL` environment variable to override TTL at runtime 3. **Monitoring** 📊 - Enable Prometheus if you need metrics diff --git a/deploy/docker/api.py b/deploy/docker/api.py index 81cd312a..b4c9638d 100644 --- a/deploy/docker/api.py +++ b/deploy/docker/api.py @@ -44,7 +44,8 @@ get_llm_api_key, validate_llm_provider, get_llm_temperature, - get_llm_base_url + get_llm_base_url, + get_redis_task_ttl ) from webhook import WebhookDeliveryService @@ -61,6 +62,21 @@ def _get_memory_mb(): return None +async def hset_with_ttl(redis, key: str, mapping: dict, config: dict): + """Set Redis hash with automatic TTL expiry. + + Args: + redis: Redis client instance + key: Redis key (e.g., "task:abc123") + mapping: Hash field-value mapping + config: Application config containing redis.task_ttl_seconds + """ + await redis.hset(key, mapping=mapping) + ttl = get_redis_task_ttl(config) + if ttl > 0: + await redis.expire(key, ttl) + + async def handle_llm_qa( url: str, query: str, @@ -143,10 +159,10 @@ async def process_llm_extraction( # Validate provider is_valid, error_msg = validate_llm_provider(config, provider) if not is_valid: - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.FAILED, "error": error_msg - }) + }, config) # Send webhook notification on failure await webhook_service.notify_job_completion( @@ -183,10 +199,10 @@ async def process_llm_extraction( ) if not result.success: - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.FAILED, "error": result.error_message - }) + }, config) # Send webhook notification on failure await webhook_service.notify_job_completion( @@ -206,10 +222,10 @@ async def process_llm_extraction( result_data = {"extracted_content": content} - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.COMPLETED, "result": json.dumps(content) - }) + }, config) # Send webhook notification on successful completion await webhook_service.notify_job_completion( @@ -223,10 +239,10 @@ async def process_llm_extraction( except Exception as e: logger.error(f"LLM extraction error: {str(e)}", exc_info=True) - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.FAILED, "error": str(e) - }) + }, config) # Send webhook notification on failure await webhook_service.notify_job_completion( @@ -430,7 +446,7 @@ async def create_new_task( if webhook_config: task_data["webhook_config"] = json.dumps(webhook_config) - await redis.hset(f"task:{task_id}", mapping=task_data) + await hset_with_ttl(redis, f"task:{task_id}", task_data, config) background_tasks.add_task( process_llm_extraction, @@ -806,7 +822,7 @@ async def handle_crawl_job( if webhook_config: task_data["webhook_config"] = json.dumps(webhook_config) - await redis.hset(f"task:{task_id}", mapping=task_data) + await hset_with_ttl(redis, f"task:{task_id}", task_data, config) # Initialize webhook service webhook_service = WebhookDeliveryService(config) @@ -819,10 +835,10 @@ async def _runner(): crawler_config=crawler_config, config=config, ) - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.COMPLETED, "result": json.dumps(result), - }) + }, config) # Send webhook notification on successful completion await webhook_service.notify_job_completion( @@ -836,10 +852,10 @@ async def _runner(): await asyncio.sleep(5) # Give Redis time to process the update except Exception as exc: - await redis.hset(f"task:{task_id}", mapping={ + await hset_with_ttl(redis, f"task:{task_id}", { "status": TaskStatus.FAILED, "error": str(exc), - }) + }, config) # Send webhook notification on failure await webhook_service.notify_job_completion( diff --git a/deploy/docker/config.yml b/deploy/docker/config.yml index db3193a6..04cfecde 100644 --- a/deploy/docker/config.yml +++ b/deploy/docker/config.yml @@ -14,20 +14,21 @@ llm: # api_key: sk-... # If you pass the API key directly (not recommended) # Redis Configuration +# Set task_ttl_seconds to automatically expire task data in Redis. +# This prevents unbounded memory growth from accumulated task results. +# Default: 3600 (1 hour). Set to 0 to disable TTL (not recommended). +# Can be overridden with REDIS_TASK_TTL environment variable. redis: host: "localhost" port: 6379 db: 0 password: "" + task_ttl_seconds: 3600 # TTL for task data (1 hour default) ssl: False ssl_cert_reqs: None ssl_ca_certs: None ssl_certfile: None ssl_keyfile: None - ssl_cert_reqs: None - ssl_ca_certs: None - ssl_certfile: None - ssl_keyfile: None # Rate Limiting Configuration rate_limiting: diff --git a/deploy/docker/utils.py b/deploy/docker/utils.py index 52f4e11f..0a649809 100644 --- a/deploy/docker/utils.py +++ b/deploy/docker/utils.py @@ -36,7 +36,16 @@ def load_config() -> Dict: if llm_api_key and "api_key" not in config["llm"]: config["llm"]["api_key"] = llm_api_key logging.info("LLM API key loaded from LLM_API_KEY environment variable") - + + # Override Redis task TTL from environment if set + redis_task_ttl = os.environ.get("REDIS_TASK_TTL") + if redis_task_ttl: + try: + config["redis"]["task_ttl_seconds"] = int(redis_task_ttl) + logging.info(f"Redis task TTL overridden from REDIS_TASK_TTL: {redis_task_ttl}s") + except ValueError: + logging.warning(f"Invalid REDIS_TASK_TTL value: {redis_task_ttl}, using default") + return config def setup_logging(config: Dict) -> None: @@ -70,6 +79,18 @@ def decode_redis_hash(hash_data: Dict[bytes, bytes]) -> Dict[str, str]: return {k.decode('utf-8'): v.decode('utf-8') for k, v in hash_data.items()} +def get_redis_task_ttl(config: Dict) -> int: + """Get Redis task TTL in seconds from config. + + Args: + config: The application configuration dictionary + + Returns: + TTL in seconds (default 3600). Returns 0 if TTL is disabled. + """ + return config.get("redis", {}).get("task_ttl_seconds", 3600) + + def get_llm_api_key(config: Dict, provider: Optional[str] = None) -> Optional[str]: """Get the appropriate API key based on the LLM provider. diff --git a/tests/docker/test_redis_ttl.py b/tests/docker/test_redis_ttl.py new file mode 100644 index 00000000..a941fb69 --- /dev/null +++ b/tests/docker/test_redis_ttl.py @@ -0,0 +1,244 @@ +""" +Unit tests for Redis TTL (Time-To-Live) feature. + +These tests verify: +1. TTL configuration loading from config.yml +2. Environment variable override (REDIS_TASK_TTL) +3. hset_with_ttl helper function behavior +4. TTL is applied when set > 0 +5. TTL is not applied when set to 0 +""" +import asyncio +import os +import pytest +from unittest.mock import AsyncMock, patch, MagicMock + + +class TestGetRedisTaskTTL: + """Tests for get_redis_task_ttl function.""" + + def test_returns_configured_ttl(self): + """Should return TTL from config when present.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + config = {"redis": {"task_ttl_seconds": 7200}} + assert get_redis_task_ttl(config) == 7200 + + def test_returns_default_when_not_configured(self): + """Should return default 3600 when TTL not in config.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + config = {"redis": {}} + assert get_redis_task_ttl(config) == 3600 + + def test_returns_default_when_redis_missing(self): + """Should return default 3600 when redis section missing.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + config = {} + assert get_redis_task_ttl(config) == 3600 + + def test_returns_zero_when_configured_zero(self): + """Should return 0 when TTL explicitly set to 0 (disabled).""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + config = {"redis": {"task_ttl_seconds": 0}} + assert get_redis_task_ttl(config) == 0 + + +class TestLoadConfigTTL: + """Tests for TTL configuration loading.""" + + def test_env_override_valid_value(self): + """REDIS_TASK_TTL env var should override config value.""" + import sys + sys.path.insert(0, 'deploy/docker') + + with patch.dict(os.environ, {"REDIS_TASK_TTL": "1800"}): + # Need to reload to pick up env var + import importlib + import utils + importlib.reload(utils) + + config = utils.load_config() + assert config["redis"]["task_ttl_seconds"] == 1800 + + def test_env_override_invalid_value_uses_default(self): + """Invalid REDIS_TASK_TTL should be ignored with warning.""" + import sys + sys.path.insert(0, 'deploy/docker') + + with patch.dict(os.environ, {"REDIS_TASK_TTL": "invalid"}): + import importlib + import utils + importlib.reload(utils) + + config = utils.load_config() + # Should fall back to config.yml default (3600) + assert config["redis"]["task_ttl_seconds"] == 3600 + + def test_env_override_zero_disables_ttl(self): + """REDIS_TASK_TTL=0 should disable TTL.""" + import sys + sys.path.insert(0, 'deploy/docker') + + with patch.dict(os.environ, {"REDIS_TASK_TTL": "0"}): + import importlib + import utils + importlib.reload(utils) + + config = utils.load_config() + assert config["redis"]["task_ttl_seconds"] == 0 + + +class TestHsetWithTTL: + """Tests for hset_with_ttl helper function. + + These tests recreate the hset_with_ttl logic to verify correctness + without importing the full api.py (which has heavy dependencies). + """ + + def test_sets_hash_and_ttl(self): + """Should call hset and expire when TTL > 0.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + async def hset_with_ttl(redis, key, mapping, config): + """Recreate the function for testing.""" + await redis.hset(key, mapping=mapping) + ttl = get_redis_task_ttl(config) + if ttl > 0: + await redis.expire(key, ttl) + + async def run_test(): + mock_redis = AsyncMock() + + config = {"redis": {"task_ttl_seconds": 3600}} + key = "task:test123" + mapping = {"status": "processing", "url": "https://example.com"} + + await hset_with_ttl(mock_redis, key, mapping, config) + + mock_redis.hset.assert_called_once_with(key, mapping=mapping) + mock_redis.expire.assert_called_once_with(key, 3600) + + asyncio.run(run_test()) + + def test_no_expire_when_ttl_zero(self): + """Should not call expire when TTL is 0 (disabled).""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + async def hset_with_ttl(redis, key, mapping, config): + """Recreate the function for testing.""" + await redis.hset(key, mapping=mapping) + ttl = get_redis_task_ttl(config) + if ttl > 0: + await redis.expire(key, ttl) + + async def run_test(): + mock_redis = AsyncMock() + + config = {"redis": {"task_ttl_seconds": 0}} + key = "task:test456" + mapping = {"status": "completed"} + + await hset_with_ttl(mock_redis, key, mapping, config) + + mock_redis.hset.assert_called_once_with(key, mapping=mapping) + mock_redis.expire.assert_not_called() + + asyncio.run(run_test()) + + def test_custom_ttl_value(self): + """Should use custom TTL value from config.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + async def hset_with_ttl(redis, key, mapping, config): + """Recreate the function for testing.""" + await redis.hset(key, mapping=mapping) + ttl = get_redis_task_ttl(config) + if ttl > 0: + await redis.expire(key, ttl) + + async def run_test(): + mock_redis = AsyncMock() + + config = {"redis": {"task_ttl_seconds": 86400}} # 24 hours + key = "task:custom" + mapping = {"status": "processing"} + + await hset_with_ttl(mock_redis, key, mapping, config) + + mock_redis.expire.assert_called_once_with(key, 86400) + + asyncio.run(run_test()) + + def test_default_ttl_when_not_configured(self): + """Should use default TTL (3600) when not in config.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import get_redis_task_ttl + + async def hset_with_ttl(redis, key, mapping, config): + """Recreate the function for testing.""" + await redis.hset(key, mapping=mapping) + ttl = get_redis_task_ttl(config) + if ttl > 0: + await redis.expire(key, ttl) + + async def run_test(): + mock_redis = AsyncMock() + + config = {} # No redis config + key = "task:noconfig" + mapping = {"status": "processing"} + + await hset_with_ttl(mock_redis, key, mapping, config) + + mock_redis.expire.assert_called_once_with(key, 3600) + + asyncio.run(run_test()) + + +class TestTTLDocumentation: + """Tests to verify TTL is documented in config.yml.""" + + def test_config_has_ttl_setting(self): + """Verify config.yml contains task_ttl_seconds setting.""" + import sys + sys.path.insert(0, 'deploy/docker') + from utils import load_config + + config = load_config() + assert "redis" in config + assert "task_ttl_seconds" in config["redis"] + + def test_config_default_is_3600(self): + """Verify default TTL in config.yml is 3600 (1 hour).""" + import sys + sys.path.insert(0, 'deploy/docker') + + # Clear any env override + with patch.dict(os.environ, {}, clear=True): + # Remove REDIS_TASK_TTL if present + os.environ.pop("REDIS_TASK_TTL", None) + + import importlib + import utils + importlib.reload(utils) + + config = utils.load_config() + assert config["redis"]["task_ttl_seconds"] == 3600