diff --git a/.env.docker b/.env.docker new file mode 100644 index 0000000..a397be3 --- /dev/null +++ b/.env.docker @@ -0,0 +1,34 @@ +# Docker Environment Configuration for RouteMQ +# Copy this file to .env and customize for your deployment + +# MQTT Broker Configuration +MQTT_BROKER=test.mosquitto.org +MQTT_PORT=1883 +MQTT_USERNAME= +MQTT_PASSWORD= +MQTT_GROUP_NAME=mqtt_framework_group + +# MySQL Configuration +ENABLE_MYSQL=true +DB_HOST=mysql +DB_PORT=3306 +DB_NAME=mqtt_framework +DB_USER=routemq +DB_PASS=routemq + +# Redis Configuration +ENABLE_REDIS=true +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Queue Configuration +QUEUE_CONNECTION=redis + +# Timezone +TIMEZONE=Asia/Jakarta + +# Logging Configuration +LOG_LEVEL=INFO +LOG_FORMAT=%(asctime)s - %(name)s - %(levelname)s - %(message)s diff --git a/.env.example b/.env.example index ad66980..fa55d54 100644 --- a/.env.example +++ b/.env.example @@ -19,6 +19,12 @@ DB_PASS= # Redis Configuration ENABLE_REDIS=false +# Queue Configuration +# Queue connection driver: 'redis' or 'database' +# Redis queue is faster but requires ENABLE_REDIS=true +# Database queue is persistent but requires ENABLE_MYSQL=true +QUEUE_CONNECTION=redis + # Logging Configuration # Enable/disable file logging (true/false) LOG_TO_FILE=true diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..6666567 --- /dev/null +++ b/Makefile @@ -0,0 +1,98 @@ +.PHONY: help build up down restart logs ps clean dev queue-work + +help: ## Show this help message + @echo "RouteMQ Docker Commands" + @echo "" + @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}' + +build: ## Build all Docker images + docker compose build + +up: ## Start all services (production) + docker compose up -d + +down: ## Stop all services + docker compose down + +restart: ## Restart all services + docker compose restart + +logs: ## View logs from all services + docker 
compose logs -f + +logs-app: ## View logs from RouteMQ app + docker compose logs -f routemq + +logs-worker: ## View logs from default queue worker + docker compose logs -f queue-worker-default + +logs-redis: ## View logs from Redis + docker compose logs -f redis + +logs-mysql: ## View logs from MySQL + docker compose logs -f mysql + +ps: ## Show running services + docker compose ps + +stats: ## Show resource usage stats + docker stats + +clean: ## Stop and remove all containers, networks, and volumes + docker compose down -v + +dev: ## Start development environment (Redis + MySQL only) + docker compose -f docker-compose.dev.yml up -d + +dev-full: ## Start development environment (all services) + docker compose -f docker-compose.dev.yml --profile full up -d + +dev-down: ## Stop development environment + docker compose -f docker-compose.dev.yml down + +queue-work: ## Start queue worker on host (for development) + uv run python main.py --queue-work --queue default + +queue-high: ## Start high-priority queue worker on host + uv run python main.py --queue-work --queue high-priority --sleep 1 + +queue-emails: ## Start emails queue worker on host + uv run python main.py --queue-work --queue emails --sleep 5 + +scale-default: ## Scale default queue workers to 3 instances + docker compose up -d --scale queue-worker-default=3 + +scale-emails: ## Scale email queue workers to 2 instances + docker compose up -d --scale queue-worker-emails=2 + +shell-app: ## Open shell in RouteMQ app container + docker compose exec routemq bash + +shell-redis: ## Open Redis CLI + docker compose exec redis redis-cli + +shell-mysql: ## Open MySQL CLI + docker compose exec mysql mysql -uroot -p + +backup-mysql: ## Backup MySQL database + docker compose exec mysql mysqldump -uroot -p${DB_PASS} ${DB_NAME} > backup_mysql_$$(date +%Y%m%d_%H%M%S).sql + +backup-redis: ## Backup Redis data + docker compose exec redis redis-cli SAVE + docker cp routemq-redis:/data/dump.rdb backup_redis_$$(date 
+%Y%m%d_%H%M%S).rdb + +health: ## Check health of all services + @echo "Service Health Status:" + @docker compose ps --format "table {{.Service}}\t{{.Status}}" + +install: ## Install dependencies on host + uv sync + +run: ## Run RouteMQ on host + uv run python main.py --run + +tinker: ## Start interactive REPL + uv run python main.py --tinker + +init: ## Initialize new RouteMQ project + uv run python main.py --init diff --git a/README.md b/README.md index 1b9379c..b5bec4c 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,12 @@ uv run python main.py --run - **Route-based MQTT topic handling** - Define routes using a clean, expressive syntax - **Middleware support** - Process messages through middleware chains - **Parameter extraction** - Extract variables from MQTT topics using Laravel-style syntax +- **Background Task Queue** - Laravel-style queue system for async job processing - **Shared Subscriptions** - Horizontal scaling with worker processes - **Redis Integration** - Optional Redis support for distributed caching and rate limiting - **Advanced Rate Limiting** - Multiple rate limiting strategies with Redis backend - **Optional MySQL integration** - Use with or without a database +- **Docker Support** - Production-ready Docker Compose setup with queue workers - **Environment-based configuration** - Flexible configuration through .env files ## ๐Ÿ“š Documentation @@ -43,6 +45,8 @@ uv run python main.py --run - **[Routing](./docs/routing/README.md)** - Route definition, parameters, and organization - **[Controllers](./docs/controllers/README.md)** - Creating and organizing business logic - **[Middleware](./docs/middleware/README.md)** - Request processing and middleware chains +- **[Queue System](./docs/queue/README.md)** - Background task processing and job queues +- **[Docker Deployment](./docs/docker-deployment.md)** - Production deployment with Docker - **[Redis Integration](./docs/redis/README.md)** - Caching, sessions, and distributed features - **[Rate 
Limiting](./docs/rate-limiting/README.md)** - Advanced rate limiting strategies - **[Examples](./docs/examples/README.md)** - Practical examples and use cases @@ -58,12 +62,68 @@ RouteMQ/ โ”‚ โ”œโ”€โ”€ controllers/ # ๐ŸŽฎ Route handlers โ”‚ โ”œโ”€โ”€ middleware/ # ๐Ÿ”ง Custom middleware โ”‚ โ”œโ”€โ”€ models/ # ๐Ÿ—„๏ธ Database models +โ”‚ โ”œโ”€โ”€ jobs/ # ๐Ÿ“‹ Background jobs โ”‚ โ””โ”€โ”€ routers/ # ๐Ÿ›ฃ๏ธ Route definitions โ”œโ”€โ”€ core/ # โšก Framework core +โ”‚ โ”œโ”€โ”€ queue/ # ๐Ÿ”„ Queue system +โ”‚ โ”œโ”€โ”€ job.py # ๐Ÿ“ Base job class +โ”‚ โ””โ”€โ”€ ... # Other core components โ”œโ”€โ”€ bootstrap/ # ๐ŸŒŸ Application bootstrap +โ”œโ”€โ”€ docker-compose.yml # ๐Ÿณ Production Docker setup โ””โ”€โ”€ tests/ # ๐Ÿงช Test files ``` +## ๐Ÿณ Docker Deployment + +RouteMQ includes production-ready Docker Compose configuration with Redis, MySQL, and queue workers: + +```bash +# Start all services (app + 3 queue workers + Redis + MySQL) +docker compose up -d + +# View logs +docker compose logs -f + +# Scale workers +docker compose up -d --scale queue-worker-default=5 + +# Or use Makefile +make up # Start all services +make logs # View logs +make ps # Show status +``` + +See [Docker Deployment Guide](./docs/docker-deployment.md) for detailed instructions. + +## ๐Ÿ“‹ Background Task Queue + +Process time-consuming tasks asynchronously with the built-in queue system: + +```python +# Create a job +from core.job import Job + +class SendEmailJob(Job): + max_tries = 3 + queue = "emails" + + async def handle(self): + # Send email logic + pass + +# Dispatch the job +from core.queue.queue_manager import dispatch + +job = SendEmailJob() +job.to = "user@example.com" +await dispatch(job) + +# Run queue worker +python main.py --queue-work --queue emails +``` + +See [Queue System Documentation](./docs/queue-system.md) for complete guide. + ## ๐Ÿค Contributing We welcome contributions! Please see our documentation for development setup and contribution guidelines. 
diff --git a/app/jobs/__init__.py b/app/jobs/__init__.py new file mode 100644 index 0000000..b037ae0 --- /dev/null +++ b/app/jobs/__init__.py @@ -0,0 +1 @@ +# This file marks the directory as a Python package diff --git a/app/jobs/example_data_processing_job.py b/app/jobs/example_data_processing_job.py new file mode 100644 index 0000000..a309b3a --- /dev/null +++ b/app/jobs/example_data_processing_job.py @@ -0,0 +1,73 @@ +import asyncio +import logging +from core.job import Job + +logger = logging.getLogger("RouteMQ.Jobs.ProcessDataJob") + + +class ProcessDataJob(Job): + """ + Example job for processing data in the background. + + This demonstrates a job that processes sensor data from IoT devices. + + Usage: + from app.jobs.example_data_processing_job import ProcessDataJob + from core.queue.queue_manager import dispatch + + # Dispatch the job + job = ProcessDataJob() + job.device_id = "sensor-001" + job.sensor_data = {"temperature": 25.5, "humidity": 60} + await dispatch(job) + """ + + # Configure job properties + max_tries = 5 + timeout = 120 # Longer timeout for data processing + retry_after = 5 + queue = "data-processing" + + def __init__(self): + super().__init__() + self.device_id = None + self.sensor_data = None + + async def handle(self) -> None: + """ + Execute the job - process sensor data. 
+ """ + logger.info(f"Processing data from device {self.device_id}") + logger.info(f"Sensor data: {self.sensor_data}") + + # Simulate data processing + await asyncio.sleep(3) + + # Example: Calculate statistics + if isinstance(self.sensor_data, dict): + temperature = self.sensor_data.get("temperature") + humidity = self.sensor_data.get("humidity") + + if temperature and temperature > 30: + logger.warning(f"High temperature detected: {temperature}ยฐC") + + if humidity and humidity > 80: + logger.warning(f"High humidity detected: {humidity}%") + + # In a real application, you might: + # - Store processed data in a database + # - Calculate aggregations and statistics + # - Trigger alerts if thresholds are exceeded + # - Send data to analytics services + + logger.info(f"Successfully processed data from device {self.device_id}") + + async def failed(self, exception: Exception) -> None: + """ + Handle permanent job failure. + """ + logger.error( + f"Failed to process data from device {self.device_id} " + f"after {self.max_tries} attempts" + ) + logger.error(f"Error: {str(exception)}") diff --git a/app/jobs/example_email_job.py b/app/jobs/example_email_job.py new file mode 100644 index 0000000..86f9e88 --- /dev/null +++ b/app/jobs/example_email_job.py @@ -0,0 +1,65 @@ +import asyncio +import logging +from core.job import Job + +logger = logging.getLogger("RouteMQ.Jobs.SendEmailJob") + + +class SendEmailJob(Job): + """ + Example job for sending emails in the background. + + Usage: + from app.jobs.example_email_job import SendEmailJob + from core.queue.queue_manager import dispatch + + # Dispatch the job + job = SendEmailJob() + job.to = "user@example.com" + job.subject = "Welcome!" + job.message = "Thank you for signing up." 
+ await dispatch(job) + """ + + # Configure job properties + max_tries = 3 + timeout = 30 + retry_after = 10 # Retry after 10 seconds on failure + queue = "emails" # Use 'emails' queue instead of 'default' + + def __init__(self): + super().__init__() + self.to = None + self.subject = None + self.message = None + + async def handle(self) -> None: + """ + Execute the job - send an email. + In a real application, this would use an email service like SendGrid, AWS SES, etc. + """ + logger.info(f"Sending email to {self.to}") + logger.info(f"Subject: {self.subject}") + logger.info(f"Message: {self.message}") + + # Simulate email sending (replace with actual email service) + await asyncio.sleep(2) # Simulate API call delay + + # Uncomment to test job failure and retry + # if self.attempts == 1: + # raise Exception("Simulated email sending failure") + + logger.info(f"Email sent successfully to {self.to}") + + async def failed(self, exception: Exception) -> None: + """ + Handle permanent job failure. + This is called when the job exceeds max_tries. + """ + logger.error(f"Failed to send email to {self.to} after {self.max_tries} attempts") + logger.error(f"Error: {str(exception)}") + + # In a real application, you might: + # - Log to a monitoring service + # - Send alert to administrators + # - Store failure in a database for manual review diff --git a/app/jobs/example_report_job.py b/app/jobs/example_report_job.py new file mode 100644 index 0000000..149e55f --- /dev/null +++ b/app/jobs/example_report_job.py @@ -0,0 +1,75 @@ +import asyncio +import logging +from datetime import datetime +from core.job import Job + +logger = logging.getLogger("RouteMQ.Jobs.GenerateReportJob") + + +class GenerateReportJob(Job): + """ + Example job for generating reports in the background. + + This demonstrates a long-running job that generates reports + and demonstrates delayed job execution. 
+ + Usage: + from app.jobs.example_report_job import GenerateReportJob + from core.queue.queue_manager import queue + + # Dispatch the job immediately + job = GenerateReportJob() + job.report_type = "daily" + job.user_id = 123 + await queue.push(job) + + # Or schedule it to run after a delay (in seconds) + await queue.later(3600, job) # Run after 1 hour + """ + + # Configure job properties + max_tries = 2 + timeout = 300 # 5 minutes for report generation + retry_after = 60 # Retry after 1 minute + queue = "reports" + + def __init__(self): + super().__init__() + self.report_type = None + self.user_id = None + + async def handle(self) -> None: + """ + Execute the job - generate a report. + """ + logger.info(f"Generating {self.report_type} report for user {self.user_id}") + + # Simulate report generation + await asyncio.sleep(5) + + # In a real application, you might: + # - Query database for report data + # - Generate PDF or Excel file + # - Upload to cloud storage + # - Send email notification with download link + # - Update user's report history + + report_file = f"{self.report_type}_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf" + + logger.info(f"Report generated successfully: {report_file}") + logger.info(f"Report available for user {self.user_id}") + + async def failed(self, exception: Exception) -> None: + """ + Handle permanent job failure. 
+ """ + logger.error( + f"Failed to generate {self.report_type} report for user {self.user_id} " + f"after {self.max_tries} attempts" + ) + logger.error(f"Error: {str(exception)}") + + # In a real application, you might: + # - Notify the user that report generation failed + # - Log to monitoring service + # - Create a support ticket diff --git a/app/models/queue_failed_job.py b/app/models/queue_failed_job.py new file mode 100644 index 0000000..139cdab --- /dev/null +++ b/app/models/queue_failed_job.py @@ -0,0 +1,19 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime +from core.model import Base + + +class QueueFailedJob(Base): + """Model for queue_failed_jobs table - stores jobs that failed permanently.""" + + __tablename__ = "queue_failed_jobs" + + id = Column(Integer, primary_key=True, autoincrement=True) + connection = Column(String(255), nullable=False) + queue = Column(String(255), nullable=False, index=True) + payload = Column(Text, nullable=False) + exception = Column(Text, nullable=False) + failed_at = Column(DateTime, nullable=False, default=datetime.utcnow) + + def __repr__(self): + return f"" diff --git a/app/models/queue_job.py b/app/models/queue_job.py new file mode 100644 index 0000000..f25dc30 --- /dev/null +++ b/app/models/queue_job.py @@ -0,0 +1,25 @@ +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, Index +from core.model import Base + + +class QueueJob(Base): + """Model for queue_jobs table - stores pending and reserved jobs.""" + + __tablename__ = "queue_jobs" + + id = Column(Integer, primary_key=True, autoincrement=True) + queue = Column(String(255), nullable=False, index=True, default="default") + payload = Column(Text, nullable=False) + attempts = Column(Integer, nullable=False, default=0) + reserved_at = Column(DateTime, nullable=True) + available_at = Column(DateTime, nullable=False) + created_at = Column(DateTime, nullable=False, 
default=datetime.utcnow) + + # Composite index for efficient job fetching + __table_args__ = ( + Index('queue_jobs_queue_reserved_at_index', 'queue', 'reserved_at'), + ) + + def __repr__(self): + return f"<QueueJob(id={self.id}, queue='{self.queue}', attempts={self.attempts})>" diff --git a/bootstrap/app.py b/bootstrap/app.py index ed96f67..d322051 100644 --- a/bootstrap/app.py +++ b/bootstrap/app.py @@ -190,6 +190,18 @@ async def initialize_redis(self): else: self.logger.warning("Redis initialization failed") + async def _initialize_connections(self): + """Initialize database and Redis connections.""" + await self.initialize_database() + await self.initialize_redis() + + async def _cleanup_connections(self): + """Cleanup database and Redis connections.""" + if self.redis_enabled: + await redis_manager.disconnect() + if self.mysql_enabled: + await Model.cleanup() + def _on_connect(self, client, userdata, flags, rc): """Callback for when the client receives a CONNACK response from the server.""" self.logger.info(f"Main client connected with result code {rc}") diff --git a/core/job.py b/core/job.py new file mode 100644 index 0000000..d3d3646 --- /dev/null +++ b/core/job.py @@ -0,0 +1,126 @@ +import json +import logging +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any + +logger = logging.getLogger("RouteMQ.Job") + + +class Job(ABC): + """ + Base Job class that all background jobs should extend. + Similar to Laravel's Job class. + """ + + # Maximum number of times the job may be attempted + max_tries: int = 3 + + # Number of seconds the job can run before timing out + timeout: int = 60 + + # Number of seconds to wait before retrying the job after a failure + retry_after: int = 0 + + # The name of the queue the job should be sent to + queue: str = "default" + + def __init__(self): + """Initialize the job.""" + self.job_id: Optional[int] = None + self.attempts: int = 0 + + @abstractmethod + async def handle(self) -> None: + """ + Execute the job. + This method must be implemented by all job classes. 
+ """ + pass + + async def failed(self, exception: Exception) -> None: + """ + Handle a job failure. + Override this method to perform cleanup when a job fails permanently. + + Args: + exception: The exception that caused the job to fail + """ + logger.error( + f"Job {self.__class__.__name__} failed permanently: {str(exception)}" + ) + + def serialize(self) -> str: + """ + Serialize the job to a JSON string for storage. + + Returns: + JSON string representation of the job + """ + job_data = { + "class": f"{self.__class__.__module__}.{self.__class__.__name__}", + "data": self.get_data(), + "max_tries": self.max_tries, + "timeout": self.timeout, + "retry_after": self.retry_after, + "queue": self.queue, + } + return json.dumps(job_data) + + def get_data(self) -> Dict[str, Any]: + """ + Get the serializable data for the job. + Override this method to include custom data that needs to be serialized. + + Returns: + Dictionary of data to be serialized + """ + # Get all instance attributes except private ones and job metadata + data = {} + for key, value in self.__dict__.items(): + if not key.startswith("_") and key not in [ + "job_id", + "attempts", + "max_tries", + "timeout", + "retry_after", + "queue", + ]: + data[key] = value + return data + + @classmethod + def unserialize(cls, payload: str) -> "Job": + """ + Unserialize a job from a JSON string. 
+ + Args: + payload: JSON string representation of the job + + Returns: + Job instance + """ + job_data = json.loads(payload) + + # Import and instantiate the job class + module_name, class_name = job_data["class"].rsplit(".", 1) + module = __import__(module_name, fromlist=[class_name]) + job_class = getattr(module, class_name) + + # Create job instance + job = job_class() + + # Restore job properties + job.max_tries = job_data.get("max_tries", 3) + job.timeout = job_data.get("timeout", 60) + job.retry_after = job_data.get("retry_after", 0) + job.queue = job_data.get("queue", "default") + + # Restore custom data + data = job_data.get("data", {}) + for key, value in data.items(): + setattr(job, key, value) + + return job + + def __repr__(self): + return f"<{self.__class__.__name__}(attempts={self.attempts}, max_tries={self.max_tries})>" diff --git a/core/queue/__init__.py b/core/queue/__init__.py new file mode 100644 index 0000000..ec046c3 --- /dev/null +++ b/core/queue/__init__.py @@ -0,0 +1,19 @@ +""" +Queue system for RouteMQ - Background task processing similar to Laravel's queue system. 
+""" + +from core.queue.queue_manager import QueueManager, queue, dispatch +from core.queue.queue_worker import QueueWorker +from core.queue.queue_driver import QueueDriver +from core.queue.redis_queue import RedisQueue +from core.queue.database_queue import DatabaseQueue + +__all__ = [ + "QueueManager", + "queue", + "dispatch", + "QueueWorker", + "QueueDriver", + "RedisQueue", + "DatabaseQueue", +] diff --git a/core/queue/database_queue.py b/core/queue/database_queue.py new file mode 100644 index 0000000..3974ea5 --- /dev/null +++ b/core/queue/database_queue.py @@ -0,0 +1,225 @@ +import logging +from datetime import datetime, timedelta +from typing import Optional +from sqlalchemy import select, update, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from core.queue.queue_driver import QueueDriver +from core.model import Model +from app.models.queue_job import QueueJob +from app.models.queue_failed_job import QueueFailedJob + +logger = logging.getLogger("RouteMQ.DatabaseQueue") + + +class DatabaseQueue(QueueDriver): + """ + Database-backed queue driver using MySQL/SQLAlchemy. + Provides persistent job storage with ACID guarantees. + """ + + def __init__(self): + """Initialize the database queue driver.""" + self.connection_name = "database" + + async def push( + self, + payload: str, + queue: str = "default", + delay: int = 0, + ) -> None: + """Push a new job onto the queue.""" + if not Model._is_enabled: + logger.error("Cannot push job to database queue - MySQL is disabled") + raise RuntimeError("MySQL is disabled. 
Enable it to use DatabaseQueue.") + + session: AsyncSession = await Model.get_session() + try: + available_at = datetime.utcnow() + if delay > 0: + available_at += timedelta(seconds=delay) + + job = QueueJob( + queue=queue, + payload=payload, + attempts=0, + available_at=available_at, + created_at=datetime.utcnow(), + ) + + session.add(job) + await session.commit() + logger.debug(f"Job pushed to queue '{queue}' with delay {delay}s") + + except Exception as e: + await session.rollback() + logger.error(f"Failed to push job to queue: {str(e)}") + raise + finally: + await session.close() + + async def pop(self, queue: str = "default") -> Optional[dict]: + """Pop the next available job from the queue.""" + if not Model._is_enabled: + logger.error("Cannot pop job from database queue - MySQL is disabled") + return None + + session: AsyncSession = await Model.get_session() + try: + # Use FOR UPDATE SKIP LOCKED for concurrency-safe job claiming + # Find the next available job that's not reserved + stmt = ( + select(QueueJob) + .where( + QueueJob.queue == queue, + QueueJob.reserved_at.is_(None), + QueueJob.available_at <= datetime.utcnow(), + ) + .order_by(QueueJob.id) + .limit(1) + .with_for_update(skip_locked=True) + ) + + result = await session.execute(stmt) + job = result.scalars().first() + + if not job: + return None + + # Mark job as reserved + job.reserved_at = datetime.utcnow() + job.attempts += 1 + + await session.commit() + await session.refresh(job) + + logger.debug( + f"Job {job.id} popped from queue '{queue}' (attempt {job.attempts})" + ) + + return { + "id": job.id, + "payload": job.payload, + "attempts": job.attempts, + } + + except Exception as e: + await session.rollback() + logger.error(f"Failed to pop job from queue: {str(e)}") + return None + finally: + await session.close() + + async def release( + self, + job_id: int, + queue: str, + delay: int = 0, + ) -> None: + """Release a job back to the queue for retry.""" + if not Model._is_enabled: + 
logger.error("Cannot release job - MySQL is disabled") + return + + session: AsyncSession = await Model.get_session() + try: + available_at = datetime.utcnow() + if delay > 0: + available_at += timedelta(seconds=delay) + + stmt = ( + update(QueueJob) + .where(QueueJob.id == job_id, QueueJob.queue == queue) + .values(reserved_at=None, available_at=available_at) + ) + + await session.execute(stmt) + await session.commit() + logger.debug(f"Job {job_id} released back to queue '{queue}' with delay {delay}s") + + except Exception as e: + await session.rollback() + logger.error(f"Failed to release job: {str(e)}") + raise + finally: + await session.close() + + async def delete(self, job_id: int, queue: str) -> None: + """Delete a job from the queue.""" + if not Model._is_enabled: + logger.error("Cannot delete job - MySQL is disabled") + return + + session: AsyncSession = await Model.get_session() + try: + stmt = delete(QueueJob).where( + QueueJob.id == job_id, + QueueJob.queue == queue, + ) + + await session.execute(stmt) + await session.commit() + logger.debug(f"Job {job_id} deleted from queue '{queue}'") + + except Exception as e: + await session.rollback() + logger.error(f"Failed to delete job: {str(e)}") + raise + finally: + await session.close() + + async def failed( + self, + connection: str, + queue: str, + payload: str, + exception: str, + ) -> None: + """Store a failed job.""" + if not Model._is_enabled: + logger.error("Cannot store failed job - MySQL is disabled") + return + + session: AsyncSession = await Model.get_session() + try: + failed_job = QueueFailedJob( + connection=connection, + queue=queue, + payload=payload, + exception=exception, + failed_at=datetime.utcnow(), + ) + + session.add(failed_job) + await session.commit() + logger.info(f"Failed job stored for queue '{queue}'") + + except Exception as e: + await session.rollback() + logger.error(f"Failed to store failed job: {str(e)}") + raise + finally: + await session.close() + + async def size(self, 
queue: str = "default") -> int: + """Get the size of the queue.""" + if not Model._is_enabled: + logger.error("Cannot get queue size - MySQL is disabled") + return 0 + + session: AsyncSession = await Model.get_session() + try: + stmt = select(QueueJob).where( + QueueJob.queue == queue, + QueueJob.reserved_at.is_(None), + ) + + result = await session.execute(stmt) + jobs = result.scalars().all() + return len(jobs) + + except Exception as e: + logger.error(f"Failed to get queue size: {str(e)}") + return 0 + finally: + await session.close() diff --git a/core/queue/queue_driver.py b/core/queue/queue_driver.py new file mode 100644 index 0000000..3a4e274 --- /dev/null +++ b/core/queue/queue_driver.py @@ -0,0 +1,100 @@ +from abc import ABC, abstractmethod +from typing import Optional +from datetime import datetime + + +class QueueDriver(ABC): + """ + Abstract base class for queue drivers. + Implementations can use Redis, Database, or any other backend. + """ + + @abstractmethod + async def push( + self, + payload: str, + queue: str = "default", + delay: int = 0, + ) -> None: + """ + Push a new job onto the queue. + + Args: + payload: Serialized job data + queue: Queue name + delay: Delay in seconds before the job becomes available + """ + pass + + @abstractmethod + async def pop(self, queue: str = "default") -> Optional[dict]: + """ + Pop the next available job from the queue. + + Args: + queue: Queue name + + Returns: + Dictionary with job data: {id, payload, attempts} or None if no jobs available + """ + pass + + @abstractmethod + async def release( + self, + job_id: int, + queue: str, + delay: int = 0, + ) -> None: + """ + Release a job back to the queue (for retry). + + Args: + job_id: Job identifier + queue: Queue name + delay: Delay in seconds before the job becomes available again + """ + pass + + @abstractmethod + async def delete(self, job_id: int, queue: str) -> None: + """ + Delete a job from the queue. 
+ + Args: + job_id: Job identifier + queue: Queue name + """ + pass + + @abstractmethod + async def failed( + self, + connection: str, + queue: str, + payload: str, + exception: str, + ) -> None: + """ + Store a failed job. + + Args: + connection: Connection name (e.g., 'redis', 'database') + queue: Queue name + payload: Serialized job data + exception: Exception message + """ + pass + + @abstractmethod + async def size(self, queue: str = "default") -> int: + """ + Get the size of the queue. + + Args: + queue: Queue name + + Returns: + Number of jobs in the queue + """ + pass diff --git a/core/queue/queue_manager.py b/core/queue/queue_manager.py new file mode 100644 index 0000000..6e38149 --- /dev/null +++ b/core/queue/queue_manager.py @@ -0,0 +1,179 @@ +import logging +import os +from typing import Optional + +from core.job import Job +from core.queue.queue_driver import QueueDriver +from core.queue.redis_queue import RedisQueue +from core.queue.database_queue import DatabaseQueue +from core.redis_manager import RedisManager +from core.model import Model + +logger = logging.getLogger("RouteMQ.QueueManager") + + +class QueueManager: + """ + Queue Manager for dispatching jobs to queues. + Similar to Laravel's Queue facade. + """ + + _instance: Optional["QueueManager"] = None + _driver: Optional[QueueDriver] = None + _default_connection: str = "redis" + + def __new__(cls) -> "QueueManager": + """Singleton pattern to ensure one queue manager instance.""" + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + """Initialize the queue manager.""" + if hasattr(self, "_initialized"): + return + + self._initialized = True + self._default_connection = os.getenv("QUEUE_CONNECTION", "redis") + logger.info(f"QueueManager initialized with default connection: {self._default_connection}") + + def get_driver(self, connection: Optional[str] = None) -> QueueDriver: + """ + Get the queue driver for the specified connection. 
+ + Args: + connection: Connection name ('redis' or 'database'). If None, uses default. + + Returns: + QueueDriver instance + + Raises: + RuntimeError: If the requested driver is not available + """ + connection = connection or self._default_connection + + if connection == "redis": + redis_manager = RedisManager() + if not redis_manager.is_enabled(): + # Fallback to database if Redis is not available + logger.warning("Redis is not available, falling back to database queue") + connection = "database" + else: + return RedisQueue() + + if connection == "database": + if not Model._is_enabled: + raise RuntimeError( + "Cannot use database queue - MySQL is disabled. " + "Enable MySQL or configure Redis as queue connection." + ) + return DatabaseQueue() + + raise RuntimeError(f"Unknown queue connection: {connection}") + + async def push( + self, + job: Job, + queue: Optional[str] = None, + connection: Optional[str] = None, + ) -> None: + """ + Push a job to the queue. + + Args: + job: Job instance to push + queue: Queue name (uses job's queue if not specified) + connection: Connection name (uses default if not specified) + """ + queue = queue or job.queue + driver = self.get_driver(connection) + + payload = job.serialize() + await driver.push(payload, queue) + + logger.info(f"Job {job.__class__.__name__} dispatched to queue '{queue}'") + + async def later( + self, + delay: int, + job: Job, + queue: Optional[str] = None, + connection: Optional[str] = None, + ) -> None: + """ + Push a job to the queue with a delay. 
+ + Args: + delay: Delay in seconds before the job becomes available + job: Job instance to push + queue: Queue name (uses job's queue if not specified) + connection: Connection name (uses default if not specified) + """ + queue = queue or job.queue + driver = self.get_driver(connection) + + payload = job.serialize() + await driver.push(payload, queue, delay) + + logger.info( + f"Job {job.__class__.__name__} scheduled to queue '{queue}' with {delay}s delay" + ) + + async def bulk( + self, + jobs: list[Job], + queue: Optional[str] = None, + connection: Optional[str] = None, + ) -> None: + """ + Push multiple jobs to the queue. + + Args: + jobs: List of Job instances to push + queue: Queue name (uses each job's queue if not specified) + connection: Connection name (uses default if not specified) + """ + driver = self.get_driver(connection) + + for job in jobs: + q = queue or job.queue + payload = job.serialize() + await driver.push(payload, q) + + logger.info(f"Bulk dispatched {len(jobs)} jobs to queue") + + async def size( + self, + queue: str = "default", + connection: Optional[str] = None, + ) -> int: + """ + Get the size of the queue. + + Args: + queue: Queue name + connection: Connection name (uses default if not specified) + + Returns: + Number of jobs in the queue + """ + driver = self.get_driver(connection) + return await driver.size(queue) + + +# Global queue manager instance +queue = QueueManager() + + +# Helper function for dispatching jobs (Laravel-style) +async def dispatch(job: Job) -> None: + """ + Dispatch a job to the queue. 
+ + Args: + job: Job instance to dispatch + + Example: + await dispatch(SendEmailJob(to="user@example.com", subject="Hello")) + """ + await queue.push(job) diff --git a/core/queue/queue_worker.py b/core/queue/queue_worker.py new file mode 100644 index 0000000..13af57e --- /dev/null +++ b/core/queue/queue_worker.py @@ -0,0 +1,243 @@ +import asyncio +import logging +import signal +import traceback +from typing import Optional + +from core.job import Job +from core.queue.queue_driver import QueueDriver +from core.queue.queue_manager import QueueManager + +logger = logging.getLogger("RouteMQ.QueueWorker") + + +class QueueWorker: + """ + Queue Worker for processing jobs from queues. + Similar to Laravel's queue:work command. + """ + + def __init__( + self, + queue_name: str = "default", + connection: Optional[str] = None, + max_jobs: Optional[int] = None, + max_time: Optional[int] = None, + sleep: int = 3, + max_tries: Optional[int] = None, + timeout: int = 60, + ): + """ + Initialize the queue worker. 
import asyncio
import logging
import signal
import traceback
from typing import Optional

from core.job import Job
from core.queue.queue_driver import QueueDriver
from core.queue.queue_manager import QueueManager

logger = logging.getLogger("RouteMQ.QueueWorker")


class QueueWorker:
    """
    Queue Worker for processing jobs from queues.
    Similar to Laravel's queue:work command.
    """

    def __init__(
        self,
        queue_name: str = "default",
        connection: Optional[str] = None,
        max_jobs: Optional[int] = None,
        max_time: Optional[int] = None,
        sleep: int = 3,
        max_tries: Optional[int] = None,
        timeout: int = 60,
    ):
        """
        Initialize the queue worker.

        Args:
            queue_name: Name of the queue to process
            connection: Queue connection to use (redis or database)
            max_jobs: Maximum number of jobs to process before stopping
            max_time: Maximum time in seconds to run before stopping
            sleep: Number of seconds to sleep when no job is available
            max_tries: Maximum number of times to attempt a job
            timeout: Maximum number of seconds a job can run (fallback when the
                job does not define its own timeout)
        """
        self.queue_name = queue_name
        self.connection = connection
        self.max_jobs = max_jobs
        self.max_time = max_time
        self.sleep = sleep
        self.max_tries = max_tries
        self.timeout = timeout

        self.should_quit = False
        self.paused = False
        self.jobs_processed = 0
        self.start_time = None

        self.queue_manager = QueueManager()
        self.driver: Optional[QueueDriver] = None

        # Setup signal handlers for graceful shutdown.
        # NOTE(review): signal.signal only works from the main thread — assumed
        # the worker is constructed there.
        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum, frame):
        """Handle shutdown signals by requesting a graceful stop."""
        logger.info(f"Received signal {signum}, initiating graceful shutdown...")
        self.should_quit = True

    async def work(self) -> None:
        """
        Start processing jobs from the queue.

        This is the main worker loop: it pops one job at a time, processes it,
        and sleeps when the queue is empty. The loop exits when should_quit is
        set (signal or stop()) or when max_jobs/max_time limits are reached.
        """
        logger.info(
            f"Queue worker started for queue '{self.queue_name}' "
            f"(connection: {self.connection or 'default'})"
        )

        self.driver = self.queue_manager.get_driver(self.connection)
        # get_running_loop() is the non-deprecated way to reach the loop from
        # inside a coroutine (get_event_loop() is deprecated here).
        self.start_time = asyncio.get_running_loop().time()

        while not self.should_quit:
            # Check if we've reached max jobs or max time
            if self._should_stop():
                logger.info("Worker stopping due to limits")
                break

            # Check if paused
            if self.paused:
                await asyncio.sleep(self.sleep)
                continue

            # Try to get a job from the queue
            try:
                job_data = await self.driver.pop(self.queue_name)

                if job_data:
                    await self._process_job(job_data)
                    self.jobs_processed += 1
                else:
                    # No jobs available, sleep
                    logger.debug(f"No jobs available, sleeping for {self.sleep}s")
                    await asyncio.sleep(self.sleep)

            except Exception as e:
                # Driver-level error (e.g. lost connection): log and back off.
                logger.error(f"Error in worker loop: {str(e)}")
                logger.debug(traceback.format_exc())
                await asyncio.sleep(self.sleep)

        logger.info(
            f"Queue worker stopped. Processed {self.jobs_processed} jobs."
        )

    async def _process_job(self, job_data: dict) -> None:
        """
        Process a single job: unserialize, run with a timeout, then delete on
        success, release for retry on failure, or move to the failed queue.

        Args:
            job_data: Job data from queue (id, payload, attempts)
        """
        job_id = job_data["id"]
        payload = job_data["payload"]
        attempts = job_data["attempts"]

        logger.info(f"Processing job {job_id} (attempt {attempts})")

        # Unserialize up front so every failure path below has a job object
        # (the original re-unserialized in the except handler via a
        # `'job' not in locals()` check).
        try:
            job = Job.unserialize(payload)
        except Exception as unserialize_error:
            logger.error(f"Failed to unserialize job: {unserialize_error}")
            # Payload is corrupted beyond recovery — drop it.
            await self.driver.delete(job_id, self.queue_name)
            return

        job.job_id = job_id
        job.attempts = attempts

        # Worker-level max_tries overrides the job's own setting when given.
        max_tries = self.max_tries or job.max_tries
        if attempts > max_tries:
            logger.warning(
                f"Job {job_id} exceeded max tries ({max_tries}), moving to failed queue"
            )
            await self._fail_job(job, Exception("Max tries exceeded"))
            await self.driver.delete(job_id, self.queue_name)
            return

        # The worker timeout is the fallback when the job defines none.
        # (Bug fix: the original reported `job.timeout` in the timeout
        # message, which is None whenever the worker fallback applied.)
        effective_timeout = job.timeout or self.timeout

        try:
            try:
                await asyncio.wait_for(job.handle(), timeout=effective_timeout)
            except asyncio.TimeoutError:
                logger.error(f"Job {job_id} timed out after {effective_timeout}s")
                raise Exception(f"Job timed out after {effective_timeout} seconds")

            # Job succeeded, delete from queue
            await self.driver.delete(job_id, self.queue_name)
            logger.info(f"Job {job_id} completed successfully")

        except Exception as e:
            logger.error(f"Job {job_id} failed: {str(e)}")
            logger.debug(traceback.format_exc())

            if attempts < max_tries:
                # Release back to queue for retry after the job's backoff delay.
                delay = job.retry_after
                await self.driver.release(job_id, self.queue_name, delay)
                logger.info(
                    f"Job {job_id} released back to queue "
                    f"(attempt {attempts}/{max_tries}, delay: {delay}s)"
                )
            else:
                # Max tries exceeded, move to failed queue
                await self._fail_job(job, e)
                await self.driver.delete(job_id, self.queue_name)

    async def _fail_job(self, job: Job, exception: Exception) -> None:
        """
        Handle a permanently failed job: invoke its failed() hook and persist
        it via the driver's failed-job storage. Errors here are logged, never
        raised, so a broken failed() hook cannot crash the worker.

        Args:
            job: The job that failed
            exception: The exception that caused the failure
        """
        try:
            # Call the job's failed handler
            await job.failed(exception)

            # Store the exception type, message and traceback for diagnosis.
            exception_str = f"{exception.__class__.__name__}: {str(exception)}\n"
            exception_str += traceback.format_exc()

            await self.driver.failed(
                connection=self.connection or "default",
                queue=self.queue_name,
                payload=job.serialize(),
                exception=exception_str,
            )

            logger.info(f"Job {job.job_id} moved to failed queue")

        except Exception as e:
            logger.error(f"Error handling failed job: {str(e)}")
            logger.debug(traceback.format_exc())

    def _should_stop(self) -> bool:
        """Check if the worker should stop based on max_jobs/max_time limits."""
        # Check max jobs
        if self.max_jobs and self.jobs_processed >= self.max_jobs:
            return True

        # Check max time
        if self.max_time and self.start_time:
            elapsed = asyncio.get_running_loop().time() - self.start_time
            if elapsed >= self.max_time:
                return True

        return False

    def pause(self) -> None:
        """Pause the worker (the loop idles without popping jobs)."""
        self.paused = True
        logger.info("Worker paused")

    def resume(self) -> None:
        """Resume a paused worker."""
        self.paused = False
        logger.info("Worker resumed")

    def stop(self) -> None:
        """Stop the worker gracefully after the current job finishes."""
        self.should_quit = True
        logger.info("Worker stop requested")
import json
import logging
import time
from typing import Optional
from datetime import datetime

from core.queue.queue_driver import QueueDriver
from core.redis_manager import RedisManager
from core.model import Model
from app.models.queue_failed_job import QueueFailedJob

logger = logging.getLogger("RouteMQ.RedisQueue")


class RedisQueue(QueueDriver):
    """
    Redis-backed queue driver using Redis lists and sorted sets.

    Layout per queue:
      - main queue  : list  (RPUSH tail / LMOVE from head => FIFO)
      - delayed jobs: sorted set scored by the unix time they become available
      - reserved    : list of jobs currently being processed by workers
    """

    def __init__(self):
        """Initialize the Redis queue driver."""
        self.redis = RedisManager()
        self.connection_name = "redis"

    def _get_queue_key(self, queue: str) -> str:
        """Get the Redis key for a queue."""
        return f"routemq:queue:{queue}"

    def _get_delayed_key(self, queue: str) -> str:
        """Get the Redis key for delayed jobs in a queue."""
        return f"routemq:queue:{queue}:delayed"

    def _get_reserved_key(self, queue: str) -> str:
        """Get the Redis key for reserved jobs in a queue."""
        return f"routemq:queue:{queue}:reserved"

    async def push(
        self,
        payload: str,
        queue: str = "default",
        delay: int = 0,
    ) -> None:
        """
        Push a new job onto the queue.

        Args:
            payload: Serialized job payload
            queue: Queue name
            delay: Seconds before the job becomes available (0 = immediately)

        Raises:
            RuntimeError: If Redis is disabled
        """
        if not self.redis.is_enabled():
            logger.error("Cannot push job to Redis queue - Redis is disabled")
            raise RuntimeError("Redis is disabled. Enable it to use RedisQueue.")

        client = self.redis.get_client()
        try:
            job_data = {
                # Unique ID with microseconds.
                # NOTE(review): two pushes in the same microsecond would collide;
                # consider appending a uuid if producers are highly concurrent.
                "id": f"{queue}:{int(time.time() * 1000000)}",
                "payload": payload,
                "attempts": 0,
            }
            job_json = json.dumps(job_data)

            if delay > 0:
                # Use sorted set for delayed jobs (score = available timestamp)
                available_at = time.time() + delay
                await client.zadd(
                    self._get_delayed_key(queue),
                    {job_json: available_at}
                )
                logger.debug(f"Job pushed to delayed queue '{queue}' with {delay}s delay")
            else:
                # Append to the tail of the list; pop() takes from the head (FIFO).
                await client.rpush(self._get_queue_key(queue), job_json)
                logger.debug(f"Job pushed to queue '{queue}'")

        except Exception as e:
            logger.error(f"Failed to push job to Redis queue: {str(e)}")
            raise

    async def _migrate_delayed_jobs(self, queue: str) -> None:
        """Move delayed jobs that are now available to the main queue."""
        if not self.redis.is_enabled():
            return

        client = self.redis.get_client()
        try:
            delayed_key = self._get_delayed_key(queue)
            current_time = time.time()

            # Get all jobs that are now available (score <= current_time)
            available_jobs = await client.zrangebyscore(
                delayed_key,
                "-inf",
                current_time
            )

            if available_jobs:
                # Move jobs to the tail of the main queue in a single round trip.
                pipeline = client.pipeline()
                for job_json in available_jobs:
                    pipeline.rpush(self._get_queue_key(queue), job_json)
                    pipeline.zrem(delayed_key, job_json)
                await pipeline.execute()

                logger.debug(f"Migrated {len(available_jobs)} delayed jobs to queue '{queue}'")

        except Exception as e:
            logger.error(f"Failed to migrate delayed jobs: {str(e)}")

    async def pop(self, queue: str = "default") -> Optional[dict]:
        """
        Pop the next available job from the queue.

        Returns:
            Job dict with keys id/payload/attempts (attempts already
            incremented), or None when the queue is empty or Redis is disabled.
        """
        if not self.redis.is_enabled():
            logger.error("Cannot pop job from Redis queue - Redis is disabled")
            return None

        client = self.redis.get_client()
        try:
            # First, migrate any delayed jobs that are now available
            await self._migrate_delayed_jobs(queue)

            # Atomically take the OLDEST job (head of the list) and park it in
            # the reserved list. Bug fix: the original used RPOPLPUSH, which
            # pops from the tail — combined with RPUSH in push() that made the
            # queue LIFO, not FIFO. LMOVE requires Redis >= 6.2 (deployment
            # pins redis:7).
            job_json = await client.lmove(
                self._get_queue_key(queue),
                self._get_reserved_key(queue),
                src="LEFT",
                dest="RIGHT",
            )

            if not job_json:
                return None

            job_data = json.loads(job_json)
            job_data["attempts"] += 1

            # Rewrite the reserved entry so it carries the new attempt count.
            updated_job_json = json.dumps(job_data)
            await client.lrem(self._get_reserved_key(queue), 1, job_json)
            await client.rpush(self._get_reserved_key(queue), updated_job_json)

            logger.debug(
                f"Job {job_data['id']} popped from queue '{queue}' (attempt {job_data['attempts']})"
            )

            return job_data

        except Exception as e:
            logger.error(f"Failed to pop job from Redis queue: {str(e)}")
            return None

    async def release(
        self,
        job_id: str,
        queue: str,
        delay: int = 0,
    ) -> None:
        """
        Release a reserved job back to the queue for retry.

        Args:
            job_id: ID of the reserved job
            queue: Queue name
            delay: Seconds before the job becomes available again
        """
        if not self.redis.is_enabled():
            logger.error("Cannot release job - Redis is disabled")
            return

        client = self.redis.get_client()
        try:
            reserved_key = self._get_reserved_key(queue)

            # Find the job in reserved list (linear scan; reserved lists are
            # small — one entry per in-flight job).
            reserved_jobs = await client.lrange(reserved_key, 0, -1)
            job_json = None

            for reserved_job in reserved_jobs:
                job_data = json.loads(reserved_job)
                if job_data["id"] == job_id:
                    job_json = reserved_job
                    break

            if not job_json:
                logger.warning(f"Job {job_id} not found in reserved queue '{queue}'")
                return

            # Remove from reserved
            await client.lrem(reserved_key, 1, job_json)

            # Add back to queue (with delay if specified)
            if delay > 0:
                available_at = time.time() + delay
                await client.zadd(
                    self._get_delayed_key(queue),
                    {job_json: available_at}
                )
            else:
                await client.rpush(self._get_queue_key(queue), job_json)

            logger.debug(f"Job {job_id} released back to queue '{queue}' with delay {delay}s")

        except Exception as e:
            logger.error(f"Failed to release job: {str(e)}")
            raise

    async def delete(self, job_id: str, queue: str) -> None:
        """
        Delete a reserved job from the queue (called after success or when a
        job is permanently failed/corrupted).
        """
        if not self.redis.is_enabled():
            logger.error("Cannot delete job - Redis is disabled")
            return

        client = self.redis.get_client()
        try:
            reserved_key = self._get_reserved_key(queue)

            # Find and remove the job from reserved list
            reserved_jobs = await client.lrange(reserved_key, 0, -1)

            for job_json in reserved_jobs:
                job_data = json.loads(job_json)
                if job_data["id"] == job_id:
                    await client.lrem(reserved_key, 1, job_json)
                    logger.debug(f"Job {job_id} deleted from queue '{queue}'")
                    return

            logger.warning(f"Job {job_id} not found in reserved queue '{queue}'")

        except Exception as e:
            logger.error(f"Failed to delete job: {str(e)}")
            raise

    async def failed(
        self,
        connection: str,
        queue: str,
        payload: str,
        exception: str,
    ) -> None:
        """
        Store a failed job.
        If MySQL is enabled, store in database. Otherwise, store in Redis.
        Storage errors are logged, never raised.
        """
        try:
            if Model._is_enabled:
                # Store in database
                session = await Model.get_session()
                try:
                    failed_job = QueueFailedJob(
                        connection=connection,
                        queue=queue,
                        payload=payload,
                        exception=exception,
                        # NOTE(review): utcnow() is deprecated in 3.12; kept to
                        # preserve the naive-datetime value stored in the column.
                        failed_at=datetime.utcnow(),
                    )
                    session.add(failed_job)
                    await session.commit()
                    logger.info(f"Failed job stored in database for queue '{queue}'")
                finally:
                    await session.close()
            elif self.redis.is_enabled():
                # Fallback to Redis if database not available
                client = self.redis.get_client()
                failed_key = f"routemq:queue:failed:{queue}"
                failed_data = {
                    "connection": connection,
                    "queue": queue,
                    "payload": payload,
                    "exception": exception,
                    "failed_at": datetime.utcnow().isoformat(),
                }
                await client.rpush(failed_key, json.dumps(failed_data))
                logger.info(f"Failed job stored in Redis for queue '{queue}'")
            else:
                logger.error("Cannot store failed job - both MySQL and Redis are disabled")

        except Exception as e:
            logger.error(f"Failed to store failed job: {str(e)}")

    async def size(self, queue: str = "default") -> int:
        """
        Get the size of the queue (ready + delayed jobs; reserved jobs are
        not counted). Returns 0 when Redis is disabled or on error.
        """
        if not self.redis.is_enabled():
            logger.error("Cannot get queue size - Redis is disabled")
            return 0

        client = self.redis.get_client()
        try:
            # Count jobs in main queue
            main_count = await client.llen(self._get_queue_key(queue))

            # Count delayed jobs
            delayed_count = await client.zcard(self._get_delayed_key(queue))

            return main_count + delayed_count

        except Exception as e:
            logger.error(f"Failed to get queue size: {str(e)}")
            return 0
+ args: + TIMEZONE: ${TIMEZONE:-Asia/Jakarta} + container_name: routemq-app-dev + environment: + MQTT_BROKER: ${MQTT_BROKER:-test.mosquitto.org} + MQTT_PORT: ${MQTT_PORT:-1883} + MQTT_USERNAME: ${MQTT_USERNAME:-} + MQTT_PASSWORD: ${MQTT_PASSWORD:-} + MQTT_GROUP_NAME: ${MQTT_GROUP_NAME:-mqtt_framework_group} + + ENABLE_MYSQL: ${ENABLE_MYSQL:-true} + DB_HOST: ${DB_HOST:-mysql} + DB_PORT: ${DB_PORT:-3306} + DB_NAME: ${DB_NAME:-mqtt_framework} + DB_USER: ${DB_USER:-routemq} + DB_PASS: ${DB_PASS:-routemq} + + ENABLE_REDIS: ${ENABLE_REDIS:-true} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_DB: ${REDIS_DB:-0} + + QUEUE_CONNECTION: ${QUEUE_CONNECTION:-redis} + + LOG_LEVEL: ${LOG_LEVEL:-DEBUG} + + depends_on: + - mysql + - redis + + networks: + - routemq-network + + volumes: + - ./app:/app/app + - ./core:/app/core + - ./bootstrap:/app/bootstrap + - ./logs:/app/logs + - ./.env:/app/.env:ro + + profiles: + - full # Use `docker compose --profile full up` to include this + +volumes: + mysql-dev-data: + driver: local + +networks: + routemq-network: + driver: bridge diff --git a/docker-compose.yml b/docker-compose.yml index 09e3a96..001ce28 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,47 @@ services: + # Redis - Fast queue backend + redis: + image: redis:7-alpine + container_name: routemq-redis + command: redis-server --appendonly yes + ports: + - "${REDIS_PORT:-6379}:6379" + volumes: + - redis-data:/data + networks: + - routemq-network + restart: unless-stopped + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 3 + + # MySQL - Persistent storage and database queue + mysql: + image: mysql:8.0 + container_name: routemq-mysql + environment: + MYSQL_ROOT_PASSWORD: ${DB_PASS:-rootpassword} + MYSQL_DATABASE: ${DB_NAME:-mqtt_framework} + MYSQL_USER: ${DB_USER:-routemq} + MYSQL_PASSWORD: ${DB_PASS:-routemq} + ports: + - "${DB_PORT:-3306}:3306" + volumes: + - mysql-data:/var/lib/mysql + 
networks: + - routemq-network + restart: unless-stopped + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "-u", "root", "-p${DB_PASS:-rootpassword}"] + interval: 10s + timeout: 5s + retries: 5 + start_period: 30s + command: --default-authentication-plugin=mysql_native_password + + # Main RouteMQ Application routemq: build: context: . @@ -12,24 +55,38 @@ services: MQTT_PASSWORD: ${MQTT_PASSWORD:-} MQTT_GROUP_NAME: ${MQTT_GROUP_NAME:-mqtt_framework_group} - ENABLE_MYSQL: ${ENABLE_MYSQL:-false} - DB_HOST: ${DB_HOST:-localhost} + ENABLE_MYSQL: ${ENABLE_MYSQL:-true} + DB_HOST: ${DB_HOST:-mysql} DB_PORT: ${DB_PORT:-3306} DB_NAME: ${DB_NAME:-mqtt_framework} - DB_USER: ${DB_USER:-root} - DB_PASS: ${DB_PASS:-} + DB_USER: ${DB_USER:-routemq} + DB_PASS: ${DB_PASS:-routemq} + + ENABLE_REDIS: ${ENABLE_REDIS:-true} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_DB: ${REDIS_DB:-0} + REDIS_PASSWORD: ${REDIS_PASSWORD:-} + + QUEUE_CONNECTION: ${QUEUE_CONNECTION:-redis} LOG_LEVEL: ${LOG_LEVEL:-INFO} LOG_FORMAT: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + depends_on: + mysql: + condition: service_healthy + redis: + condition: service_healthy + deploy: resources: limits: - cpus: '1.0' # Limit to 1 CPU core - memory: 512M # Limit to 512MB RAM + cpus: '1.0' + memory: 512M reservations: - cpus: '0.5' # Reserve 0.5 CPU core - memory: 256M # Reserve 256MB RAM + cpus: '0.5' + memory: 256M networks: - routemq-network @@ -48,6 +105,168 @@ services: retries: 3 start_period: 40s + # Queue Worker - Default Queue + queue-worker-default: + build: + context: . 
+ args: + TIMEZONE: ${TIMEZONE:-Asia/Jakarta} + container_name: routemq-queue-default + command: ["uv", "run", "python", "main.py", "--queue-work", "--queue", "default", "--sleep", "3"] + environment: + ENABLE_MYSQL: ${ENABLE_MYSQL:-true} + DB_HOST: ${DB_HOST:-mysql} + DB_PORT: ${DB_PORT:-3306} + DB_NAME: ${DB_NAME:-mqtt_framework} + DB_USER: ${DB_USER:-routemq} + DB_PASS: ${DB_PASS:-routemq} + + ENABLE_REDIS: ${ENABLE_REDIS:-true} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_DB: ${REDIS_DB:-0} + REDIS_PASSWORD: ${REDIS_PASSWORD:-} + + QUEUE_CONNECTION: ${QUEUE_CONNECTION:-redis} + + LOG_LEVEL: ${LOG_LEVEL:-INFO} + LOG_FORMAT: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + depends_on: + mysql: + condition: service_healthy + redis: + condition: service_healthy + + deploy: + resources: + limits: + cpus: '0.5' + memory: 256M + reservations: + cpus: '0.25' + memory: 128M + + networks: + - routemq-network + + restart: unless-stopped + + volumes: + - ./app:/app/app:ro + - ./logs:/app/logs + - ./.env:/app/.env:ro + + # Queue Worker - High Priority Queue + queue-worker-high: + build: + context: . 
+ args: + TIMEZONE: ${TIMEZONE:-Asia/Jakarta} + container_name: routemq-queue-high + command: ["uv", "run", "python", "main.py", "--queue-work", "--queue", "high-priority", "--sleep", "1"] + environment: + ENABLE_MYSQL: ${ENABLE_MYSQL:-true} + DB_HOST: ${DB_HOST:-mysql} + DB_PORT: ${DB_PORT:-3306} + DB_NAME: ${DB_NAME:-mqtt_framework} + DB_USER: ${DB_USER:-routemq} + DB_PASS: ${DB_PASS:-routemq} + + ENABLE_REDIS: ${ENABLE_REDIS:-true} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_DB: ${REDIS_DB:-0} + REDIS_PASSWORD: ${REDIS_PASSWORD:-} + + QUEUE_CONNECTION: ${QUEUE_CONNECTION:-redis} + + LOG_LEVEL: ${LOG_LEVEL:-INFO} + LOG_FORMAT: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + depends_on: + mysql: + condition: service_healthy + redis: + condition: service_healthy + + deploy: + resources: + limits: + cpus: '0.5' + memory: 256M + reservations: + cpus: '0.25' + memory: 128M + + networks: + - routemq-network + + restart: unless-stopped + + volumes: + - ./app:/app/app:ro + - ./logs:/app/logs + - ./.env:/app/.env:ro + + # Queue Worker - Emails Queue + queue-worker-emails: + build: + context: . 
+ args: + TIMEZONE: ${TIMEZONE:-Asia/Jakarta} + container_name: routemq-queue-emails + command: ["uv", "run", "python", "main.py", "--queue-work", "--queue", "emails", "--sleep", "5"] + environment: + ENABLE_MYSQL: ${ENABLE_MYSQL:-true} + DB_HOST: ${DB_HOST:-mysql} + DB_PORT: ${DB_PORT:-3306} + DB_NAME: ${DB_NAME:-mqtt_framework} + DB_USER: ${DB_USER:-routemq} + DB_PASS: ${DB_PASS:-routemq} + + ENABLE_REDIS: ${ENABLE_REDIS:-true} + REDIS_HOST: ${REDIS_HOST:-redis} + REDIS_PORT: ${REDIS_PORT:-6379} + REDIS_DB: ${REDIS_DB:-0} + REDIS_PASSWORD: ${REDIS_PASSWORD:-} + + QUEUE_CONNECTION: ${QUEUE_CONNECTION:-redis} + + LOG_LEVEL: ${LOG_LEVEL:-INFO} + LOG_FORMAT: "%(asctime)s - %(name)s - %(levelname)s - %(message)s" + + depends_on: + mysql: + condition: service_healthy + redis: + condition: service_healthy + + deploy: + resources: + limits: + cpus: '0.5' + memory: 256M + reservations: + cpus: '0.25' + memory: 128M + + networks: + - routemq-network + + restart: unless-stopped + + volumes: + - ./app:/app/app:ro + - ./logs:/app/logs + - ./.env:/app/.env:ro + +volumes: + redis-data: + driver: local + mysql-data: + driver: local + networks: routemq-network: driver: bridge diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 037b051..c7daaf4 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -35,6 +35,14 @@ * [Creating Models](database/creating-models.md) * [Database Operations](database/operations.md) * [Redis Integration](redis/README.md) +* [Queue System](queue/README.md) + * [Getting Started](queue/getting-started.md) + * [Creating Jobs](queue/creating-jobs.md) + * [Dispatching Jobs](queue/dispatching-jobs.md) + * [Running Workers](queue/running-workers.md) + * [Queue Drivers](queue/drivers.md) + * [Failed Jobs](queue/failed-jobs.md) + * [Best Practices](queue/best-practices.md) * [Rate Limiting](rate-limiting/README.md) * [Advanced Rate Limiting Features](rate-limiting/advanced-features.md) * [Basic Rate Limiting](rate-limiting/basic-rate-limiting.md) diff --git 
a/docs/docker-deployment.md b/docs/docker-deployment.md new file mode 100644 index 0000000..f111ceb --- /dev/null +++ b/docs/docker-deployment.md @@ -0,0 +1,596 @@ +# Docker Deployment Guide + +This guide explains how to deploy RouteMQ with queue workers using Docker and Docker Compose. + +## Table of Contents + +- [Architecture Overview](#architecture-overview) +- [Quick Start](#quick-start) +- [Production Deployment](#production-deployment) +- [Development Setup](#development-setup) +- [Scaling Workers](#scaling-workers) +- [Monitoring](#monitoring) +- [Troubleshooting](#troubleshooting) + +## Architecture Overview + +The Docker deployment includes: + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Docker Network โ”‚ +โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Redis โ”‚ โ”‚ MySQL โ”‚ โ”‚ RouteMQ โ”‚ โ”‚ +โ”‚ โ”‚ :6379 โ”‚ โ”‚ :3306 โ”‚ โ”‚ App โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”‚ โ”‚ โ”‚ +โ”‚ โ”œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ โ”‚ โ”‚ โ”‚ +โ”‚ โ”Œโ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ–ผโ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ +โ”‚ โ”‚ Queue โ”‚ โ”‚ Queue โ”‚ โ”‚ Queue โ”‚ โ”‚ +โ”‚ โ”‚ Worker โ”‚ โ”‚ Worker โ”‚ โ”‚ Worker โ”‚ โ”‚ +โ”‚ โ”‚(default)โ”‚ โ”‚ (high) โ”‚ โ”‚ (emails) โ”‚ โ”‚ +โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ +โ”‚ โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +### Services + +1. 
**redis** - Fast, in-memory queue backend +2. **mysql** - Persistent storage and database queue +3. **routemq** - Main MQTT application +4. **queue-worker-default** - Processes jobs from 'default' queue +5. **queue-worker-high** - Processes jobs from 'high-priority' queue (faster polling) +6. **queue-worker-emails** - Processes jobs from 'emails' queue + +## Quick Start + +### Prerequisites + +- Docker 20.10+ +- Docker Compose 2.0+ + +### 1. Prepare Environment File + +```bash +# Copy the Docker environment template +cp .env.docker .env + +# Edit with your settings +nano .env +``` + +### 2. Start All Services + +```bash +# Build and start all services +docker compose up -d + +# Check service status +docker compose ps + +# View logs +docker compose logs -f +``` + +### 3. Verify Deployment + +```bash +# Check RouteMQ app logs +docker compose logs routemq + +# Check queue worker logs +docker compose logs queue-worker-default + +# Check Redis connection +docker compose exec redis redis-cli ping + +# Check MySQL connection +docker compose exec mysql mysql -uroot -p${DB_PASS} -e "SHOW DATABASES;" +``` + +## Production Deployment + +### Environment Configuration + +Create a `.env` file with production settings: + +```env +# MQTT Broker (use your production broker) +MQTT_BROKER=mqtt.yourcompany.com +MQTT_PORT=1883 +MQTT_USERNAME=your_username +MQTT_PASSWORD=your_secure_password +MQTT_GROUP_NAME=routemq_production + +# MySQL (strong password!) 
+ENABLE_MYSQL=true +DB_HOST=mysql +DB_PORT=3306 +DB_NAME=routemq_production +DB_USER=routemq +DB_PASS=YOUR_STRONG_PASSWORD_HERE + +# Redis +ENABLE_REDIS=true +REDIS_HOST=redis +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Queue +QUEUE_CONNECTION=redis + +# Timezone +TIMEZONE=UTC + +# Logging +LOG_LEVEL=INFO +``` + +### Persistent Data + +Data is stored in Docker volumes: + +```bash +# List volumes +docker volume ls | grep routemq + +# Backup MySQL data +docker compose exec mysql mysqldump -uroot -p${DB_PASS} routemq_production > backup.sql + +# Backup Redis data +docker compose exec redis redis-cli SAVE +docker cp routemq-redis:/data/dump.rdb ./redis-backup.rdb +``` + +### Starting Services + +```bash +# Start all services in production mode +docker compose up -d + +# Verify all containers are running +docker compose ps + +# Check resource usage +docker stats +``` + +### Updating Deployment + +```bash +# Pull latest code +git pull origin main + +# Rebuild and restart +docker compose build +docker compose up -d + +# Or rebuild specific service +docker compose build routemq +docker compose up -d routemq +``` + +## Development Setup + +For local development, use the minimal development compose file: + +```bash +# Start only Redis and MySQL +docker compose -f docker-compose.dev.yml up -d + +# Run RouteMQ app on host for hot reload +uv run python main.py --run + +# Run queue worker on host +uv run python main.py --queue-work --queue default +``` + +Or start everything including the app: + +```bash +# Start all services including app +docker compose -f docker-compose.dev.yml --profile full up -d +``` + +## Scaling Workers + +### Add More Workers for Same Queue + +Edit `docker-compose.yml` to add more worker instances: + +```yaml +# Add a second worker for default queue +queue-worker-default-2: + build: + context: . + container_name: routemq-queue-default-2 + command: ["uv", "run", "python", "main.py", "--queue-work", "--queue", "default", "--sleep", "3"] + # ... 
same environment as queue-worker-default +``` + +Or scale using Docker Compose: + +```bash +# Scale default queue workers to 3 instances +docker compose up -d --scale queue-worker-default=3 +``` + +### Add Workers for New Queues + +To add a worker for a custom queue: + +```yaml +queue-worker-reports: + build: + context: . + container_name: routemq-queue-reports + command: ["uv", "run", "python", "main.py", "--queue-work", "--queue", "reports", "--sleep", "10"] + environment: + # ... same as other workers + depends_on: + - mysql + - redis + networks: + - routemq-network + restart: unless-stopped +``` + +### Resource Allocation + +Adjust resources per worker based on workload: + +```yaml +queue-worker-heavy: + deploy: + resources: + limits: + cpus: '1.0' # Allow 1 full CPU + memory: 512M # 512MB RAM + reservations: + cpus: '0.5' + memory: 256M +``` + +## Monitoring + +### View Logs + +```bash +# All services +docker compose logs -f + +# Specific service +docker compose logs -f queue-worker-default + +# Last 100 lines +docker compose logs --tail=100 routemq + +# Logs from specific time +docker compose logs --since 2h queue-worker-emails +``` + +### Resource Usage + +```bash +# Real-time stats +docker stats + +# Check specific container +docker stats routemq-queue-default +``` + +### Queue Status + +Connect to Redis to check queue status: + +```bash +# Connect to Redis CLI +docker compose exec redis redis-cli + +# Check queue length +LLEN routemq:queue:default +LLEN routemq:queue:emails + +# Check delayed jobs +ZCARD routemq:queue:default:delayed + +# View queue contents (first 10 items) +LRANGE routemq:queue:default 0 9 +``` + +Check database queue: + +```bash +# Connect to MySQL +docker compose exec mysql mysql -uroot -p${DB_PASS} routemq_production + +# Check queue jobs +SELECT queue, COUNT(*) as pending_jobs +FROM queue_jobs +WHERE reserved_at IS NULL +GROUP BY queue; + +# Check failed jobs +SELECT queue, COUNT(*) as failed_jobs +FROM queue_failed_jobs +GROUP BY 
queue; +``` + +### Health Checks + +All services include health checks: + +```bash +# View health status +docker compose ps + +# Manually check health +docker inspect routemq-redis | grep -A 10 Health +docker inspect routemq-mysql | grep -A 10 Health +docker inspect routemq-app | grep -A 10 Health +``` + +## Service Management + +### Start/Stop Services + +```bash +# Stop all services +docker compose stop + +# Start all services +docker compose start + +# Restart specific service +docker compose restart queue-worker-default + +# Stop and remove containers (keeps volumes) +docker compose down + +# Stop and remove everything including volumes +docker compose down -v +``` + +### Update Configuration + +After changing `.env`: + +```bash +# Recreate containers with new config +docker compose up -d --force-recreate + +# Or restart specific service +docker compose up -d --force-recreate routemq +``` + +## Troubleshooting + +### Workers Not Processing Jobs + +**Check worker logs:** +```bash +docker compose logs queue-worker-default +``` + +**Common issues:** +- Worker not connected to Redis/MySQL +- Queue name mismatch +- Jobs failing during processing + +**Solutions:** +```bash +# Restart worker +docker compose restart queue-worker-default + +# Check Redis connection +docker compose exec redis redis-cli ping + +# Check MySQL connection +docker compose exec mysql mysqladmin ping -h localhost +``` + +### Redis Connection Issues + +```bash +# Check if Redis is running +docker compose ps redis + +# Check Redis logs +docker compose logs redis + +# Test connection +docker compose exec redis redis-cli ping +``` + +### MySQL Connection Issues + +```bash +# Check if MySQL is running +docker compose ps mysql + +# Check MySQL logs +docker compose logs mysql + +# Test connection +docker compose exec mysql mysql -uroot -p${DB_PASS} -e "SELECT 1" +``` + +### High Memory Usage + +```bash +# Check memory usage +docker stats + +# Reduce worker limits in docker-compose.yml 
+queue-worker-default: + deploy: + resources: + limits: + memory: 128M # Reduce from 256M +``` + +### Container Won't Start + +```bash +# View detailed logs +docker compose logs [service-name] + +# Check for port conflicts +netstat -tulpn | grep :6379 +netstat -tulpn | grep :3306 + +# Remove and recreate +docker compose down +docker compose up -d +``` + +### Database Tables Not Created + +```bash +# Connect to RouteMQ container and create tables manually +docker compose exec routemq python -c " +import asyncio +from bootstrap.app import Application + +async def create_tables(): + app = Application() + await app.initialize_database() + print('Tables created!') + +asyncio.run(create_tables()) +" +``` + +## Best Practices + +### 1. Use Docker Secrets for Production + +Instead of plain text passwords in `.env`: + +```yaml +services: + mysql: + environment: + MYSQL_ROOT_PASSWORD_FILE: /run/secrets/mysql_root_password + secrets: + - mysql_root_password + +secrets: + mysql_root_password: + file: ./secrets/mysql_root_password.txt +``` + +### 2. Regular Backups + +Set up automated backups: + +```bash +#!/bin/bash +# backup.sh + +DATE=$(date +%Y%m%d_%H%M%S) + +# Backup MySQL +docker compose exec mysql mysqldump -uroot -p${DB_PASS} routemq_production > backup_mysql_${DATE}.sql + +# Backup Redis +docker compose exec redis redis-cli SAVE +docker cp routemq-redis:/data/dump.rdb backup_redis_${DATE}.rdb + +echo "Backup completed: ${DATE}" +``` + +### 3. Log Rotation + +Configure log rotation in `docker-compose.yml`: + +```yaml +routemq: + logging: + driver: "json-file" + options: + max-size: "10m" + max-file: "3" +``` + +### 4. Monitoring and Alerts + +Use Docker health checks and monitoring tools: + +```yaml +routemq: + healthcheck: + test: ["CMD", "python", "-c", "import paho.mqtt.client; print('OK')"] + interval: 30s + timeout: 10s + retries: 3 + start_period: 40s +``` + +### 5. 
Resource Limits + +Always set resource limits in production: + +```yaml +deploy: + resources: + limits: + cpus: '1.0' + memory: 512M + reservations: + cpus: '0.5' + memory: 256M +``` + +## Example Deployment Scenarios + +### Scenario 1: Small Deployment (Single Server) + +```bash +# Use default docker-compose.yml +docker compose up -d +``` + +**Resources:** 1 app + 3 workers + Redis + MySQL + +### Scenario 2: Medium Deployment (High Load) + +Scale workers: + +```bash +docker compose up -d --scale queue-worker-default=5 --scale queue-worker-emails=3 +``` + +**Resources:** 1 app + 8+ workers + Redis + MySQL + +### Scenario 3: Development + +```bash +# Start only dependencies +docker compose -f docker-compose.dev.yml up -d + +# Run app on host +uv run python main.py --run + +# Run worker on host +uv run python main.py --queue-work +``` + +## Summary + +The Docker deployment provides: + +- โœ… Complete stack with Redis and MySQL +- โœ… Multiple queue workers out of the box +- โœ… Easy scaling and configuration +- โœ… Health checks and auto-restart +- โœ… Resource limits for stability +- โœ… Development and production setups + +For more information, see: +- [Queue System Documentation](./queue/README.md) +- [RouteMQ Documentation](../README.md) diff --git a/docs/queue/README.md b/docs/queue/README.md new file mode 100644 index 0000000..e680258 --- /dev/null +++ b/docs/queue/README.md @@ -0,0 +1,131 @@ +# Queue System + +RouteMQ includes a powerful background task queue system similar to Laravel's queue functionality. This allows you to defer time-consuming tasks (like sending emails, processing data, generating reports) to background workers, keeping your MQTT message handlers fast and responsive. 
+ +## Overview + +The queue system consists of several components: + +- **Job**: A class that defines a task to be executed in the background +- **Queue Manager**: Dispatches jobs to queues +- **Queue Driver**: Handles storage and retrieval of jobs (Redis or Database) +- **Queue Worker**: Processes jobs from the queue + +### Architecture + +``` +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Your Code โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ dispatch(job) + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚Queue Managerโ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ push + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚ Queue Driver โ”‚ +โ”‚ (Redis/Database)โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”ฌโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ + โ”‚ pop + โ–ผ +โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” +โ”‚Queue Worker โ”‚ +โ”‚ job.handle()โ”‚ +โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ +``` + +## Quick Start + +```python +# 1. Create a job +from core.job import Job + +class SendEmailJob(Job): + max_tries = 3 + queue = "emails" + + def __init__(self): + super().__init__() + self.to = None + self.subject = None + + async def handle(self): + # Send email logic + print(f"Sending email to {self.to}") + +# 2. Dispatch the job +from core.queue.queue_manager import dispatch + +job = SendEmailJob() +job.to = "user@example.com" +job.subject = "Welcome!" +await dispatch(job) + +# 3. 
Run the worker +# python main.py --queue-work --queue emails +``` + +## Key Features + +- โœ… **Laravel-style API** - Familiar syntax for Laravel developers +- โœ… **Two Queue Drivers** - Redis (fast) or Database (persistent) +- โœ… **Automatic Retries** - Configurable retry logic with delays +- โœ… **Multiple Queues** - Organize jobs by priority or type +- โœ… **Delayed Jobs** - Schedule jobs to run later +- โœ… **Failed Job Tracking** - Inspect and retry failed jobs +- โœ… **Docker Support** - Production-ready deployment +- โœ… **Graceful Shutdown** - Workers handle SIGTERM/SIGINT + +## Documentation + +- [Getting Started](./getting-started.md) - Installation and configuration +- [Creating Jobs](./creating-jobs.md) - Define background tasks +- [Dispatching Jobs](./dispatching-jobs.md) - Send jobs to queues +- [Running Workers](./running-workers.md) - Process jobs in background +- [Queue Drivers](./drivers.md) - Redis vs Database queues +- [Failed Jobs](./failed-jobs.md) - Handle and retry failures +- [Best Practices](./best-practices.md) - Tips for production use + +## Example Use Cases + +### 1. Email Notifications + +```python +from core.queue.queue_manager import dispatch +from app.jobs.send_email_job import SendEmailJob + +async def handle_user_signup(context): + job = SendEmailJob() + job.to = context["payload"]["email"] + job.template = "welcome" + await dispatch(job) +``` + +### 2. Data Processing + +```python +from core.queue.queue_manager import queue + +job = ProcessDataJob() +job.device_id = device_id +job.sensor_data = data +await queue.push(job, queue="data-processing") +``` + +### 3. Scheduled Reports + +```python +# Schedule report for 1 hour later +job = GenerateReportJob() +job.user_id = user_id +await queue.later(3600, job) # 3600 seconds = 1 hour +``` + +## Next Steps + +1. [Configure your queue](./getting-started.md) - Set up Redis or Database +2. [Create your first job](./creating-jobs.md) - Define a background task +3. 
[Dispatch jobs](./dispatching-jobs.md) - Send jobs from your code +4. [Run workers](./running-workers.md) - Process jobs in background diff --git a/docs/queue/best-practices.md b/docs/queue/best-practices.md new file mode 100644 index 0000000..6cb1d15 --- /dev/null +++ b/docs/queue/best-practices.md @@ -0,0 +1,537 @@ +# Best Practices + +Follow these best practices to build reliable and efficient queue-based systems with RouteMQ. + +## Job Design + +### 1. Keep Jobs Small and Focused + +Each job should do one thing well: + +```python +# โœ… Good - focused job +class SendWelcomeEmailJob(Job): + async def handle(self): + await send_email(self.user_id, "welcome") + +# โŒ Bad - doing too much +class UserSignupJob(Job): + async def handle(self): + await send_email() + await create_profile() + await setup_billing() + await send_sms() +``` + +**Why?** +- Easier to test and debug +- Better error handling +- Can retry individual steps +- More flexible composition + +### 2. Make Jobs Idempotent + +Jobs should be safe to run multiple times: + +```python +# โœ… Good - idempotent (SET operation) +class UpdateUserScoreJob(Job): + async def handle(self): + await db.execute( + "UPDATE users SET score = ? WHERE id = ?", + (self.new_score, self.user_id) + ) + +# โŒ Bad - not idempotent (INCREMENT operation) +class IncrementScoreJob(Job): + async def handle(self): + await db.execute( + "UPDATE users SET score = score + 10 WHERE id = ?", + (self.user_id,) + ) +``` + +**Why?** +- Jobs may be retried on failure +- Network issues can cause duplicates +- Worker crashes might re-process jobs + +### 3. 
Set Appropriate Timeouts + +```python +class QuickJob(Job): + timeout = 30 # Quick tasks (30 seconds) + +class DataProcessingJob(Job): + timeout = 300 # Data processing (5 minutes) + +class ReportGenerationJob(Job): + timeout = 600 # Long-running reports (10 minutes) +``` + +**Guidelines:** +- API calls: 30-60 seconds +- Data processing: 2-5 minutes +- Report generation: 5-10 minutes +- Don't exceed 10 minutes (consider breaking into smaller jobs) + +### 4. Use Descriptive Names + +```python +# โœ… Good - clear purpose +class SendPasswordResetEmailJob(Job): + pass + +class ProcessIoTSensorDataJob(Job): + pass + +class GenerateMonthlySalesReportJob(Job): + pass + +# โŒ Bad - unclear +class Job1(Job): + pass + +class ProcessJob(Job): + pass + +class DoStuff(Job): + pass +``` + +## Queue Organization + +### 5. Use Different Queues for Different Priorities + +```python +class CriticalAlertJob(Job): + queue = "critical" + max_tries = 5 + +class EmailJob(Job): + queue = "emails" + max_tries = 3 + +class CleanupJob(Job): + queue = "low-priority" + max_tries = 1 +``` + +Then run workers with appropriate settings: + +```bash +# Critical queue - check every second +python main.py --queue-work --queue critical --sleep 1 + +# Emails - normal priority +python main.py --queue-work --queue emails --sleep 3 + +# Low priority - check every 10 seconds +python main.py --queue-work --queue low-priority --sleep 10 +``` + +### 6. Organize by Function + +```python +# By type +queue = "emails" +queue = "reports" +queue = "notifications" + +# By priority +queue = "high-priority" +queue = "default" +queue = "low-priority" + +# By service +queue = "payment-processing" +queue = "data-sync" +queue = "cleanup" +``` + +## Data Handling + +### 7. 
Don't Store Large Payloads + +```python +# โœ… Good - store ID only +class ProcessOrderJob(Job): + def __init__(self): + super().__init__() + self.order_id = None # Small + + async def handle(self): + # Fetch data when needed + order = await fetch_order(self.order_id) + await process_order(order) + +# โŒ Bad - storing large data +class ProcessOrderJob(Job): + def __init__(self): + super().__init__() + self.order_data = None # Could be huge! + + async def handle(self): + await process_order(self.order_data) +``` + +**Why?** +- Keeps queue storage small +- Reduces serialization overhead +- Always gets fresh data +- Avoids stale data issues + +### 8. Handle Sensitive Data Carefully + +Don't store passwords or tokens in job payloads: + +```python +# โŒ Bad - storing credentials +class BadJob(Job): + def __init__(self): + super().__init__() + self.password = None # Stored in queue! + self.api_token = None # Visible in logs! + +# โœ… Good - fetch credentials when needed +class GoodJob(Job): + def __init__(self): + super().__init__() + self.user_id = None + + async def handle(self): + credentials = await get_user_credentials(self.user_id) + api_token = await get_api_token() + # Use credentials... +``` + +### 9. Validate Data Before Dispatching + +```python +def dispatch_email_job(to: str, subject: str, body: str): + # Validate before dispatching + if not to or '@' not in to: + raise ValueError("Invalid email address") + + if not subject: + raise ValueError("Subject is required") + + if len(body) > 10000: + raise ValueError("Body too long") + + job = SendEmailJob() + job.to = to + job.subject = subject + job.body = body + + return job +``` + +## Error Handling + +### 10. 
Always Implement failed() + +```python +class MyJob(Job): + async def failed(self, exception: Exception): + # Log the failure + logger.error(f"Job failed permanently: {exception}") + + # Clean up resources + await cleanup_resources(self.resource_id) + + # Notify stakeholders + await send_admin_alert(f"Job {self.__class__.__name__} failed") + + # Update status + await update_status(self.task_id, "failed") +``` + +### 11. Use Appropriate Retry Strategies + +```python +# Quick tasks - fail fast +class QuickAPICallJob(Job): + max_tries = 2 + retry_after = 5 + +# External services - be patient +class ExternalAPIJob(Job): + max_tries = 5 + retry_after = 60 + +# Critical operations - many retries +class CriticalJob(Job): + max_tries = 10 + retry_after = 300 +``` + +### 12. Log Appropriately + +```python +class MyJob(Job): + async def handle(self): + logger.info(f"Processing job {self.job_id} (attempt {self.attempts})") + logger.debug(f"Job data: {self.data}") + + try: + result = await do_work() + logger.info(f"Job completed: {result}") + except Exception as e: + logger.error(f"Job failed: {e}", exc_info=True) + raise +``` + +## Monitoring + +### 13. Monitor Queue Size + +```python +from core.queue.queue_manager import queue + +# Check queue size periodically +size = await queue.size("default") +if size > 1000: + logger.warning(f"Queue backlog: {size} jobs pending") + await send_alert("High queue backlog detected") +``` + +### 14. Track Processing Time + +```python +import time + +class MyJob(Job): + async def handle(self): + start_time = time.time() + + await do_work() + + elapsed = time.time() - start_time + logger.info(f"Job completed in {elapsed:.2f}s") + + # Alert on slow jobs + if elapsed > 60: + logger.warning(f"Slow job detected: {elapsed:.2f}s") +``` + +### 15. 
Monitor Failure Rates + +```python +async def check_failure_rate(): + """Alert if too many jobs are failing.""" + session = await Model.get_session() + + # Count failures in last hour + one_hour_ago = datetime.utcnow() - timedelta(hours=1) + result = await session.execute( + select(func.count(QueueFailedJob.id)) + .where(QueueFailedJob.failed_at >= one_hour_ago) + ) + failures = result.scalar() + + if failures > 100: # Threshold + await send_alert(f"High failure rate: {failures} jobs failed in last hour") + + await session.close() +``` + +## Performance + +### 16. Use Bulk Operations + +```python +# โœ… Good - bulk dispatch +jobs = [] +for user_id in user_ids: + job = SendNotificationJob() + job.user_id = user_id + jobs.append(job) + +await queue.bulk(jobs) # Single operation + +# โŒ Bad - individual dispatches +for user_id in user_ids: + job = SendNotificationJob() + job.user_id = user_id + await dispatch(job) # Multiple operations +``` + +### 17. Choose the Right Driver + +```python +# High volume, speed critical โ†’ Redis +QUEUE_CONNECTION=redis + +# Persistence critical, low volume โ†’ Database +QUEUE_CONNECTION=database +``` + +### 18. Scale Workers Appropriately + +```bash +# High load - scale up +docker compose up -d --scale queue-worker-default=10 + +# Low load - scale down +docker compose up -d --scale queue-worker-default=2 +``` + +## Deployment + +### 19. Use Process Managers + +```ini +# supervisor.conf +[program:routemq-queue] +command=/path/to/venv/bin/python main.py --queue-work +autostart=true +autorestart=true +startsecs=10 +stopwaitsecs=60 +``` + +### 20. Regular Maintenance + +```python +# Clean up old failed jobs weekly +async def weekly_cleanup(): + await cleanup_old_failed_jobs(days=30) + +# Monitor queue health daily +async def daily_health_check(): + for queue_name in ["default", "emails", "reports"]: + size = await queue.size(queue_name) + logger.info(f"Queue {queue_name}: {size} jobs") +``` + +## Testing + +### 21. 
Test Jobs in Isolation + +```python +import pytest + +@pytest.mark.asyncio +async def test_send_email_job(): + job = SendEmailJob() + job.to = "test@example.com" + job.subject = "Test" + job.body = "Test body" + + # Mock external services + with patch('app.jobs.send_email_job.send_email') as mock_send: + await job.handle() + mock_send.assert_called_once() +``` + +### 22. Test with Limited Retries + +```bash +# Test job with single retry +python main.py --queue-work --max-jobs 1 --max-tries 1 +``` + +## Common Anti-Patterns + +### โŒ Don't Block Workers + +```python +# โŒ Bad - blocking +class BadJob(Job): + async def handle(self): + time.sleep(10) # Blocks worker! + +# โœ… Good - non-blocking +class GoodJob(Job): + async def handle(self): + await asyncio.sleep(10) # Non-blocking +``` + +### โŒ Don't Chain Jobs Inside handle() + +```python +# โŒ Bad - chaining inside job +class BadJob(Job): + async def handle(self): + await do_work() + # Dispatching from inside job + next_job = AnotherJob() + await dispatch(next_job) + +# โœ… Good - dispatch from controller +async def handle_message(context): + job1 = FirstJob() + await dispatch(job1) + + job2 = SecondJob() + await dispatch(job2) +``` + +### โŒ Don't Store Job State in Class Variables + +```python +# โŒ Bad - class variable (shared across instances!) +class BadJob(Job): + counter = 0 # Shared! 
+ + async def handle(self): + BadJob.counter += 1 # mutates the shared class attribute + +# โœ… Good - instance variable +class GoodJob(Job): + def __init__(self): + super().__init__() + self.counter = 0 # Per-instance + + async def handle(self): + self.counter += 1 +``` + +## Checklist + +Before deploying to production: + +- [ ] All jobs have descriptive names +- [ ] Jobs are idempotent +- [ ] Appropriate timeouts set +- [ ] `failed()` method implemented +- [ ] Sensitive data not in payloads +- [ ] Data validated before dispatch +- [ ] Proper logging in place +- [ ] Queue sizes monitored +- [ ] Workers managed by process manager +- [ ] Regular cleanup scheduled +- [ ] Tests written for jobs +- [ ] Documentation updated + +## Summary + +**Do:** +- Keep jobs small and focused +- Make jobs idempotent +- Set appropriate timeouts +- Use different queues for priorities +- Store IDs, not large data +- Handle sensitive data carefully
- Implement `failed()` method +- Monitor queue health +- Use bulk operations +- Test thoroughly + +**Don't:** +- Store large payloads +- Store sensitive data +- Chain jobs inside jobs +- Block workers +- Use class variables for state +- Skip error handling +- Forget to log +- Ignore failed jobs +- Over-complicate jobs + +## Next Steps + +- [Run workers in production](./running-workers.md) +- [Docker deployment guide](../docker-deployment.md) +- [Monitor and troubleshoot](./failed-jobs.md) diff --git a/docs/queue/creating-jobs.md b/docs/queue/creating-jobs.md new file mode 100644 index 0000000..55582be --- /dev/null +++ b/docs/queue/creating-jobs.md @@ -0,0 +1,351 @@ +# Creating Jobs + +Jobs are classes that extend the `Job` base class. Each job must implement the `handle()` method which contains the logic to be executed in the background. 
+ +## Basic Job Structure + +```python +# app/jobs/send_notification_job.py +import logging +from core.job import Job + +logger = logging.getLogger("RouteMQ.Jobs.SendNotificationJob") + + +class SendNotificationJob(Job): + """Send a notification to a user.""" + + # Job configuration + max_tries = 3 # Maximum retry attempts + timeout = 60 # Maximum seconds to run + retry_after = 10 # Seconds to wait before retry + queue = "default" # Queue name + + def __init__(self): + super().__init__() + self.user_id = None + self.message = None + + async def handle(self) -> None: + """Execute the job.""" + logger.info(f"Sending notification to user {self.user_id}") + logger.info(f"Message: {self.message}") + + # Your notification logic here + # e.g., send push notification, SMS, email, etc. + + logger.info("Notification sent successfully") + + async def failed(self, exception: Exception) -> None: + """Called when the job fails permanently.""" + logger.error( + f"Failed to send notification to user {self.user_id}: {str(exception)}" + ) + # Handle failure (e.g., log to monitoring service, alert admin) +``` + +## Job Properties + +Configure your job's behavior with these class attributes: + +| Property | Description | Default | Example | +|----------|-------------|---------|---------| +| `max_tries` | Maximum number of retry attempts | 3 | `max_tries = 5` | +| `timeout` | Maximum seconds the job can run | 60 | `timeout = 120` | +| `retry_after` | Seconds to wait before retrying after failure | 0 | `retry_after = 30` | +| `queue` | Queue name | "default" | `queue = "emails"` | + +## Custom Data in Jobs + +All public instance attributes are automatically serialized and restored when the job is processed: + +```python +class ProcessOrderJob(Job): + def __init__(self): + super().__init__() + self.order_id = None + self.customer_email = None + self.items = [] # Lists and dicts are supported + self.metadata = {} # Nested structures work too + + async def handle(self): + # All attributes 
are available + print(f"Processing order {self.order_id}") + print(f"Customer: {self.customer_email}") + print(f"Items: {self.items}") + print(f"Metadata: {self.metadata}") +``` + +### Supported Data Types + +- โœ… Strings, integers, floats, booleans +- โœ… Lists and tuples +- โœ… Dictionaries +- โœ… None values +- โŒ Objects (they won't serialize - store IDs instead) +- โŒ File handles +- โŒ Database connections + +## The handle() Method + +The `handle()` method is where your job's logic lives: + +```python +async def handle(self) -> None: + """ + Execute the job. + This method is called by the queue worker. + """ + # Your job logic here + pass +``` + +**Key points:** +- Must be `async` (asynchronous) +- Should return `None` +- Can raise exceptions (will trigger retry) +- Has access to `self.attempts` (current attempt number) +- Has access to `self.job_id` (unique job identifier) + +## The failed() Method + +The `failed()` method is called when a job fails permanently (exceeds `max_tries`): + +```python +async def failed(self, exception: Exception) -> None: + """ + Handle permanent job failure. + Called after max_tries is exceeded. + + Args: + exception: The exception that caused the final failure + """ + logger.error(f"Job failed permanently: {exception}") + + # Send alert + await send_admin_alert(f"Job failed: {self.__class__.__name__}") + + # Log to monitoring service + await log_to_sentry(exception) + + # Store for manual review + # ... custom logic ... 
+``` + +## Job Examples + +### Example 1: Email Job + +```python +import asyncio +from core.job import Job + +class SendEmailJob(Job): + max_tries = 3 + timeout = 30 + retry_after = 10 + queue = "emails" + + def __init__(self): + super().__init__() + self.to = None + self.subject = None + self.body = None + + async def handle(self): + # Simulate email sending + await asyncio.sleep(2) + + # In production, use real email service + # await send_email(self.to, self.subject, self.body) + + print(f"Email sent to {self.to}") + + async def failed(self, exception: Exception): + print(f"Failed to send email to {self.to}: {exception}") +``` + +### Example 2: Data Processing Job + +```python +from core.job import Job + +class ProcessDataJob(Job): + max_tries = 5 + timeout = 120 # Longer timeout for data processing + retry_after = 5 + queue = "data-processing" + + def __init__(self): + super().__init__() + self.device_id = None + self.sensor_data = None + + async def handle(self): + # Process sensor data + temperature = self.sensor_data.get("temperature") + humidity = self.sensor_data.get("humidity") + + # Calculate statistics + if temperature and temperature > 30: + await send_alert(f"High temperature: {temperature}ยฐC") + + # Store in database + # await store_sensor_data(self.device_id, self.sensor_data) + + print(f"Processed data from device {self.device_id}") + + async def failed(self, exception: Exception): + print(f"Failed to process data from {self.device_id}") +``` + +### Example 3: Report Generation Job + +```python +from datetime import datetime +from core.job import Job + +class GenerateReportJob(Job): + max_tries = 2 + timeout = 300 # 5 minutes for report generation + retry_after = 60 + queue = "reports" + + def __init__(self): + super().__init__() + self.report_type = None + self.user_id = None + + async def handle(self): + # Generate report + report_file = f"{self.report_type}_{datetime.now().strftime('%Y%m%d')}.pdf" + + # In production: + # - Query database + # - 
Generate PDF + # - Upload to storage + # - Send notification + + print(f"Report generated: {report_file}") + + async def failed(self, exception: Exception): + # Notify user that report generation failed + print(f"Failed to generate report for user {self.user_id}") +``` + +## Job Lifecycle + +Understanding how jobs are processed: + +``` +1. Job Created + โ†“ +2. Job Serialized + โ†“ +3. Job Pushed to Queue + โ†“ +4. Worker Pops Job + โ†“ +5. Job Deserialized + โ†“ +6. handle() Called + โ†“ + โ”œโ”€ Success โ†’ Job Deleted + โ”‚ + โ””โ”€ Failure โ†’ attempts < max_tries? + โ”œโ”€ Yes โ†’ Release Back to Queue (with delay) + โ””โ”€ No โ†’ failed() Called โ†’ Move to Failed Jobs +``` + +## Best Practices + +### 1. Keep Jobs Small and Focused + +```python +# โœ… Good - focused on one task +class SendWelcomeEmailJob(Job): + async def handle(self): + await send_email(self.user_id, "welcome") + +# โŒ Bad - doing too much +class UserSignupJob(Job): + async def handle(self): + await send_email() + await create_profile() + await setup_billing() + await send_sms() +``` + +### 2. Make Jobs Idempotent + +Jobs should be safe to run multiple times: + +```python +# โœ… Good - idempotent (SET operation) +class UpdateUserScoreJob(Job): + async def handle(self): + await db.execute( + "UPDATE users SET score = ? WHERE id = ?", + (self.new_score, self.user_id) + ) + +# โŒ Bad - not idempotent (INCREMENT operation) +class IncrementScoreJob(Job): + async def handle(self): + await db.execute( + "UPDATE users SET score = score + 10 WHERE id = ?", + (self.user_id,) + ) +``` + +### 3. Set Appropriate Timeouts + +```python +class QuickJob(Job): + timeout = 30 # Quick tasks + +class DataProcessingJob(Job): + timeout = 300 # Data processing + +class ReportJob(Job): + timeout = 600 # Long-running reports +``` + +### 4. 
Use Descriptive Class Names + +```python +# โœ… Good +class SendPasswordResetEmailJob(Job): + pass + +class ProcessIoTSensorDataJob(Job): + pass + +# โŒ Bad +class Job1(Job): + pass + +class DoStuff(Job): + pass +``` + +### 5. Log Appropriately + +```python +class MyJob(Job): + async def handle(self): + logger.info(f"Processing job {self.job_id} (attempt {self.attempts})") + + try: + # ... work ... + logger.info("Job completed successfully") + except Exception as e: + logger.error(f"Job failed: {e}", exc_info=True) + raise +``` + +## Next Steps + +- [Learn how to dispatch jobs](./dispatching-jobs.md) +- [Run queue workers](./running-workers.md) +- [Handle failed jobs](./failed-jobs.md) diff --git a/docs/queue/dispatching-jobs.md b/docs/queue/dispatching-jobs.md new file mode 100644 index 0000000..db399dd --- /dev/null +++ b/docs/queue/dispatching-jobs.md @@ -0,0 +1,439 @@ +# Dispatching Jobs + +Once you've created a job, you need to dispatch it to the queue for processing. RouteMQ provides several methods for dispatching jobs. + +## Using the dispatch() Helper + +The simplest way to dispatch a job: + +```python +from core.queue.queue_manager import dispatch +from app.jobs.send_notification_job import SendNotificationJob + +# In your MQTT handler or anywhere in your code +async def handle_message(context): + # Create and configure the job + job = SendNotificationJob() + job.user_id = 123 + job.message = "Your order has been shipped!" + + # Dispatch to queue + await dispatch(job) +``` + +The `dispatch()` helper: +- Uses the queue specified in the job (`job.queue`) +- Uses the default connection from `.env` +- Returns immediately after pushing to queue + +## Using the Queue Manager + +For more control, use the `QueueManager` directly: + +```python +from core.queue.queue_manager import queue +from app.jobs.send_email_job import SendEmailJob + +# Create job +job = SendEmailJob() +job.to = "user@example.com" +job.subject = "Welcome!" +job.body = "Thanks for signing up!" 
+ +# Dispatch to default queue +await queue.push(job) + +# Dispatch to specific queue +await queue.push(job, queue="emails") + +# Dispatch to specific connection +await queue.push(job, connection="database") +``` + +## Delayed Jobs + +Schedule a job to run after a delay: + +```python +from core.queue.queue_manager import queue +from app.jobs.generate_report_job import GenerateReportJob + +job = GenerateReportJob() +job.report_type = "monthly" +job.user_id = 456 + +# Run after 1 hour (3600 seconds) +await queue.later(3600, job) + +# Run after 5 minutes +await queue.later(300, job) + +# Run after 24 hours +await queue.later(86400, job) +``` + +**How delayed jobs work:** +- Job is stored with an `available_at` timestamp +- Worker ignores the job until the timestamp is reached +- Redis uses sorted sets for efficient delay handling +- Database uses datetime comparison + +## Bulk Dispatching + +Dispatch multiple jobs at once: + +```python +from core.queue.queue_manager import queue +from app.jobs.send_notification_job import SendNotificationJob + +jobs = [] +for user_id in user_ids: + job = SendNotificationJob() + job.user_id = user_id + job.message = "System maintenance tonight" + jobs.append(job) + +# Dispatch all jobs +await queue.bulk(jobs) +``` + +This is more efficient than dispatching jobs one by one in a loop. 
+ +## Dispatching from MQTT Handlers + +### Example 1: Dispatch from Controller + +```python +# app/controllers/order_controller.py +from core.controller import Controller +from core.queue.queue_manager import dispatch +from app.jobs.process_order_job import ProcessOrderJob + + +class OrderController(Controller): + @staticmethod + async def handle_new_order(order_id: str, payload, client): + print(f"Received new order {order_id}") + + # Dispatch background job for processing + job = ProcessOrderJob() + job.order_id = order_id + job.order_data = payload + await dispatch(job) + + # Respond immediately without waiting for processing + return {"status": "accepted", "order_id": order_id} +``` + +### Example 2: Dispatch with Middleware + +```python +# app/middleware/queue_middleware.py +from core.middleware import Middleware +from core.queue.queue_manager import dispatch +from app.jobs.log_message_job import LogMessageJob + + +class QueueMiddleware(Middleware): + async def handle(self, context, next_handler): + # Dispatch logging job + job = LogMessageJob() + job.topic = context['topic'] + job.payload = context['payload'] + await dispatch(job) + + # Continue processing + return await next_handler(context) +``` + +### Example 3: Conditional Dispatching + +```python +async def handle_sensor_data(device_id: str, payload, client): + temperature = payload.get('temperature') + + # Only queue processing for high temperatures + if temperature > 30: + job = ProcessHighTempJob() + job.device_id = device_id + job.temperature = temperature + await dispatch(job) + + return {"status": "received"} +``` + +## Checking Queue Size + +Monitor how many jobs are pending: + +```python +from core.queue.queue_manager import queue + +# Check queue size +size = await queue.size("default") +print(f"Pending jobs: {size}") + +# Check multiple queues +for queue_name in ["default", "emails", "reports"]: + size = await queue.size(queue_name) + print(f"{queue_name}: {size} jobs") +``` + +## Queue 
Manager API Reference + +### push() + +Push a job to the queue immediately. + +```python +await queue.push( + job, # Job instance (required) + queue="default", # Queue name (optional) + connection="redis" # Connection (optional) +) +``` + +### later() + +Push a job with a delay. + +```python +await queue.later( + delay, # Delay in seconds (required) + job, # Job instance (required) + queue="default", # Queue name (optional) + connection="redis" # Connection (optional) +) +``` + +### bulk() + +Push multiple jobs at once. + +```python +await queue.bulk( + jobs, # List of Job instances (required) + queue="default", # Queue name (optional) + connection="redis" # Connection (optional) +) +``` + +### size() + +Get the number of pending jobs in a queue. + +```python +size = await queue.size( + queue="default", # Queue name (optional) + connection="redis" # Connection (optional) +) +``` + +## Common Patterns + +### Pattern 1: Fan-out + +Dispatch multiple jobs from one event: + +```python +async def handle_user_signup(user_id, email): + # Send welcome email + email_job = SendEmailJob() + email_job.to = email + email_job.template = "welcome" + await dispatch(email_job) + + # Create user profile + profile_job = CreateProfileJob() + profile_job.user_id = user_id + await dispatch(profile_job) + + # Send SMS verification + sms_job = SendSMSJob() + sms_job.user_id = user_id + await dispatch(sms_job) +``` + +### Pattern 2: Delayed Chain + +Schedule a series of jobs: + +```python +from core.queue.queue_manager import queue + +# Send welcome email immediately +welcome_job = SendEmailJob() +welcome_job.to = email +welcome_job.template = "welcome" +await dispatch(welcome_job) + +# Send tips email after 1 day +tips_job = SendEmailJob() +tips_job.to = email +tips_job.template = "tips" +await queue.later(86400, tips_job) # 24 hours + +# Send feedback request after 7 days +feedback_job = SendEmailJob() +feedback_job.to = email +feedback_job.template = "feedback" +await queue.later(604800, 
feedback_job) # 7 days +``` + +### Pattern 3: Priority Queues + +Use different queues for different priorities: + +```python +# High priority - immediate processing +if is_urgent: + job.queue = "high-priority" + await dispatch(job) + +# Normal priority +elif is_normal: + job.queue = "default" + await dispatch(job) + +# Low priority - background cleanup +else: + job.queue = "low-priority" + await dispatch(job) +``` + +Then run workers with appropriate settings: + +```bash +# High priority - check every second +python main.py --queue-work --queue high-priority --sleep 1 + +# Normal priority - check every 3 seconds +python main.py --queue-work --queue default --sleep 3 + +# Low priority - check every 10 seconds +python main.py --queue-work --queue low-priority --sleep 10 +``` + +### Pattern 4: Rate Limiting + +Prevent overwhelming external services: + +```python +async def send_to_external_api(data): + # Dispatch job instead of calling API directly + job = ExternalAPIJob() + job.data = data + await dispatch(job) + + # Worker processes these at controlled rate + # Can even add delays: await queue.later(5, job) +``` + +## Error Handling + +### Handle Dispatch Errors + +```python +from core.queue.queue_manager import dispatch + +try: + job = SendEmailJob() + job.to = "user@example.com" + await dispatch(job) + print("Job dispatched successfully") + +except Exception as e: + print(f"Failed to dispatch job: {e}") + # Handle error (log, retry, alert, etc.) 
+``` + +### Verify Job Data + +```python +def create_email_job(to, subject, body): + # Validate data before dispatching + if not to or '@' not in to: + raise ValueError("Invalid email address") + + if not subject: + raise ValueError("Subject is required") + + job = SendEmailJob() + job.to = to + job.subject = subject + job.body = body + + return job + +# Use it +try: + job = create_email_job(email, subject, body) + await dispatch(job) +except ValueError as e: + print(f"Invalid job data: {e}") +``` + +## Best Practices + +### 1. Dispatch Early, Process Later + +```python +# โœ… Good - dispatch and return quickly +async def handle_order(order_data): + job = ProcessOrderJob() + job.order_data = order_data + await dispatch(job) + return {"status": "accepted"} + +# โŒ Bad - blocking the handler +async def handle_order(order_data): + await process_order(order_data) # Takes 30 seconds! + return {"status": "processed"} +``` + +### 2. Don't Dispatch Too Much Data + +```python +# โœ… Good - store ID only +job = ProcessOrderJob() +job.order_id = order_id # Small +await dispatch(job) + +# In handle(): +# order = await fetch_order(self.order_id) + +# โŒ Bad - storing large data +job = ProcessOrderJob() +job.order_data = huge_dictionary # Large payload +await dispatch(job) +``` + +### 3. Use Appropriate Delays + +```python +# โœ… Good - reasonable delays +await queue.later(60, job) # 1 minute +await queue.later(3600, job) # 1 hour +await queue.later(86400, job) # 1 day + +# โŒ Bad - very long delays +await queue.later(31536000, job) # 1 year - use a scheduler instead +``` + +### 4. 
Choose the Right Queue + +```python +# โœ… Good - organized by purpose +email_job.queue = "emails" +report_job.queue = "reports" +cleanup_job.queue = "low-priority" + +# โŒ Bad - everything in default +job.queue = "default" # For all jobs +``` + +## Next Steps + +- [Learn how to run queue workers](./running-workers.md) +- [Understand queue drivers](./drivers.md) +- [Handle failed jobs](./failed-jobs.md) diff --git a/docs/queue/drivers.md b/docs/queue/drivers.md new file mode 100644 index 0000000..feded34 --- /dev/null +++ b/docs/queue/drivers.md @@ -0,0 +1,441 @@ +# Queue Drivers + +RouteMQ supports two queue drivers: Redis and Database. This guide explains how they work and when to use each. + +## Overview + +Queue drivers handle the storage and retrieval of jobs. The driver you choose affects: + +- **Performance** - How fast jobs are queued and processed +- **Persistence** - Whether jobs survive crashes +- **Infrastructure** - What services you need to run +- **Scalability** - How well it handles high job volumes + +## Redis Queue Driver + +Fast, in-memory queue backed by Redis. 
+ +### Features + +- โœ… **Very fast** - In-memory storage for low latency +- โœ… **High throughput** - Handles thousands of jobs/second +- โœ… **Atomic operations** - Uses RPOPLPUSH for safe job claiming +- โœ… **Delayed jobs** - Efficient sorted sets for scheduling +- โœ… **Scalable** - Easy to cluster Redis for more capacity + +### Requirements + +- Redis server running (v5.0+) +- `ENABLE_REDIS=true` in `.env` +- `QUEUE_CONNECTION=redis` in `.env` + +### Configuration + +```env +# Enable Redis +ENABLE_REDIS=true +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Use Redis for queue +QUEUE_CONNECTION=redis +``` + +### Data Structures + +Redis queue uses different data structures for different purposes: + +| Structure | Purpose | Type | +|-----------|---------|------| +| `routemq:queue:{name}` | Pending jobs | List (FIFO) | +| `routemq:queue:{name}:delayed` | Delayed jobs | Sorted Set (by timestamp) | +| `routemq:queue:{name}:reserved` | Processing jobs | List | +| `routemq:queue:failed:{name}` | Failed jobs | List | + +### How It Works + +**Pushing a job:** +``` +1. Job serialized to JSON +2. RPUSH to routemq:queue:{name} +3. Job ID returned +``` + +**Popping a job:** +``` +1. Check for delayed jobs ready to process +2. RPOPLPUSH from pending to reserved +3. Increment attempts +4. Return job data +``` + +**Completing a job:** +``` +1. LREM from reserved list +2. Job deleted +``` + +**Failing a job:** +``` +1. If attempts < max_tries: + - LREM from reserved + - RPUSH back to pending (or delayed) +2. 
Else: + - Move to failed list + - LREM from reserved +``` + +### Advantages + +- **Speed**: Sub-millisecond latency +- **Throughput**: Handle high job volumes +- **Simple**: No complex queries needed +- **Scalable**: Easy to add Redis replicas + +### Disadvantages + +- **Volatility**: Jobs lost if Redis crashes (unless AOF enabled) +- **Infrastructure**: Requires Redis server +- **Memory**: All jobs stored in RAM + +### Best For + +- High-volume applications +- Real-time job processing +- When you already use Redis +- When speed is critical + +## Database Queue Driver + +Persistent queue backed by MySQL. + +### Features + +- โœ… **Persistent** - Jobs survive crashes +- โœ… **ACID** - Transactional guarantees +- โœ… **Inspectable** - Easy to query with SQL +- โœ… **Reliable** - No job loss +- โœ… **No extra service** - Uses existing MySQL + +### Requirements + +- MySQL server running (v8.0+) +- `ENABLE_MYSQL=true` in `.env` +- `QUEUE_CONNECTION=database` in `.env` + +### Configuration + +```env +# Enable MySQL +ENABLE_MYSQL=true +DB_HOST=localhost +DB_PORT=3306 +DB_NAME=mqtt_framework +DB_USER=root +DB_PASS=your_password + +# Use database for queue +QUEUE_CONNECTION=database +``` + +### Database Tables + +**queue_jobs:** + +| Column | Type | Description | +|--------|------|-------------| +| `id` | INT | Primary key | +| `queue` | VARCHAR(255) | Queue name | +| `payload` | TEXT | Serialized job data | +| `attempts` | INT | Number of attempts | +| `reserved_at` | DATETIME | When job was claimed (NULL if pending) | +| `available_at` | DATETIME | When job becomes available | +| `created_at` | DATETIME | When job was created | + +**queue_failed_jobs:** + +| Column | Type | Description | +|--------|------|-------------| +| `id` | INT | Primary key | +| `connection` | VARCHAR(255) | Connection name | +| `queue` | VARCHAR(255) | Queue name | +| `payload` | TEXT | Serialized job data | +| `exception` | TEXT | Exception details | +| `failed_at` | DATETIME | When job failed | 
+ +### How It Works + +**Pushing a job:** +```sql +INSERT INTO queue_jobs +(queue, payload, attempts, available_at, created_at) +VALUES (?, ?, 0, ?, NOW()) +``` + +**Popping a job:** +```sql +-- Use FOR UPDATE SKIP LOCKED for concurrency +SELECT * FROM queue_jobs +WHERE queue = ? + AND reserved_at IS NULL + AND available_at <= NOW() +ORDER BY id +LIMIT 1 +FOR UPDATE SKIP LOCKED; + +-- Mark as reserved +UPDATE queue_jobs +SET reserved_at = NOW(), attempts = attempts + 1 +WHERE id = ? +``` + +**Completing a job:** +```sql +DELETE FROM queue_jobs WHERE id = ? +``` + +**Failing a job:** +```sql +-- If retrying +UPDATE queue_jobs +SET reserved_at = NULL, + available_at = DATE_ADD(NOW(), INTERVAL ? SECOND) +WHERE id = ? + +-- If permanently failed +INSERT INTO queue_failed_jobs +(connection, queue, payload, exception, failed_at) +VALUES (?, ?, ?, ?, NOW()); + +DELETE FROM queue_jobs WHERE id = ? +``` + +### Advantages + +- **Persistence**: Jobs survive crashes +- **Reliability**: ACID transactions +- **Visibility**: Easy to inspect with SQL +- **Simplicity**: No additional infrastructure + +### Disadvantages + +- **Speed**: Slower than Redis (disk I/O) +- **Load**: Increases database queries +- **Scaling**: Harder to scale than Redis + +### Best For + +- Low to medium job volumes +- When persistence is critical +- When you don't want to manage Redis +- When you need to inspect jobs easily + +## Comparison + +| Feature | Redis | Database | +|---------|-------|----------| +| **Speed** | โญโญโญโญโญ Very Fast | โญโญโญ Moderate | +| **Persistence** | โญโญ Configurable | โญโญโญโญโญ Always | +| **Scalability** | โญโญโญโญโญ Excellent | โญโญโญ Good | +| **Reliability** | โญโญโญ Good | โญโญโญโญโญ Excellent | +| **Setup** | โญโญโญ Need Redis | โญโญโญโญโญ Use existing DB | +| **Inspection** | โญโญโญ Redis CLI | โญโญโญโญโญ SQL queries | +| **Memory** | โญโญ All in RAM | โญโญโญโญโญ On disk | + +## Switching Drivers + +You can 
switch drivers at any time: + +```env +# Change in .env +QUEUE_CONNECTION=redis # or 'database' +``` + +**Important notes:** +- Jobs in the old driver won't be transferred +- Complete existing jobs before switching +- Or manually migrate jobs between drivers + +## Performance Tips + +### Redis + +```env +# Use a dedicated Redis database +REDIS_DB=1 # Separate from cache + +# Enable persistence (optional) +# In redis.conf: +# appendonly yes +# appendfsync everysec +``` + +### Database + +```sql +-- Add indexes for performance +CREATE INDEX idx_queue_reserved ON queue_jobs(queue, reserved_at); +CREATE INDEX idx_available ON queue_jobs(available_at); + +-- Monitor slow queries +SHOW FULL PROCESSLIST; +``` + +## Monitoring + +### Redis + +```bash +# Connect to Redis +redis-cli + +# Check queue sizes +LLEN routemq:queue:default +LLEN routemq:queue:emails +LLEN routemq:queue:reports + +# Check delayed jobs +ZCARD routemq:queue:default:delayed + +# View pending jobs +LRANGE routemq:queue:default 0 9 + +# Check memory usage +INFO memory + +# Monitor commands +MONITOR +``` + +### Database + +```sql +-- Check pending jobs by queue +SELECT queue, COUNT(*) as pending_jobs +FROM queue_jobs +WHERE reserved_at IS NULL +GROUP BY queue; + +-- Check processing jobs +SELECT queue, COUNT(*) as processing_jobs +FROM queue_jobs +WHERE reserved_at IS NOT NULL +GROUP BY queue; + +-- Check job age +SELECT queue, + MIN(created_at) as oldest_job, + MAX(created_at) as newest_job, + COUNT(*) as total_jobs +FROM queue_jobs +GROUP BY queue; + +-- Check failed jobs +SELECT queue, COUNT(*) as failed_jobs +FROM queue_failed_jobs +GROUP BY queue; + +-- Find stuck jobs (reserved > 1 hour ago) +SELECT * FROM queue_jobs +WHERE reserved_at < DATE_SUB(NOW(), INTERVAL 1 HOUR); +``` + +## Troubleshooting + +### Redis Connection Issues + +```bash +# Test connection +redis-cli ping + +# Check if Redis is running +redis-cli INFO server + +# Restart Redis +# macOS: brew services restart redis +# Linux: sudo 
systemctl restart redis +``` + +### Database Connection Issues + +```bash +# Test connection +mysql -h localhost -u root -p -e "SELECT 1" + +# Check if MySQL is running +sudo systemctl status mysql + +# Check for locks +mysql> SHOW PROCESSLIST; +mysql> SHOW OPEN TABLES WHERE In_use > 0; +``` + +### Jobs Not Processing + +1. **Check driver configuration:** + ```env + QUEUE_CONNECTION=redis # Match your setup + ``` + +2. **Verify service is running:** + ```bash + # Redis + redis-cli ping + + # MySQL + mysql -h localhost -u root -p -e "SELECT 1" + ``` + +3. **Check worker connection:** + ```bash + python main.py --queue-work --connection redis + python main.py --queue-work --connection database + ``` + +## Best Practices + +### 1. Choose Based on Requirements + +``` +High volume, speed critical โ†’ Redis +Persistence critical โ†’ Database +Low volume, simple setup โ†’ Database +Already using Redis โ†’ Redis +``` + +### 2. Monitor Both + +Even if using Redis, keep failed jobs in database: + +```python +# RedisQueue already does this +await driver.failed(connection, queue, payload, exception) +# Stores in database if available +``` + +### 3. Regular Cleanup + +```sql +-- Clean old failed jobs (> 30 days) +DELETE FROM queue_failed_jobs +WHERE failed_at < DATE_SUB(NOW(), INTERVAL 30 DAY); +``` + +### 4. Backup Important Queues + +```bash +# Redis +redis-cli SAVE +cp /var/lib/redis/dump.rdb /backup/ + +# MySQL +mysqldump -u root -p mqtt_framework queue_jobs > backup.sql +``` + +## Next Steps + +- [Handle failed jobs](./failed-jobs.md) +- [Review best practices](./best-practices.md) +- [Docker deployment](../docker-deployment.md) diff --git a/docs/queue/failed-jobs.md b/docs/queue/failed-jobs.md new file mode 100644 index 0000000..8947271 --- /dev/null +++ b/docs/queue/failed-jobs.md @@ -0,0 +1,473 @@ +# Failed Jobs + +When jobs fail permanently (exceed max_tries), they're moved to the failed jobs storage for inspection and potential retry. 
This guide explains how to handle failed jobs. + +## What Makes a Job Fail? + +A job fails permanently when: + +1. **Exceeds max_tries** - Retried the maximum number of times +2. **Unrecoverable error** - Exception that can't be resolved by retrying +3. **Timeout** - Job exceeds its timeout limit repeatedly + +## Where Failed Jobs Are Stored + +### Database Storage (Recommended) + +Failed jobs are stored in the `queue_failed_jobs` table: + +```sql +CREATE TABLE queue_failed_jobs ( + id INT AUTO_INCREMENT PRIMARY KEY, + connection VARCHAR(255) NOT NULL, + queue VARCHAR(255) NOT NULL, + payload TEXT NOT NULL, + exception TEXT NOT NULL, + failed_at DATETIME NOT NULL, + INDEX(queue) +); +``` + +### Redis Storage (Fallback) + +If MySQL is disabled, failed jobs are stored in Redis: + +``` +routemq:queue:failed:{queue_name} +``` + +## Viewing Failed Jobs + +### Using MySQL + +```sql +-- View all failed jobs +SELECT * FROM queue_failed_jobs +ORDER BY failed_at DESC; + +-- Failed jobs by queue +SELECT queue, COUNT(*) as count +FROM queue_failed_jobs +GROUP BY queue; + +-- Recent failures +SELECT * FROM queue_failed_jobs +WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR) +ORDER BY failed_at DESC; + +-- View specific failure +SELECT id, queue, exception, failed_at +FROM queue_failed_jobs +WHERE id = 123; + +-- View full payload +SELECT payload FROM queue_failed_jobs WHERE id = 123; +``` + +### Using Redis CLI + +```bash +# Connect to Redis +redis-cli + +# Check failed job count +LLEN routemq:queue:failed:default + +# View failed jobs +LRANGE routemq:queue:failed:default 0 9 + +# View specific job +LINDEX routemq:queue:failed:default 0 +``` + +## The failed() Method + +Override the `failed()` method in your job to handle permanent failures: + +```python +from core.job import Job +import logging + +logger = logging.getLogger("MyJob") + + +class MyJob(Job): + async def handle(self): + # Job logic + pass + + async def failed(self, exception: Exception): + """ + Called when job 
fails permanently. + + Args: + exception: The exception that caused the final failure + """ + logger.error(f"Job failed permanently: {exception}") + + # Send alert to admin + await send_admin_alert( + f"Job {self.__class__.__name__} failed", + str(exception) + ) + + # Log to monitoring service + await log_to_sentry(exception) + + # Clean up resources + await cleanup_resources(self.resource_id) + + # Update database status + await mark_as_failed(self.task_id) +``` + +## Common Failed Job Scenarios + +### Scenario 1: External API Failure + +```python +class CallExternalAPIJob(Job): + max_tries = 5 + retry_after = 60 # Wait 1 minute between retries + + async def handle(self): + response = await call_external_api(self.endpoint, self.data) + if not response.success: + raise Exception("API call failed") + + async def failed(self, exception: Exception): + # API still failing after 5 tries + logger.error(f"API {self.endpoint} unreachable: {exception}") + + # Store for manual retry later + await store_for_manual_processing(self.endpoint, self.data) + + # Notify operations team + await send_slack_message("#ops", f"API {self.endpoint} is down") +``` + +### Scenario 2: Invalid Data + +```python +class ProcessDataJob(Job): + max_tries = 1 # Don't retry invalid data + + async def handle(self): + if not self.validate_data(): + raise ValueError("Invalid data format") + + await process_data(self.data) + + async def failed(self, exception: Exception): + # Log invalid data for investigation + logger.error(f"Invalid data: {self.data}") + + # Store in error log + await save_error_log({ + "data": self.data, + "error": str(exception), + "timestamp": datetime.now() + }) +``` + +### Scenario 3: Resource Unavailable + +```python +class GenerateReportJob(Job): + max_tries = 3 + retry_after = 300 # Wait 5 minutes + + async def handle(self): + # Check if data is ready + if not await data_ready(self.report_id): + raise Exception("Data not ready") + + await generate_report(self.report_id) + + 
async def failed(self, exception: Exception): + # Data still not ready after 3 tries + logger.warning(f"Report {self.report_id} data not ready") + + # Notify user + await send_email( + self.user_email, + "Report Delayed", + f"Your report is delayed due to data availability" + ) +``` + +## Inspecting Failed Jobs + +### Get Job Details + +```python +from core.model import Model +from app.models.queue_failed_job import QueueFailedJob +from sqlalchemy import select + +async def inspect_failed_job(job_id: int): + session = await Model.get_session() + + result = await session.execute( + select(QueueFailedJob).where(QueueFailedJob.id == job_id) + ) + failed_job = result.scalars().first() + + if failed_job: + print(f"Queue: {failed_job.queue}") + print(f"Failed at: {failed_job.failed_at}") + print(f"Exception: {failed_job.exception}") + print(f"Payload: {failed_job.payload}") + + await session.close() +``` + +### Analyze Failure Patterns + +```sql +-- Most common failure reasons +SELECT + SUBSTRING_INDEX(exception, ':', 1) as error_type, + COUNT(*) as count +FROM queue_failed_jobs +GROUP BY error_type +ORDER BY count DESC; + +-- Failures by hour +SELECT + DATE_FORMAT(failed_at, '%Y-%m-%d %H:00') as hour, + COUNT(*) as failures +FROM queue_failed_jobs +WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR) +GROUP BY hour +ORDER BY hour; + +-- Failure rate by queue +SELECT + queue, + COUNT(*) as total_failures, + COUNT(DISTINCT DATE(failed_at)) as days_with_failures +FROM queue_failed_jobs +GROUP BY queue; +``` + +## Retrying Failed Jobs + +### Manual Retry + +```python +from core.job import Job +from core.queue.queue_manager import dispatch + +async def retry_failed_job(failed_job_id: int): + """Retry a specific failed job.""" + # Fetch failed job from database + session = await Model.get_session() + result = await session.execute( + select(QueueFailedJob).where(QueueFailedJob.id == failed_job_id) + ) + failed_job = result.scalars().first() + + if not failed_job: + 
print(f"Failed job {failed_job_id} not found") + return + + # Deserialize and dispatch again + job = Job.unserialize(failed_job.payload) + await dispatch(job) + + # Delete from failed jobs + await session.delete(failed_job) + await session.commit() + await session.close() + + print(f"Retried failed job {failed_job_id}") +``` + +### Bulk Retry + +```python +async def retry_all_failed_jobs(queue: str = "default"): + """Retry all failed jobs in a queue.""" + session = await Model.get_session() + + result = await session.execute( + select(QueueFailedJob).where(QueueFailedJob.queue == queue) + ) + failed_jobs = result.scalars().all() + + retried = 0 + for failed_job in failed_jobs: + try: + job = Job.unserialize(failed_job.payload) + await dispatch(job) + await session.delete(failed_job) + retried += 1 + except Exception as e: + print(f"Failed to retry job {failed_job.id}: {e}") + + await session.commit() + await session.close() + + print(f"Retried {retried} failed jobs from queue '{queue}'") +``` + +## Cleaning Up Failed Jobs + +### Delete Old Failed Jobs + +```sql +-- Delete failed jobs older than 30 days +DELETE FROM queue_failed_jobs +WHERE failed_at < DATE_SUB(NOW(), INTERVAL 30 DAY); + +-- Delete all failed jobs from a specific queue +DELETE FROM queue_failed_jobs +WHERE queue = 'old-queue'; + +-- Keep only last 1000 failed jobs +DELETE FROM queue_failed_jobs +WHERE id NOT IN ( + SELECT id FROM ( + SELECT id FROM queue_failed_jobs + ORDER BY failed_at DESC + LIMIT 1000 + ) as recent +); +``` + +### Automated Cleanup Script + +```python +# cleanup_failed_jobs.py +import asyncio +from datetime import datetime, timedelta +from sqlalchemy import delete +from core.model import Model +from app.models.queue_failed_job import QueueFailedJob + +async def cleanup_old_failed_jobs(days: int = 30): +    """Delete failed jobs older than specified days.""" + session = await Model.get_session() + + cutoff_date = datetime.utcnow() - timedelta(days=days) + + result = await session.execute( + 
delete(QueueFailedJob).where(QueueFailedJob.failed_at < cutoff_date) + ) + + deleted_count = result.rowcount + await session.commit() + await session.close() + + print(f"Deleted {deleted_count} failed jobs older than {days} days") + +if __name__ == "__main__": + asyncio.run(cleanup_old_failed_jobs(30)) +``` + +Schedule with cron: + +```cron +# Run cleanup daily at 2 AM +0 2 * * * cd /path/to/RouteMQ && /path/to/venv/bin/python cleanup_failed_jobs.py +``` + +## Monitoring Failed Jobs + +### Alert on Failure Threshold + +```python +async def check_failure_rate(): + """Alert if failure rate exceeds threshold.""" + session = await Model.get_session() + + # Count failures in last hour + one_hour_ago = datetime.utcnow() - timedelta(hours=1) + + result = await session.execute( + select(func.count(QueueFailedJob.id)) + .where(QueueFailedJob.failed_at >= one_hour_ago) + ) + failure_count = result.scalar() + + if failure_count > 100: # Threshold + await send_alert( + "High Failure Rate", + f"{failure_count} jobs failed in the last hour" + ) + + await session.close() +``` + +### Dashboard Query + +```sql +-- Failed jobs summary for dashboard +SELECT + queue, + COUNT(*) as total_failures, + MAX(failed_at) as last_failure, + MIN(failed_at) as first_failure +FROM queue_failed_jobs +WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 7 DAY) +GROUP BY queue +ORDER BY total_failures DESC; +``` + +## Best Practices + +### 1. Always Implement failed() + +```python +class MyJob(Job): + async def failed(self, exception: Exception): + # Log the failure + logger.error(f"Job failed: {exception}") + + # Clean up resources + # Notify stakeholders + # Update status +``` + +### 2. Set Appropriate max_tries + +```python +# Quick tasks - fail fast +class QuickJob(Job): + max_tries = 2 + +# External API calls - retry more +class APIJob(Job): + max_tries = 5 + retry_after = 60 + +# Critical jobs - many retries +class CriticalJob(Job): + max_tries = 10 + retry_after = 300 +``` + +### 3. 
Regular Cleanup + +```python +# Clean up old failed jobs weekly +async def weekly_cleanup(): + await cleanup_old_failed_jobs(days=30) +``` + +### 4. Monitor Failure Patterns + +```python +# Track failure types +failures_by_type = {} +for exception_type in failure_exceptions: + failures_by_type[exception_type] = count + +# Alert on unusual patterns +``` + +## Next Steps + +- [Review best practices](./best-practices.md) +- [Learn about queue drivers](./drivers.md) +- [Docker deployment](../docker-deployment.md) diff --git a/docs/queue/getting-started.md b/docs/queue/getting-started.md new file mode 100644 index 0000000..db317c8 --- /dev/null +++ b/docs/queue/getting-started.md @@ -0,0 +1,228 @@ +# Getting Started with Queue System + +This guide will help you set up and configure the RouteMQ queue system. + +## Prerequisites + +You'll need one of the following: + +- **Redis** (recommended for production) - Fast, in-memory queue +- **MySQL** - Persistent, database-backed queue + +## Installation + +The queue system is included with RouteMQ. No additional installation required. + +## Configuration + +### 1. Choose Your Queue Driver + +Edit your `.env` file: + +```env +# Queue Configuration +QUEUE_CONNECTION=redis # or 'database' +``` + +### 2. Configure Redis (Recommended) + +If using Redis queue: + +```env +# Enable Redis +ENABLE_REDIS=true +REDIS_HOST=localhost +REDIS_PORT=6379 +REDIS_DB=0 +REDIS_PASSWORD= + +# Queue connection +QUEUE_CONNECTION=redis +``` + +**Install Redis:** +```bash +# macOS +brew install redis +brew services start redis + +# Ubuntu/Debian +sudo apt-get install redis-server +sudo systemctl start redis + +# Docker +docker run -d -p 6379:6379 redis:7-alpine +``` + +### 3. 
Configure Database Queue (Alternative) + +If using database queue: + +```env +# Enable MySQL +ENABLE_MYSQL=true +DB_HOST=localhost +DB_PORT=3306 +DB_NAME=mqtt_framework +DB_USER=root +DB_PASS=your_password + +# Queue connection +QUEUE_CONNECTION=database +``` + +**Create Tables:** + +The queue tables will be created automatically when you start the application. They include: + +- `queue_jobs` - Stores pending and reserved jobs +- `queue_failed_jobs` - Stores permanently failed jobs + +## Queue Drivers Comparison + +### Redis Driver + +**Pros:** +- โœ… Very fast (in-memory) +- โœ… Low latency +- โœ… Excellent for high-throughput +- โœ… Supports delayed jobs +- โœ… Built-in sorted sets for delays + +**Cons:** +- โš ๏ธ Requires Redis server +- โš ๏ธ Jobs lost if Redis crashes (unless persistence enabled) +- โš ๏ธ Additional infrastructure + +**Best for:** Production environments with high job volumes + +### Database Driver + +**Pros:** +- โœ… Persistent storage +- โœ… ACID transactions +- โœ… No additional services needed +- โœ… Reliable job storage +- โœ… Easy to inspect jobs with SQL + +**Cons:** +- โš ๏ธ Slower than Redis +- โš ๏ธ Higher database load +- โš ๏ธ May need index optimization for large queues + +**Best for:** Low to medium job volumes, or when Redis isn't available + +## Verify Configuration + +### Test Redis Connection + +```bash +# Using Redis CLI +redis-cli ping +# Should return: PONG + +# In Python +python -c " +import redis +r = redis.Redis(host='localhost', port=6379) +print(r.ping()) # Should print: True +" +``` + +### Test Database Connection + +```bash +# Using MySQL client +mysql -h localhost -u root -p -e "SHOW DATABASES;" + +# In Python +python -c " +import asyncio +from bootstrap.app import Application + +async def test(): + app = Application() + await app.initialize_database() + print('Database connected!') + +asyncio.run(test()) +" +``` + +## Docker Setup + +For Docker, use the provided configuration: + +```bash +# Start all 
services (includes Redis and MySQL) +docker compose up -d + +# Check services +docker compose ps + +# View worker logs +docker compose logs -f queue-worker-default +``` + +See [Docker Deployment](../docker-deployment.md) for details. + +## Environment Variables Reference + +| Variable | Default | Description | +|----------|---------|-------------| +| `QUEUE_CONNECTION` | `redis` | Queue driver: `redis` or `database` | +| `ENABLE_REDIS` | `false` | Enable Redis integration | +| `REDIS_HOST` | `localhost` | Redis server hostname | +| `REDIS_PORT` | `6379` | Redis server port | +| `REDIS_DB` | `0` | Redis database number | +| `REDIS_PASSWORD` | - | Redis password (optional) | +| `ENABLE_MYSQL` | `false` | Enable MySQL integration | +| `DB_HOST` | `localhost` | MySQL server hostname | +| `DB_PORT` | `3306` | MySQL server port | +| `DB_NAME` | `mqtt_framework` | MySQL database name | +| `DB_USER` | `root` | MySQL username | +| `DB_PASS` | - | MySQL password | + +## Next Steps + +Now that your queue is configured: + +1. [Create your first job](./creating-jobs.md) +2. [Learn how to dispatch jobs](./dispatching-jobs.md) +3. [Run queue workers](./running-workers.md) + +## Troubleshooting + +### Redis Connection Failed + +```bash +# Check if Redis is running +redis-cli ping + +# Check Redis logs +# macOS: /usr/local/var/log/redis.log +# Linux: /var/log/redis/redis-server.log + +# Restart Redis +# macOS: brew services restart redis +# Linux: sudo systemctl restart redis +``` + +### Database Connection Failed + +```bash +# Check if MySQL is running +sudo systemctl status mysql + +# Test connection +mysql -h localhost -u root -p + +# Check credentials in .env file +``` + +### Workers Not Processing Jobs + +1. Verify queue configuration matches between app and worker +2. Check worker logs: `docker compose logs queue-worker-default` +3. Ensure Redis/MySQL is accessible +4. 
Verify queue name matches in dispatch and worker diff --git a/docs/queue/running-workers.md b/docs/queue/running-workers.md new file mode 100644 index 0000000..9b61b7e --- /dev/null +++ b/docs/queue/running-workers.md @@ -0,0 +1,496 @@ +# Running Queue Workers + +Queue workers are background processes that fetch and execute jobs from the queue. This guide explains how to run and manage workers. + +## Starting a Worker + +### Basic Usage + +Start a worker to process jobs from the default queue: + +```bash +# Process jobs from 'default' queue +python main.py --queue-work + +# Or using the routemq command +routemq --queue-work +``` + +The worker will: +- Connect to Redis/MySQL based on `QUEUE_CONNECTION` +- Poll the queue for jobs +- Execute jobs as they become available +- Automatically retry failed jobs +- Run until stopped (Ctrl+C) + +## Worker Options + +### --queue + +Specify which queue to process: + +```bash +python main.py --queue-work --queue emails +python main.py --queue-work --queue high-priority +python main.py --queue-work --queue reports +``` + +### --connection + +Override the queue connection: + +```bash +# Use Redis queue +python main.py --queue-work --connection redis + +# Use database queue +python main.py --queue-work --connection database +``` + +### --max-jobs + +Process a maximum number of jobs then stop: + +```bash +# Process 100 jobs then exit +python main.py --queue-work --max-jobs 100 + +# Process 1 job then exit (useful for testing) +python main.py --queue-work --max-jobs 1 +``` + +### --max-time + +Run for a maximum time (in seconds) then stop: + +```bash +# Run for 1 hour +python main.py --queue-work --max-time 3600 + +# Run for 8 hours +python main.py --queue-work --max-time 28800 +``` + +### --sleep + +Seconds to sleep when no jobs are available: + +```bash +# Check every second (high priority queue) +python main.py --queue-work --sleep 1 + +# Check every 5 seconds (normal priority) +python main.py --queue-work --sleep 5 + +# Check 
every 10 seconds (low priority) +python main.py --queue-work --sleep 10 +``` + +### --max-tries + +Override the maximum retry attempts for all jobs: + +```bash +# Retry failed jobs up to 5 times +python main.py --queue-work --max-tries 5 + +# Never retry (fail immediately) +python main.py --queue-work --max-tries 1 +``` + +### --timeout + +Maximum seconds a job can run: + +```bash +# 2 minute timeout +python main.py --queue-work --timeout 120 + +# 10 minute timeout +python main.py --queue-work --timeout 600 +``` + +## Multiple Workers + +Run multiple workers for different queues: + +```bash +# Terminal 1: High-priority queue (check every second) +python main.py --queue-work --queue high-priority --sleep 1 + +# Terminal 2: Default queue (check every 3 seconds) +python main.py --queue-work --queue default --sleep 3 + +# Terminal 3: Low-priority queue (check every 10 seconds) +python main.py --queue-work --queue low-priority --sleep 10 + +# Terminal 4: Email queue (dedicated worker) +python main.py --queue-work --queue emails --sleep 5 +``` + +## Production Deployment + +### Using Docker Compose + +The easiest way to run workers in production: + +```bash +# Start all services including workers +docker compose up -d + +# Scale workers +docker compose up -d --scale queue-worker-default=5 + +# View worker logs +docker compose logs -f queue-worker-default +``` + +See [Docker Deployment](../docker-deployment.md) for details. 
+ +### Using Supervisor + +For non-Docker deployments, use Supervisor: + +```ini +; /etc/supervisor/conf.d/routemq-queue.conf + +[program:routemq-queue-default] +command=/path/to/venv/bin/python main.py --queue-work --queue default --sleep 3 +directory=/path/to/RouteMQ +user=www-data +autostart=true +autorestart=true +redirect_stderr=true +stdout_logfile=/var/log/routemq/queue-default.log +startsecs=10 +stopwaitsecs=60 + +[program:routemq-queue-high] +command=/path/to/venv/bin/python main.py --queue-work --queue high-priority --sleep 1 +directory=/path/to/RouteMQ +user=www-data +autostart=true +autorestart=true +redirect_stderr=true +stdout_logfile=/var/log/routemq/queue-high.log +startsecs=10 +stopwaitsecs=60 + +[program:routemq-queue-emails] +command=/path/to/venv/bin/python main.py --queue-work --queue emails --sleep 5 +directory=/path/to/RouteMQ +user=www-data +autostart=true +autorestart=true +redirect_stderr=true +stdout_logfile=/var/log/routemq/queue-emails.log +startsecs=10 +stopwaitsecs=60 +``` + +Then manage with supervisorctl: + +```bash +# Reload configuration +sudo supervisorctl reread +sudo supervisorctl update + +# Start workers +sudo supervisorctl start routemq-queue-default +sudo supervisorctl start routemq-queue-high +sudo supervisorctl start routemq-queue-emails + +# Check status +sudo supervisorctl status + +# View logs +sudo supervisorctl tail -f routemq-queue-default + +# Restart worker +sudo supervisorctl restart routemq-queue-default + +# Stop worker +sudo supervisorctl stop routemq-queue-default +``` + +### Using systemd + +Create systemd service files: + +```ini +# /etc/systemd/system/routemq-queue-default.service + +[Unit] +Description=RouteMQ Queue Worker (Default) +After=network.target redis.service mysql.service + +[Service] +Type=simple +User=www-data +WorkingDirectory=/path/to/RouteMQ +ExecStart=/path/to/venv/bin/python main.py --queue-work --queue default --sleep 3 +Restart=always +RestartSec=10 
+StandardOutput=append:/var/log/routemq/queue-default.log +StandardError=append:/var/log/routemq/queue-default-error.log + +[Install] +WantedBy=multi-user.target +``` + +Manage with systemctl: + +```bash +# Reload systemd +sudo systemctl daemon-reload + +# Enable service (start on boot) +sudo systemctl enable routemq-queue-default + +# Start service +sudo systemctl start routemq-queue-default + +# Check status +sudo systemctl status routemq-queue-default + +# View logs +sudo journalctl -u routemq-queue-default -f + +# Restart service +sudo systemctl restart routemq-queue-default + +# Stop service +sudo systemctl stop routemq-queue-default +``` + +## Worker Lifecycle + +Understanding how workers process jobs: + +``` +1. Worker Starts + ↓ +2. Connect to Queue (Redis/MySQL) + ↓ +3. Poll for Jobs + ↓ + ├─ Job Available + │ ↓ + │ Reserve Job (mark as processing) + │ ↓ + │ Execute job.handle() + │ ↓ + │ ├─ Success → Delete Job + │ └─ Failure → Release or Move to Failed + │ ↓ + │ Loop back to Poll + │ + └─ No Jobs → Sleep → Loop back to Poll +``` + +## Graceful Shutdown + +Workers handle shutdown signals gracefully: + +```bash +# Send SIGTERM (recommended) +kill -TERM <worker-pid> + +# Or Ctrl+C (sends SIGINT) +^C + +# Worker will: +# 1. Stop accepting new jobs +# 2. Finish current job +# 3. Clean up connections +# 4.
Exit +``` + +## Monitoring Workers + +### View Worker Output + +```bash +# In terminal +python main.py --queue-work --queue default + +# Output: +# 2024-01-15 10:30:00 - RouteMQ.QueueWorker - INFO - Queue worker started for queue 'default' +# 2024-01-15 10:30:05 - RouteMQ.QueueWorker - INFO - Processing job 123 (attempt 1) +# 2024-01-15 10:30:07 - RouteMQ.QueueWorker - INFO - Job 123 completed successfully +``` + +### Check Queue Size + +```python +from core.queue.queue_manager import queue + +# Check how many jobs are waiting +size = await queue.size("default") +print(f"Pending jobs: {size}") +``` + +### Monitor with Redis CLI + +```bash +# Connect to Redis +redis-cli + +# Check queue length +LLEN routemq:queue:default + +# Check delayed jobs +ZCARD routemq:queue:default:delayed + +# Check reserved jobs +LLEN routemq:queue:default:reserved + +# View queue contents +LRANGE routemq:queue:default 0 9 # First 10 jobs +``` + +### Monitor with MySQL + +```sql +-- Connect to MySQL +mysql -u root -p routemq_production + +-- Check pending jobs +SELECT queue, COUNT(*) as pending +FROM queue_jobs +WHERE reserved_at IS NULL +GROUP BY queue; + +-- Check reserved jobs (being processed) +SELECT queue, COUNT(*) as processing +FROM queue_jobs +WHERE reserved_at IS NOT NULL +GROUP BY queue; + +-- Check failed jobs +SELECT queue, COUNT(*) as failed +FROM queue_failed_jobs +GROUP BY queue; + +-- View job details +SELECT * FROM queue_jobs +WHERE queue = 'default' +ORDER BY created_at DESC +LIMIT 10; +``` + +## Troubleshooting + +### Worker Not Processing Jobs + +**Check if worker is running:** +```bash +ps aux | grep "queue-work" +``` + +**Check worker logs for errors:** +```bash +# Docker +docker compose logs queue-worker-default + +# Supervisor +sudo supervisorctl tail -f routemq-queue-default + +# systemd +sudo journalctl -u routemq-queue-default -n 50 +``` + +**Common issues:** +- Queue name mismatch between dispatch and worker +- Redis/MySQL connection issues +- Jobs failing 
during execution + +### Jobs Timing Out + +If jobs are timing out: + +```bash +# Increase timeout +python main.py --queue-work --timeout 300 + +# Or set timeout in job class +class MyJob(Job): + timeout = 300 # 5 minutes +``` + +### High Memory Usage + +If worker memory grows: + +```bash +# Restart worker after processing N jobs +python main.py --queue-work --max-jobs 1000 + +# Then use supervisor/systemd to auto-restart +``` + +### Worker Stuck + +If worker seems stuck: + +1. Send SIGTERM to gracefully stop +2. Check for infinite loops in job code +3. Add timeouts to external API calls +4. Review job logs for errors + +## Best Practices + +### 1. Run Multiple Workers + +```bash +# Scale workers based on load +docker compose up -d --scale queue-worker-default=5 +``` + +### 2. Use Different Queues + +```bash +# High priority - fast polling +python main.py --queue-work --queue critical --sleep 1 + +# Normal priority +python main.py --queue-work --queue default --sleep 3 + +# Low priority - slow polling +python main.py --queue-work --queue cleanup --sleep 30 +``` + +### 3. Set Resource Limits + +```ini +# In supervisor config +[program:routemq-queue] +environment=PYTHONUNBUFFERED="1" +priority=999 +startsecs=10 +stopwaitsecs=60 +killasgroup=true +``` + +### 4. Log Everything + +```python +# In jobs +logger.info(f"Processing job {self.job_id}") +logger.info(f"Job completed in {elapsed}s") +``` + +### 5. 
Monitor Queue Depth + +```python +# Alert if queue grows too large +size = await queue.size("default") +if size > 1000: + await send_alert("Queue backlog detected") +``` + +## Next Steps + +- [Understand queue drivers](./drivers.md) +- [Handle failed jobs](./failed-jobs.md) +- [Review best practices](./best-practices.md) diff --git a/main.py b/main.py index b03b433..2472cd5 100644 --- a/main.py +++ b/main.py @@ -32,6 +32,12 @@ def create_env_file(): DB_USER=root DB_PASS= +# Redis Configuration +ENABLE_REDIS=false + +# Queue Configuration +QUEUE_CONNECTION=redis + # Timezone Configuration TIMEZONE=Asia/Jakarta @@ -135,12 +141,62 @@ def tinker(): from core.tinker import run_tinker run_tinker() +def queue_work(queue="default", connection=None, max_jobs=None, max_time=None, + sleep=3, max_tries=None, timeout=60): + """Start the queue worker to process background jobs.""" + import asyncio + from bootstrap.app import Application + from core.queue.queue_worker import QueueWorker + + # Initialize the application to setup database/redis connections + create_env_file() + app = create_app() + + async def run_worker(): + """Run the queue worker with proper initialization and cleanup.""" + # Initialize connections + await app._initialize_connections() + + try: + # Create and start the worker + worker = QueueWorker( + queue_name=queue, + connection=connection, + max_jobs=max_jobs, + max_time=max_time, + sleep=sleep, + max_tries=max_tries, + timeout=timeout, + ) + + print(f"Starting queue worker for queue: {queue}") + print(f"Connection: {connection or 'default'}") + print(f"Sleep when idle: {sleep}s") + print(f"Press Ctrl+C to stop gracefully\n") + + await worker.work() + + finally: + # Cleanup connections + await app._cleanup_connections() + + # Run the worker + asyncio.run(run_worker()) + def main(): """Main entry point for the CLI.""" parser = argparse.ArgumentParser(description="RouteMQ - MQTT routing framework") parser.add_argument('--init', action='store_true', 
help="Initialize a new RouteMQ project") parser.add_argument('--run', action='store_true', help="Run the MQTT application") parser.add_argument('--tinker', action='store_true', help="Start interactive REPL for testing ORM and queries") + parser.add_argument('--queue-work', action='store_true', help="Start queue worker to process background jobs") + parser.add_argument('--queue', type=str, default="default", help="The queue to process (default: default)") + parser.add_argument('--connection', type=str, help="Queue connection to use (redis or database)") + parser.add_argument('--max-jobs', type=int, help="Maximum number of jobs to process") + parser.add_argument('--max-time', type=int, help="Maximum time in seconds to run") + parser.add_argument('--sleep', type=int, default=3, help="Seconds to sleep when no job is available (default: 3)") + parser.add_argument('--max-tries', type=int, help="Maximum number of times to attempt a job") + parser.add_argument('--timeout', type=int, default=60, help="Maximum seconds a job can run (default: 60)") args = parser.parse_args() @@ -154,6 +210,18 @@ def main(): tinker() return + if args.queue_work: + queue_work( + queue=args.queue, + connection=args.connection, + max_jobs=args.max_jobs, + max_time=args.max_time, + sleep=args.sleep, + max_tries=args.max_tries, + timeout=args.timeout, + ) + return + if args.run or not sys.argv[1:]: create_env_file() app = create_app() diff --git a/test_queue.py b/test_queue.py new file mode 100644 index 0000000..8538998 --- /dev/null +++ b/test_queue.py @@ -0,0 +1,114 @@ +#!/usr/bin/env python3 +""" +Quick test script to verify the queue system works correctly. 
+""" +import asyncio +import sys +from pathlib import Path + +# Add project root to path +sys.path.insert(0, str(Path(__file__).parent)) + +from dotenv import load_dotenv +from core.job import Job +from core.queue.queue_manager import dispatch, queue +import logging + +# Setup logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger(__name__) + + +class TestJob(Job): + """Simple test job.""" + + max_tries = 2 + timeout = 10 + queue = "test" + + def __init__(self): + super().__init__() + self.test_data = None + + async def handle(self): + logger.info(f"Processing TestJob with data: {self.test_data}") + await asyncio.sleep(1) + logger.info("TestJob completed successfully!") + + async def failed(self, exception: Exception): + logger.error(f"TestJob failed: {exception}") + + +async def test_job_serialization(): + """Test job serialization and deserialization.""" + logger.info("Testing job serialization...") + + # Create a job + job = TestJob() + job.test_data = {"message": "Hello, Queue!", "number": 42} + + # Serialize + payload = job.serialize() + logger.info(f"Serialized payload: {payload}") + + # Deserialize + restored_job = Job.unserialize(payload) + logger.info(f"Restored job: {restored_job}") + logger.info(f"Restored data: {restored_job.test_data}") + + assert restored_job.test_data == job.test_data + logger.info("✅ Serialization test passed!") + + +async def test_dispatch(): + """Test dispatching a job (without actually processing it).""" + logger.info("\nTesting job dispatch...") + + # Note: This requires Redis or MySQL to be enabled + # For a basic test, we'll just verify the job can be created and serialized + + job = TestJob() + job.test_data = {"test": "dispatch"} + + logger.info("Created test job successfully") + logger.info("✅ Job creation test passed!") + + # To actually test dispatch, you would need: + # await dispatch(job) + # But this requires a running
+ Redis/MySQL instance + + +async def main(): + """Run all tests.""" + logger.info("Starting queue system tests...") + logger.info("=" * 50) + + load_dotenv() + + try: + # Test 1: Serialization + await test_job_serialization() + + # Test 2: Dispatch + await test_dispatch() + + logger.info("\n" + "=" * 50) + logger.info("✅ All tests passed!") + logger.info("\nTo test the full queue system:") + logger.info("1. Enable Redis or MySQL in your .env file") + logger.info("2. Run: python main.py --queue-work --queue test") + logger.info("3. In another terminal, dispatch a job using the example jobs") + + except Exception as e: + logger.error(f"❌ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main())