add background task queue #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged: ardzz merged 4 commits into `master` from `claude/add-background-task-queue-011CUcvsPKaeBhohjpwZJsDy` on Oct 30, 2025.
Commits:
- `3db60fa` feat: add background task queue system similar to Laravel's queue:work (claude)
- `55ddb83` feat: add Docker configuration for queue workers and production deplo… (claude)
- `c09aa09` docs: reorganize queue documentation into dedicated folder structure (claude)
- `f8bd072` Update app/jobs/example_data_processing_job.py (ardzz)
The first file in the diff is a Docker environment template (new file, 34 lines):

```dotenv
# Docker Environment Configuration for RouteMQ
# Copy this file to .env and customize for your deployment

# MQTT Broker Configuration
MQTT_BROKER=test.mosquitto.org
MQTT_PORT=1883
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_GROUP_NAME=mqtt_framework_group

# MySQL Configuration
ENABLE_MYSQL=true
DB_HOST=mysql
DB_PORT=3306
DB_NAME=mqtt_framework
DB_USER=routemq
DB_PASS=routemq

# Redis Configuration
ENABLE_REDIS=true
REDIS_HOST=redis
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=

# Queue Configuration
QUEUE_CONNECTION=redis

# Timezone
TIMEZONE=Asia/Jakarta

# Logging Configuration
LOG_LEVEL=INFO
LOG_FORMAT=%(asctime)s - %(name)s - %(levelname)s - %(message)s
```
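RouteMQ presumably loads these variables at startup (most likely via a library such as python-dotenv, which is an assumption, not something shown in this diff). The `KEY=VALUE` format itself is simple enough to sketch a minimal parser for:

```python
def parse_dotenv(text: str) -> dict:
    """Minimal .env parser sketch: KEY=VALUE lines, '#' comments and blanks skipped.

    Illustrative only; a real loader also handles quoting and interpolation.
    """
    env = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip comments and blank lines
        key, sep, value = line.partition("=")
        if sep:  # only keep lines that actually contain '='
            env[key.strip()] = value.strip()
    return env


sample = """
# Queue Configuration
QUEUE_CONNECTION=redis
REDIS_PASSWORD=
"""
config = parse_dotenv(sample)
```

Note that an empty value (as in `REDIS_PASSWORD=` above) parses to an empty string rather than being dropped, matching the convention of leaving optional credentials blank.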
A Makefile with Docker and queue-worker shortcuts (new file, 98 lines):

```makefile
.PHONY: help build up down restart logs ps clean dev queue-work

help: ## Show this help message
	@echo "RouteMQ Docker Commands"
	@echo ""
	@grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'

build: ## Build all Docker images
	docker compose build

up: ## Start all services (production)
	docker compose up -d

down: ## Stop all services
	docker compose down

restart: ## Restart all services
	docker compose restart

logs: ## View logs from all services
	docker compose logs -f

logs-app: ## View logs from RouteMQ app
	docker compose logs -f routemq

logs-worker: ## View logs from default queue worker
	docker compose logs -f queue-worker-default

logs-redis: ## View logs from Redis
	docker compose logs -f redis

logs-mysql: ## View logs from MySQL
	docker compose logs -f mysql

ps: ## Show running services
	docker compose ps

stats: ## Show resource usage stats
	docker stats

clean: ## Stop and remove all containers, networks, and volumes
	docker compose down -v

dev: ## Start development environment (Redis + MySQL only)
	docker compose -f docker-compose.dev.yml up -d

dev-full: ## Start development environment (all services)
	docker compose -f docker-compose.dev.yml --profile full up -d

dev-down: ## Stop development environment
	docker compose -f docker-compose.dev.yml down

queue-work: ## Start queue worker on host (for development)
	uv run python main.py --queue-work --queue default

queue-high: ## Start high-priority queue worker on host
	uv run python main.py --queue-work --queue high-priority --sleep 1

queue-emails: ## Start emails queue worker on host
	uv run python main.py --queue-work --queue emails --sleep 5

scale-default: ## Scale default queue workers to 3 instances
	docker compose up -d --scale queue-worker-default=3

scale-emails: ## Scale email queue workers to 2 instances
	docker compose up -d --scale queue-worker-emails=2

shell-app: ## Open shell in RouteMQ app container
	docker compose exec routemq bash

shell-redis: ## Open Redis CLI
	docker compose exec redis redis-cli

shell-mysql: ## Open MySQL CLI
	docker compose exec mysql mysql -uroot -p

backup-mysql: ## Backup MySQL database
	docker compose exec mysql mysqldump -uroot -p${DB_PASS} ${DB_NAME} > backup_mysql_$$(date +%Y%m%d_%H%M%S).sql

backup-redis: ## Backup Redis data
	docker compose exec redis redis-cli SAVE
	docker cp routemq-redis:/data/dump.rdb backup_redis_$$(date +%Y%m%d_%H%M%S).rdb

health: ## Check health of all services
	@echo "Service Health Status:"
	@docker compose ps --format "table {{.Service}}\t{{.Status}}"

install: ## Install dependencies on host
	uv sync

run: ## Run RouteMQ on host
	uv run python main.py --run

tinker: ## Start interactive REPL
	uv run python main.py --tinker

init: ## Initialize new RouteMQ project
	uv run python main.py --init
```
A package marker (new file, 1 line):

```python
# This file marks the directory as a Python package
```
`app/jobs/example_data_processing_job.py` (new file, 73 lines):

```python
import asyncio
import logging

from core.job import Job

logger = logging.getLogger("RouteMQ.Jobs.ProcessDataJob")


class ProcessDataJob(Job):
    """
    Example job for processing data in the background.

    This demonstrates a job that processes sensor data from IoT devices.

    Usage:
        from app.jobs.example_data_processing_job import ProcessDataJob
        from core.queue.queue_manager import dispatch

        # Dispatch the job
        job = ProcessDataJob()
        job.device_id = "sensor-001"
        job.sensor_data = {"temperature": 25.5, "humidity": 60}
        await dispatch(job)
    """

    # Configure job properties
    max_tries = 5
    timeout = 120  # Longer timeout for data processing
    retry_after = 5
    queue = "data-processing"

    def __init__(self):
        super().__init__()
        self.device_id = None
        self.sensor_data = None

    async def handle(self) -> None:
        """
        Execute the job - process sensor data.
        """
        logger.info(f"Processing data from device {self.device_id}")
        logger.info(f"Sensor data: {self.sensor_data}")

        # Simulate data processing
        await asyncio.sleep(3)

        # Example: Calculate statistics
        if isinstance(self.sensor_data, dict):
            temperature = self.sensor_data.get("temperature")
            humidity = self.sensor_data.get("humidity")

            # Explicit None checks so a reading of 0 is still compared
            if temperature is not None and temperature > 30:
                logger.warning(f"High temperature detected: {temperature}°C")

            if humidity is not None and humidity > 80:
                logger.warning(f"High humidity detected: {humidity}%")

        # In a real application, you might:
        # - Store processed data in a database
        # - Calculate aggregations and statistics
        # - Trigger alerts if thresholds are exceeded
        # - Send data to analytics services

        logger.info(f"Successfully processed data from device {self.device_id}")

    async def failed(self, exception: Exception) -> None:
        """
        Handle permanent job failure.
        """
        logger.error(
            f"Failed to process data from device {self.device_id} "
            f"after {self.max_tries} attempts"
        )
        logger.error(f"Error: {str(exception)}")
```
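The `dispatch` API referenced in the docstring (`core.queue.queue_manager.dispatch`) is part of this PR but not shown in this diff. The flow it implies can be sketched with an in-memory stand-in; everything below is a hypothetical stub, not the framework's actual classes:

```python
import asyncio


class InMemoryQueue:
    """Hypothetical stand-in for the Redis-backed queue added by this PR."""

    def __init__(self):
        self.jobs = []

    async def dispatch(self, job):
        # A real backend would serialize the job and push it to Redis;
        # here we just record it together with its queue name.
        self.jobs.append((getattr(job, "queue", "default"), job))


class ProcessDataJob:
    """Trimmed copy of the job above, without the framework base class."""

    queue = "data-processing"

    def __init__(self):
        self.device_id = None
        self.sensor_data = None


async def main():
    q = InMemoryQueue()
    job = ProcessDataJob()
    job.device_id = "sensor-001"
    job.sensor_data = {"temperature": 25.5, "humidity": 60}
    await q.dispatch(job)
    return q.jobs


queued = asyncio.run(main())
```

The point of the indirection is that `handle()` runs later, in a separate worker process (`--queue-work`), while the dispatching MQTT handler returns immediately.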
`app/jobs/example_email_job.py` (new file, 65 lines):

```python
import asyncio
import logging

from core.job import Job

logger = logging.getLogger("RouteMQ.Jobs.SendEmailJob")


class SendEmailJob(Job):
    """
    Example job for sending emails in the background.

    Usage:
        from app.jobs.example_email_job import SendEmailJob
        from core.queue.queue_manager import dispatch

        # Dispatch the job
        job = SendEmailJob()
        job.to = "user@example.com"
        job.subject = "Welcome!"
        job.message = "Thank you for signing up."
        await dispatch(job)
    """

    # Configure job properties
    max_tries = 3
    timeout = 30
    retry_after = 10  # Retry after 10 seconds on failure
    queue = "emails"  # Use 'emails' queue instead of 'default'

    def __init__(self):
        super().__init__()
        self.to = None
        self.subject = None
        self.message = None

    async def handle(self) -> None:
        """
        Execute the job - send an email.

        In a real application, this would use an email service like
        SendGrid, AWS SES, etc.
        """
        logger.info(f"Sending email to {self.to}")
        logger.info(f"Subject: {self.subject}")
        logger.info(f"Message: {self.message}")

        # Simulate email sending (replace with actual email service)
        await asyncio.sleep(2)  # Simulate API call delay

        # Uncomment to test job failure and retry
        # if self.attempts == 1:
        #     raise Exception("Simulated email sending failure")

        logger.info(f"Email sent successfully to {self.to}")

    async def failed(self, exception: Exception) -> None:
        """
        Handle permanent job failure.

        This is called when the job exceeds max_tries.
        """
        logger.error(f"Failed to send email to {self.to} after {self.max_tries} attempts")
        logger.error(f"Error: {str(exception)}")

        # In a real application, you might:
        # - Log to a monitoring service
        # - Send alert to administrators
        # - Store failure in a database for manual review
```
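Both example jobs rely on `max_tries` / `retry_after` semantics implemented by the worker elsewhere in this PR. The intended behavior can be sketched as a plain retry loop; this is a simplification under assumed semantics (a real worker would also persist attempt counts and call `failed()` rather than re-raise):

```python
import asyncio


async def run_with_retries(handle, max_tries=3, retry_after=10):
    """Sketch of the retry policy implied by max_tries / retry_after."""
    for attempt in range(1, max_tries + 1):
        try:
            return await handle()
        except Exception:
            if attempt == max_tries:
                # Out of attempts: this is where the worker would call failed()
                raise
            await asyncio.sleep(retry_after)  # back off before the next attempt


# Flaky handler that succeeds on the third attempt
calls = {"n": 0}


async def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("simulated email sending failure")
    return "sent"


result = asyncio.run(run_with_retries(flaky, max_tries=3, retry_after=0))
```

With `max_tries = 3`, the handler above fails twice, succeeds on the third try, and `failed()` is never reached; one more failure and the exception would propagate instead.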
Review comment: The link points to `./docs/queue-system.md`, but the actual documentation file is `./docs/queue/README.md`. This broken link should be updated to point to the correct file.