diff --git a/src/event_processors/deployment_protection_rule.py b/src/event_processors/deployment_protection_rule.py
index 390ad19..7b5f3fd 100644
--- a/src/event_processors/deployment_protection_rule.py
+++ b/src/event_processors/deployment_protection_rule.py
@@ -3,18 +3,21 @@
 from typing import Any
 
 from src.agents import get_agent
-from src.core.models import Violation
+from src.core.utils.retry import retry_async
+from src.core.utils.timeout import execute_with_timeout
 from src.event_processors.base import BaseEventProcessor, ProcessingResult
 from src.tasks.scheduler.deployment_scheduler import get_deployment_scheduler
 from src.tasks.task_queue import Task
 
 logger = logging.getLogger(__name__)
 
+AGENT_TIMEOUT_SECONDS = 30.0
+
 
 class DeploymentProtectionRuleProcessor(BaseEventProcessor):
     """Processor for deployment protection rule events using hybrid agentic rule evaluation."""
 
-    def __init__(self) -> None:
+    def __init__(self):
         # Call super class __init__ first
         super().__init__()
@@ -24,6 +27,14 @@ def __init__(self) -> None:
     def get_event_type(self) -> str:
         return "deployment_protection_rule"
 
+    @staticmethod
+    def _is_valid_callback_url(url: str | None) -> bool:
+        return bool(url and isinstance(url, str) and url.strip().startswith("http"))
+
+    @staticmethod
+    def _is_valid_environment(env: str | None) -> bool:
+        return bool(env and isinstance(env, str) and env.strip())
+
     async def process(self, task: Task) -> ProcessingResult:
         start_time = time.time()
 
@@ -37,30 +48,46 @@ async def process(self, task: Task) -> ProcessingResult:
             installation_id = task.installation_id
             repo_full_name = task.repo_full_name
 
-            if not installation_id:
-                logger.error("No installation ID found in task")
-                return ProcessingResult(
-                    success=False,
-                    violations=[],
-                    api_calls_made=0,
-                    processing_time_ms=int((time.time() - start_time) * 1000),
-                    error="No installation ID found",
+            can_call_callback = self._is_valid_callback_url(deployment_callback_url) and self._is_valid_environment(
+                environment
+            )
+            if not can_call_callback:
+                logger.warning(
+                    "deployment_status_skipped",
+                    extra={
+                        "operation": "deployment_protection_rule",
+                        "deployment_id": deployment_id,
+                        "environment": environment,
+                        "reason": "invalid or missing callback_url or environment",
+                    },
                 )
 
-            logger.info("=" * 80)
-            logger.info(f"🚀 Processing DEPLOYMENT_PROTECTION_RULE event for {repo_full_name}")
-            logger.info(f"   Environment: {environment} | Deployment ID: {deployment_id}")
-            logger.info("=" * 80)
+            logger.info(
+                "deployment_processing_start",
+                extra={
+                    "operation": "deployment_protection_rule",
+                    "deployment_id": deployment_id,
+                    "environment": environment,
+                    "repo": repo_full_name,
+                },
+            )
 
-            rules_optional = await self.rule_provider.get_rules(repo_full_name, installation_id)
-            rules = rules_optional if rules_optional is not None else []
+            rules = await self.rule_provider.get_rules(repo_full_name, installation_id)
 
             if not rules:
-                logger.info("📋 No rules found for repository")
-                if deployment_callback_url and environment:
-                    await self._approve_deployment(
+                logger.info("No rules found for repository")
+                if can_call_callback:
+                    approved = await self._approve_deployment(
                         deployment_callback_url, environment, "No rules configured", installation_id
                     )
+                    if not approved:
+                        return ProcessingResult(
+                            success=False,
+                            violations=[],
+                            api_calls_made=1,
+                            processing_time_ms=int((time.time() - start_time) * 1000),
+                            error="Approval API failed after retries",
+                        )
                 return ProcessingResult(
                     success=True,
                     violations=[],
@@ -75,18 +102,29 @@ async def process(self, task: Task) -> ProcessingResult:
                 elif isinstance(r, dict):
                     event_types = r.get("event_types", [])
                 else:
-                    logger.error(f"Rule is not a dict or object: {r} (type: {type(r)})")
+                    logger.error("rule_invalid", extra={"rule": str(r), "rule_type": type(r).__name__})
                     continue
 
                 if "deployment" in event_types:
                     deployment_rules.append(r)
 
             if not deployment_rules:
-                logger.info("📋 No deployment rules found")
-                if deployment_callback_url and environment:
-                    await self._approve_deployment(
-                        deployment_callback_url, environment, "No deployment rules configured", installation_id
+                logger.info("No deployment rules found")
+                if can_call_callback:
+                    approved = await self._approve_deployment(
+                        deployment_callback_url,
+                        environment,
+                        "No deployment rules configured",
+                        installation_id,
                     )
+                    if not approved:
+                        return ProcessingResult(
+                            success=False,
+                            violations=[],
+                            api_calls_made=1,
+                            processing_time_ms=int((time.time() - start_time) * 1000),
+                            error="Approval API failed after retries",
+                        )
                 return ProcessingResult(
                     success=True,
                     violations=[],
@@ -94,7 +132,7 @@ async def process(self, task: Task) -> ProcessingResult:
                     processing_time_ms=int((time.time() - start_time) * 1000),
                 )
 
-            logger.info(f"📋 Found {len(deployment_rules)} applicable rules for deployment")
+            logger.info("Found %d applicable rules for deployment", len(deployment_rules))
 
             formatted_rules = self._convert_rules_to_new_format(deployment_rules)
 
@@ -109,74 +147,143 @@ async def process(self, task: Task) -> ProcessingResult:
                 "github_client": self.github_client,  # Pass GitHub client for validators
             }
 
-            analysis_result = await self.engine_agent.execute(
-                event_type="deployment",
-                event_data=event_data,
-                rules=formatted_rules,
+            analysis_result = await execute_with_timeout(
+                self.engine_agent.execute(
+                    event_type="deployment",
+                    event_data=event_data,
+                    rules=formatted_rules,
+                ),
+                timeout=AGENT_TIMEOUT_SECONDS,
+                timeout_message=f"Agent execution timed out after {AGENT_TIMEOUT_SECONDS}s",
             )
 
-            # Extract violations from AgentResult - same pattern as acknowledgment processor
-            violations: list[Violation] = []
+            violations = []
            if analysis_result.data and "evaluation_result" in analysis_result.data:
                 eval_result = analysis_result.data["evaluation_result"]
                 if hasattr(eval_result, "violations"):
-                    violations = [Violation.model_validate(v) for v in eval_result.violations]
+                    violations = [
+                        v.model_dump(mode="json") if hasattr(v, "model_dump") else v for v in eval_result.violations
+                    ]
 
-            logger.info("🔍 Analysis completed:")
-            logger.info(f"   Violations found: {len(violations)}")
+            logger.info("Analysis completed: %d violations", len(violations))
             for violation in violations:
-                logger.info(f"   • {violation.message}")
+                logger.info("Violation: %s", violation.get("message", "Unknown violation"))
 
             if not violations:
-                if deployment_callback_url and environment:
-                    await self._approve_deployment(
+                if can_call_callback:
+                    approved = await self._approve_deployment(
                         deployment_callback_url, environment, "All deployment rules passed", installation_id
                     )
-                logger.info("✅ All rules passed - deployment approved!")
+                    if not approved:
+                        return ProcessingResult(
+                            success=False,
+                            violations=[],
+                            api_calls_made=1,
+                            processing_time_ms=int((time.time() - start_time) * 1000),
+                            error="Approval API failed after retries",
+                        )
+                logger.info("All rules passed, deployment approved")
             else:
-                violations_dicts = [v.model_dump() for v in violations]
-                time_based_violations = self._check_time_based_violations(violations_dicts)
-                if time_based_violations:
+                time_based_violations = self._check_time_based_violations(violations)
+                if time_based_violations and can_call_callback:
                     await get_deployment_scheduler().add_pending_deployment(
                         {
                             "deployment_id": deployment_id,
                             "repo": task.repo_full_name,
                             "installation_id": task.installation_id,
-                            "environment": deployment.get("environment"),
+                            "environment": environment or deployment.get("environment"),
                             "event_data": payload,
                             "rules": deployment_rules,
-                            "violations": violations_dicts,
+                            "violations": violations,
                             "time_based_violations": time_based_violations,
                             "created_at": time.time(),
                             "callback_url": deployment_callback_url,
                         }
                     )
-                    logger.info("⏰ Time-based violations detected - added to scheduler for re-evaluation")
+                    logger.info("Time-based violations detected, added to scheduler for re-evaluation")
 
-                if deployment_callback_url and environment:
-                    await self._reject_deployment(
-                        deployment_callback_url, environment, violations_dicts, installation_id
+                if can_call_callback:
+                    rejected = await self._reject_deployment(
+                        deployment_callback_url, environment, violations, installation_id
                     )
-                logger.info(f"❌ Deployment rejected due to {len(violations)} violations")
+                    if not rejected:
+                        return ProcessingResult(
+                            success=False,
+                            violations=violations,
+                            api_calls_made=1,
+                            processing_time_ms=int((time.time() - start_time) * 1000),
+                            error="Rejection API failed after retries",
+                        )
+                logger.info("Deployment rejected due to %d violations", len(violations))
 
             processing_time = int((time.time() - start_time) * 1000)
 
-            logger.info("=" * 80)
-            logger.info(f"🏁 DEPLOYMENT_PROTECTION_RULE processing completed in {processing_time}ms")
-            logger.info(f"   State: {'approved' if not violations else 'rejected'}")
-            logger.info(f"   Violations: {len(violations)}")
-            logger.info("=" * 80)
+            logger.info(
+                "deployment_processing_complete",
+                extra={
+                    "operation": "deployment_protection_rule",
+                    "deployment_id": deployment_id,
+                    "environment": environment,
+                    "processing_time_ms": processing_time,
+                    "state": "approved" if not violations else "rejected",
+                    "violations_count": len(violations),
+                },
+            )
 
             return ProcessingResult(
                 success=(not violations), violations=violations, api_calls_made=1, processing_time_ms=processing_time
             )
 
         except Exception as e:
-            logger.error(f"❌ Error processing deployment protection rule: {str(e)}")
+            processing_time = int((time.time() - start_time) * 1000)
+            exc_payload = task.payload
+            exc_deployment = exc_payload.get("deployment", {})
+            exc_deployment_id = exc_deployment.get("id")
+            exc_callback_url = exc_payload.get("deployment_callback_url")
+            exc_environment = exc_payload.get("environment")
+            logger.error(
+                "deployment_processing_error",
+                extra={
+                    "operation": "deployment_protection_rule",
+                    "deployment_id": exc_deployment_id,
+                    "error": str(e),
+                    "processing_time_ms": processing_time,
+                },
+            )
+            if self._is_valid_callback_url(exc_callback_url) and self._is_valid_environment(exc_environment):
+                fallback_comment = "Processing failed. Approved as fallback to avoid indefinite blocking."
+                approved = await self._approve_deployment(
+                    exc_callback_url, exc_environment, fallback_comment, task.installation_id
+                )
+                if approved:
+                    logger.info(
+                        "deployment_fallback_approval",
+                        extra={
+                            "operation": "deployment_protection_rule",
+                            "deployment_id": exc_deployment_id,
+                            "reason": "exception during processing",
+                        },
+                    )
+                else:
+                    logger.warning(
+                        "deployment_fallback_approval_failed",
+                        extra={
+                            "operation": "deployment_protection_rule",
+                            "deployment_id": exc_deployment_id,
+                            "reason": "fallback approval API failed after retries",
+                        },
+                    )
+                    return ProcessingResult(
+                        success=False,
+                        violations=[],
+                        api_calls_made=0,
+                        processing_time_ms=processing_time,
+                        error=f"{e!s}. Fallback approval also failed.",
+                    )
             return ProcessingResult(
                 success=False,
                 violations=[],
                 api_calls_made=0,
-                processing_time_ms=int((time.time() - start_time) * 1000),
+                processing_time_ms=processing_time,
                 error=str(e),
             )
 
@@ -188,70 +295,89 @@ def _check_time_based_violations(violations: list[dict[str, Any]]) -> list[dict[
             if any(k in v.get("rule_description", "").lower() for k in ["hours", "weekend", "time", "day"])
         ]
 
-    async def _approve_deployment(
-        self, callback_url: str, environment: str, comment: str, installation_id: int
-    ) -> None:
-        try:
+    async def _send_deployment_review(
+        self,
+        callback_url: str,
+        environment: str,
+        state: str,
+        comment: str,
+        installation_id: int,
+    ) -> bool:
+        async def _do_send() -> dict[str, Any]:
             result = await self.github_client.review_deployment_protection_rule(
                 callback_url=callback_url,
                 environment=environment,
-                state="approved",
-                comment=f"✅ {comment}",
+                state=state,
+                comment=comment,
                 installation_id=installation_id,
             )
             if result is None:
-                logger.error("Failed to approve deployment - API call returned None")
-            else:
-                logger.info("Successfully approved deployment")
-        except Exception as e:
-            logger.error(f"Error approving deployment: {e}")
+                raise RuntimeError("review_deployment_protection_rule returned None")
+            return result
 
-    async def _reject_deployment(
-        self, callback_url: str, environment: str, violations: list[dict[str, Any]], installation_id: int
-    ) -> None:
         try:
-            comment_text = self._format_violations_comment(violations)
-            result = await self.github_client.review_deployment_protection_rule(
-                callback_url=callback_url,
-                environment=environment,
-                state="rejected",
-                comment=comment_text,
-                installation_id=installation_id,
+            await retry_async(
+                _do_send,
+                max_retries=3,
+                initial_delay=1.0,
+                max_delay=30.0,
+                exceptions=(Exception,),
             )
-            if result is None:
-                logger.error("Failed to reject deployment - API call returned None")
-            else:
-                logger.info(f"Successfully rejected deployment with {len(violations)} violations")
+            logger.info("deployment_review_sent", extra={"operation": state, "environment": environment})
+            return True
         except Exception as e:
-            logger.error(f"Error rejecting deployment: {e}")
-            # Note: We can't create a fallback deployment status here because we don't have the repo name
-            # The deployment will remain in "waiting" state, which is better than failing completely
+            logger.error(
+                "deployment_review_error",
+                extra={"operation": state, "environment": environment, "error": str(e)},
+            )
+            return False
+
+    async def _approve_deployment(
+        self, callback_url: str, environment: str, comment: str, installation_id: int
+    ) -> bool:
+        return await self._send_deployment_review(callback_url, environment, "approved", comment, installation_id)
+
+    async def _reject_deployment(
+        self, callback_url: str, environment: str, violations: list[dict[str, Any]], installation_id: int
+    ) -> bool:
+        comment_text = self._format_violations_comment(violations)
+        return await self._send_deployment_review(callback_url, environment, "rejected", comment_text, installation_id)
 
     def _convert_rules_to_new_format(self, rules: list[Any]) -> list[dict[str, Any]]:
         formatted_rules = []
         for rule in rules:
-            # Convert Rule object to dict format
-            rule_dict = {
-                "description": rule.description,
-                "enabled": rule.enabled,
-                "severity": rule.severity.value if hasattr(rule.severity, "value") else rule.severity,
-                "event_types": [et.value if hasattr(et, "value") else et for et in rule.event_types],
-                "parameters": rule.parameters if hasattr(rule, "parameters") else {},
-            }
-            # If no parameters field, try to extract from conditions (backward compatibility)
-            if not rule_dict["parameters"] and hasattr(rule, "conditions"):
-                for condition in rule.conditions:
-                    rule_dict["parameters"].update(condition.parameters)
+            if isinstance(rule, dict):
+                rule_dict = {
+                    "description": rule.get("description", rule.get("rule_description", "")),
+                    "enabled": rule.get("enabled", True),
+                    "severity": rule.get("severity", "medium"),
+                    "event_types": rule.get("event_types", []),
+                    "parameters": rule.get("parameters", {}),
+                }
+            else:
+                rule_dict = {
+                    "description": getattr(rule, "description", ""),
+                    "enabled": getattr(rule, "enabled", True),
+                    "severity": (
+                        rule.severity.value if hasattr(rule.severity, "value") else getattr(rule, "severity", "medium")
+                    ),
+                    "event_types": [
+                        et.value if hasattr(et, "value") else et for et in getattr(rule, "event_types", [])
+                    ],
+                    "parameters": getattr(rule, "parameters", {}) or {},
+                }
+            if not rule_dict["parameters"] and hasattr(rule, "conditions"):
+                for condition in rule.conditions:
+                    rule_dict["parameters"].update(getattr(condition, "parameters", {}))
             formatted_rules.append(rule_dict)
         return formatted_rules
 
     @staticmethod
-    def _format_violations_comment(violations: list[dict[str, Any]]) -> str:
-        text = "🚫 **Deployment Blocked - Rule Violations Detected**\n"
+    def _format_violations_comment(violations):
+        text = "**Deployment Blocked - Rule Violations Detected**\n"
         for v in violations:
-            emoji = "❌" if v.get("severity", "high") in ("critical", "high") else "⚠️"
             rule_description = v.get("rule_description", v.get("rule", v.get("description", "Unknown")))
-            text += f"{emoji} **{rule_description}**\n"
+            text += f"**{rule_description}**\n"
             text += f"**Severity:** {v.get('severity', 'high').capitalize()}\n"
             text += f"**Issue:** {v.get('message', '')}\n"
             text += f"**Solution:** {v.get('how_to_fix', 'See documentation.')}\n"
@@ -264,7 +390,7 @@ async def prepare_webhook_data(self, task: Task) -> dict[str, Any]:
     async def prepare_api_data(self, task: Task) -> dict[str, Any]:
         return {}
 
-    def _get_rule_provider(self) -> Any:
+    def _get_rule_provider(self):
         from src.rules.loaders.github_loader import github_rule_loader
 
         return github_rule_loader
diff --git a/src/tasks/scheduler/deployment_scheduler.py b/src/tasks/scheduler/deployment_scheduler.py
index 7f3a5a6..d2c77cb 100644
--- a/src/tasks/scheduler/deployment_scheduler.py
+++ b/src/tasks/scheduler/deployment_scheduler.py
@@ -6,6 +6,8 @@
 import structlog
 
 from src.agents import get_agent
+from src.core.utils.retry import retry_async
+from src.core.utils.timeout import execute_with_timeout
 
 if TYPE_CHECKING:
     from src.agents.base import BaseAgent
@@ -13,6 +15,9 @@
 
 logger = structlog.get_logger(__name__)
 
+AGENT_TIMEOUT_SECONDS = 30.0
+MAX_CONSECUTIVE_FAILURES = 3
+
 
 class DeploymentScheduler:
     """Scheduler for re-evaluating time-based deployment rules."""
 
@@ -39,7 +44,7 @@ async def start(self) -> None:
 
         self.running = True
         self.scheduler_task = asyncio.create_task(self._scheduler_loop())
-        logger.info("🕒 Deployment scheduler started - checking every 15 minutes")
+        logger.info("Deployment scheduler started, checking every 15 minutes")
 
     async def stop(self) -> None:
         """Stop the scheduler."""
@@ -49,7 +54,7 @@ async def stop(self) -> None:
            # SIM105: Use contextlib.suppress instead of try-except-pass
            with contextlib.suppress(asyncio.CancelledError):
                await self.scheduler_task
-        logger.info("🛑 Deployment scheduler stopped")
+        logger.info("Deployment scheduler stopped")
 
     async def add_pending_deployment(self, deployment_data: dict[str, Any]) -> None:
         """
@@ -79,20 +84,23 @@ async def add_pending_deployment(self, deployment_data: dict[str, Any]) -> None:
                 "violations",
                 "time_based_violations",
                 "created_at",
+                "callback_url",
             ]
-            missing_fields = [field for field in required_fields if not deployment_data.get(field)]
+            missing_fields = [f for f in required_fields if f not in deployment_data]
             if missing_fields:
-                logger.error(f"Missing required fields for deployment scheduler: {missing_fields}")
+                logger.error("deployment_scheduler_missing_fields", missing=missing_fields)
                 return
 
-            logger.info(f"⏰ Adding deployment {deployment_data['deployment_id']} to scheduler")
-            logger.info(f"   Repo: {deployment_data['repo']}")
-            logger.info(f"   Installation: {deployment_data['installation_id']}")
-            logger.info(f"   Time-based violations: {len(deployment_data.get('time_based_violations', []))}")
+            logger.info(
+                "deployment_scheduler_add",
+                deployment_id=deployment_data["deployment_id"],
+                repo=deployment_data["repo"],
+                time_based_violations=len(deployment_data.get("time_based_violations", [])),
+            )
 
             self.pending_deployments.append(deployment_data)
         except Exception as e:
-            logger.error(f"Error adding deployment to scheduler: {e}")
+            logger.error("deployment_scheduler_add_error", error=str(e))
 
     async def _scheduler_loop(self) -> None:
         """Main scheduler loop - runs every 15 minutes."""
@@ -105,7 +113,7 @@ async def _scheduler_loop(self) -> None:
                 logger.info("Scheduler loop cancelled")
                 break
             except Exception as e:
-                logger.error(f"Scheduler error: {e}")
+                logger.error("deployment_scheduler_loop_error", error=str(e))
                 # Wait 1 minute on error before retrying
                 await asyncio.sleep(60)
 
@@ -116,7 +124,9 @@ async def _check_pending_deployments(self) -> None:
         current_time = datetime.now(UTC)
 
         logger.info(
-            f"🔍 Checking {len(self.pending_deployments)} pending deployments at {current_time.strftime('%Y-%m-%d %H:%M:%S')} UTC"
+            "deployment_scheduler_check",
+            pending_count=len(self.pending_deployments),
+            time_utc=current_time.strftime("%Y-%m-%d %H:%M:%S"),
         )
 
         deployments_to_remove = []
@@ -125,87 +135,149 @@ async def _check_pending_deployments(self) -> None:
             try:
                 # Check if deployment is too old (remove after 7 days)
                 created_at = deployment.get("created_at")
-                if created_at:
-                    if isinstance(created_at, int | float):
-                        # Convert timestamp to datetime
-                        created_at = datetime.fromtimestamp(created_at)
-                    age = current_time - created_at
-                    if age > timedelta(days=7):
-                        logger.info(
-                            f"⏰ Removing expired deployment for {deployment.get('repo')} (age: {age.days} days)"
-                        )
-                        deployments_to_remove.append(i)
-                        continue
+                if not created_at:
+                    logger.warning(
+                        "deployment_scheduler_remove_invalid",
+                        repo=deployment.get("repo"),
+                        reason="no created_at timestamp",
+                    )
+                    deployments_to_remove.append(i)
+                    continue
+
+                if isinstance(created_at, int | float):
+                    created_at_dt = datetime.fromtimestamp(created_at, tz=UTC)
+                elif hasattr(created_at, "year"):
+                    created_at_dt = (
+                        created_at if getattr(created_at, "tzinfo", None) else created_at.replace(tzinfo=UTC)
+                    )
                 else:
-                    # If no created_at timestamp, remove the deployment as it's invalid
-                    logger.warning(f"⏰ Removing deployment for {deployment.get('repo')} with no created_at timestamp")
+                    logger.warning(
+                        "deployment_scheduler_remove_invalid",
+                        repo=deployment.get("repo"),
+                        reason="invalid created_at format",
+                    )
+                    deployments_to_remove.append(i)
+                    continue
+
+                age = current_time - created_at_dt
+                if age > timedelta(days=7):
+                    logger.info(
+                        "deployment_scheduler_remove_expired",
+                        repo=deployment.get("repo"),
+                        age_days=age.days,
+                    )
                     deployments_to_remove.append(i)
                     continue
 
                 # Update last checked time
                 deployment["last_checked"] = current_time
 
-                # Re-evaluate the deployment
-                should_approve = await self._re_evaluate_deployment(deployment)
+                should_approve, should_remove = await self._re_evaluate_deployment(deployment)
 
                 if should_approve:
-                    # Approve the deployment
-                    await self._approve_deployment(deployment)
+                    approved = await self._approve_deployment(deployment)
+                    if approved:
+                        deployments_to_remove.append(i)
+                    else:
+                        logger.warning(
+                            "deployment_scheduler_approval_failed",
+                            repo=deployment.get("repo"),
+                            reason="GitHub API approval failed",
+                        )
+                elif should_remove:
                     deployments_to_remove.append(i)
                 else:
-                    logger.info(f"⏳ Deployment for {deployment.get('repo')} still blocked by time-based rules")
+                    logger.info(
+                        "deployment_scheduler_still_blocked",
+                        repo=deployment.get("repo"),
+                        reason="time-based rules",
+                    )
 
             except Exception as e:
-                logger.error(f"Error re-evaluating deployment {deployment.get('repo', 'unknown')}: {e}")
+                logger.error("deployment_scheduler_check_error", repo=deployment.get("repo", "unknown"), error=str(e))
 
         # Remove processed deployments (in reverse order to maintain indices)
         for i in reversed(deployments_to_remove):
             removed = self.pending_deployments.pop(i)
-            logger.info(f"✅ Removed deployment for {removed.get('repo')} from scheduler")
+            logger.info("deployment_scheduler_removed", repo=removed.get("repo"))
 
         if self.pending_deployments:
-            logger.info(f"📋 {len(self.pending_deployments)} deployments still pending")
+            logger.info("deployment_scheduler_pending", count=len(self.pending_deployments))
 
-    async def _re_evaluate_deployment(self, deployment: dict[str, Any]) -> bool:
-        """Re-evaluate a deployment against current time-based rules."""
+    async def _re_evaluate_deployment(self, deployment: dict[str, Any]) -> tuple[bool, bool]:
+        """
+        Re-evaluate a deployment against current time-based rules.
+        Returns (should_approve, should_remove). When should_remove is True,
+        deployment is removed from scheduler without approval (e.g. non-time violations).
+ """ try: # Validate required fields required_fields = ["repo", "environment", "installation_id", "event_data", "rules"] - missing_fields = [field for field in required_fields if not deployment.get(field)] + missing_fields = [f for f in required_fields if f not in deployment] if missing_fields: - logger.error(f"Missing required fields for deployment re-evaluation: {missing_fields}") - return False + logger.error( + "deployment_scheduler_missing_fields", + repo=deployment.get("repo"), + missing=missing_fields, + ) + return False, True logger.info( - f"🔄 Re-evaluating deployment for {deployment.get('repo')} environment {deployment.get('environment')}" + "deployment_scheduler_reevaluate", + repo=deployment.get("repo"), + environment=deployment.get("environment"), ) # Refresh the GitHub token (it might have expired) try: fresh_token = await github_client.get_installation_access_token(deployment["installation_id"]) if not fresh_token: - logger.error(f"Failed to get fresh GitHub token for installation {deployment['installation_id']}") - return False + logger.error( + "deployment_scheduler_token_failed", + installation_id=deployment["installation_id"], + ) + failure_count = deployment.get("failure_count", 0) + 1 + deployment["failure_count"] = failure_count + if failure_count >= MAX_CONSECUTIVE_FAILURES: + return False, True + return False, False deployment["github_token"] = fresh_token except Exception as e: - logger.error(f"Failed to refresh GitHub token: {e}") - return False + logger.error("deployment_scheduler_token_error", error=str(e)) + failure_count = deployment.get("failure_count", 0) + 1 + deployment["failure_count"] = failure_count + if failure_count >= MAX_CONSECUTIVE_FAILURES: + return False, True + return False, False # Convert rules to the format expected by the analysis agent formatted_rules = DeploymentScheduler._convert_rules_to_new_format(deployment["rules"]) - # Re-run rule analysis - result = await self.engine_agent.execute( - event_type="deployment", - event_data=deployment["event_data"], - rules=formatted_rules, + result = await execute_with_timeout( + self.engine_agent.execute( + event_type="deployment", + event_data=deployment["event_data"], + rules=formatted_rules, + ), + timeout=AGENT_TIMEOUT_SECONDS, + timeout_message=f"Agent execution timed out after {AGENT_TIMEOUT_SECONDS}s", ) - violations = result.data.get("violations", []) + violations = [] + eval_result = result.data.get("evaluation_result") if result.data else None + if eval_result and hasattr(eval_result, "violations"): + for v in eval_result.violations: + violations.append( + { + "rule_description": getattr(v, "rule_description", ""), + "message": getattr(v, "message", ""), + } + ) if not violations: - logger.info("✅ No violations found - deployment can be approved") - return True + deployment["failure_count"] = 0 + logger.info("deployment_scheduler_no_violations", repo=deployment.get("repo")) + return True, False # Check if any violations are still time-based time_based_violations = [] @@ -237,59 +309,99 @@ async def _re_evaluate_deployment(self, deployment: dict[str, Any]) -> bool: other_violations.append(violation) if other_violations: - logger.info(f"❌ Deployment still has non-time-based violations: {len(other_violations)}") - # Remove from scheduler since these won't resolve automatically - return False - - if time_based_violations: - logger.info(f"⏰ Deployment still blocked by {len(time_based_violations)} time-based violations") - return False + deployment["failure_count"] = 0 + logger.info( + 
"deployment_scheduler_non_time_violations", + repo=deployment.get("repo"), + count=len(other_violations), + ) + return False, True - # No violations left - logger.info("✅ All violations resolved - deployment can be approved") - return True + deployment["failure_count"] = 0 + logger.info( + "deployment_scheduler_time_violations", + repo=deployment.get("repo"), + count=len(time_based_violations), + ) + return False, False except Exception as e: - logger.error(f"Error re-evaluating deployment: {e}") + failure_count = deployment.get("failure_count", 0) + 1 + deployment["failure_count"] = failure_count + logger.error( + "deployment_scheduler_reevaluate_error", + repo=deployment.get("repo"), + error=str(e), + failure_count=failure_count, + ) + if failure_count >= MAX_CONSECUTIVE_FAILURES: + logger.warning( + "deployment_scheduler_remove_after_failures", + repo=deployment.get("repo"), + failure_count=failure_count, + ) + return False, True + return False, False + + async def _approve_deployment(self, deployment: dict[str, Any]) -> bool: + """Approve a previously rejected deployment. Returns True if approval succeeded.""" + callback_url = deployment.get("callback_url") + installation_id = deployment.get("installation_id") + repo = deployment.get("repo", "unknown") + environment = deployment.get("environment", "unknown") + deployment_id = deployment.get("deployment_id") + + if not callback_url: + logger.error("deployment_approve_skipped", repo=repo, reason="no callback URL") return False - async def _approve_deployment(self, deployment: dict[str, Any]) -> None: - """Approve a previously rejected deployment.""" - try: - callback_url = deployment.get("callback_url") - installation_id = deployment.get("installation_id") - repo = deployment.get("repo", "unknown") - environment = deployment.get("environment", "unknown") - - if not callback_url: - logger.error(f"No callback URL found for deployment {repo}") - return - - if not installation_id: - logger.error(f"No installation ID found for deployment {repo}") - return + if not installation_id: + logger.error("deployment_approve_skipped", repo=repo, reason="no installation ID") + return False - comment = ( - "✅ **Deployment Automatically Approved**\n\n" - "Time-based restrictions have been lifted. The deployment can now proceed.\n\n" - f"**Environment:** {environment}\n" - f"**Approved at:** {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" - "The deployment will be automatically approved on GitHub." - ) + comment = ( + "**Deployment Automatically Approved**\n\n" + "Time-based restrictions have been lifted. The deployment can now proceed.\n\n" + f"**Environment:** {environment}\n" + f"**Approved at:** {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S')} UTC\n\n" + "The deployment will be automatically approved on GitHub." 
+        )
 
-            # Approve the deployment protection rule
-            await github_client.review_deployment_protection_rule(
+        async def _do_approve() -> dict[str, Any]:
+            result = await github_client.review_deployment_protection_rule(
                 callback_url=callback_url,
                 environment=environment,
                 state="approved",
                 comment=comment,
                 installation_id=installation_id,
             )
+            if result is None:
+                raise RuntimeError("review_deployment_protection_rule returned None")
+            return result
 
-            logger.info(f"✅ Deployment {deployment.get('deployment_id')} automatically approved for {repo}")
-
+        try:
+            await retry_async(
+                _do_approve,
+                max_retries=3,
+                initial_delay=1.0,
+                max_delay=30.0,
+                exceptions=(Exception,),
+            )
+            logger.info(
+                "deployment_scheduler_approved",
+                deployment_id=deployment_id,
+                repo=repo,
+                environment=environment,
+            )
+            return True
         except Exception as e:
-            logger.error(f"Error approving deployment {deployment.get('deployment_id')}: {e}")
+            logger.error(
+                "deployment_approve_error",
+                deployment_id=deployment_id,
+                repo=repo,
+                error=str(e),
+            )
+            return False
 
     def get_status(self) -> dict[str, Any]:
         """Get current scheduler status."""
@@ -300,7 +412,7 @@ def get_status(self) -> dict[str, Any]:
                 created_at_iso = None
                 if created_at:
                     if isinstance(created_at, int | float):
-                        created_at_iso = datetime.fromtimestamp(created_at).isoformat()
+                        created_at_iso = datetime.fromtimestamp(created_at, tz=UTC).isoformat()
                     elif hasattr(created_at, "isoformat"):
                         created_at_iso = created_at.isoformat()
                     else:
@@ -327,7 +439,7 @@ def get_status(self) -> dict[str, Any]:
                 "pending_deployments": pending_deployments_status,
             }
         except Exception as e:
-            logger.error(f"Error getting scheduler status: {e}")
+            logger.error("deployment_scheduler_status_error", error=str(e))
             return {"running": self.running, "pending_count": len(self.pending_deployments), "error": str(e)}
 
     async def start_background_scheduler(self) -> None:
diff --git a/tests/unit/event_processors/test_deployment_protection_rule.py b/tests/unit/event_processors/test_deployment_protection_rule.py
new file mode 100644
index 0000000..3e2dbb2
--- /dev/null
+++ b/tests/unit/event_processors/test_deployment_protection_rule.py
@@ -0,0 +1,228 @@
+"""Unit tests for DeploymentProtectionRuleProcessor."""
+
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from src.core.models import EventType
+from src.event_processors.deployment_protection_rule import (
+    AGENT_TIMEOUT_SECONDS,
+    DeploymentProtectionRuleProcessor,
+)
+from src.rules.models import Rule, RuleSeverity
+from src.tasks.task_queue import Task
+
+
+def _make_deployment_rule() -> Rule:
+    return Rule(
+        description="Test deployment rule",
+        enabled=True,
+        severity=RuleSeverity.MEDIUM,
+        event_types=[EventType.DEPLOYMENT],
+        parameters={},
+    )
+
+
+@pytest.fixture
+def mock_agent():
+    return AsyncMock()
+
+
+@pytest.fixture
+def processor(monkeypatch, mock_agent):
+    monkeypatch.setattr("src.event_processors.deployment_protection_rule.get_agent", lambda x: mock_agent)
+    monkeypatch.setattr(
+        "src.event_processors.deployment_protection_rule.get_deployment_scheduler",
+        lambda: MagicMock(add_pending_deployment=AsyncMock()),
+    )
+    proc = DeploymentProtectionRuleProcessor()
+    proc.github_client = AsyncMock()
+    proc.rule_provider = AsyncMock()
+    return proc
+
+
+@pytest.fixture
+def task():
+    t = MagicMock(spec=Task)
+    t.repo_full_name = "owner/repo"
+    t.installation_id = 123
+    t.payload = {
+        "environment": "production",
+        "deployment": {"id": 456, "environment": "production", "creator": {}},
"deployment_callback_url": "https://api.github.com/repos/owner/repo/deployments/456", + "repository": {"full_name": "owner/repo"}, + "organization": {}, + } + return t + + +@pytest.mark.asyncio +async def test_exception_calls_fallback_approval(processor, mock_agent, task): + processor.rule_provider.get_rules.return_value = [_make_deployment_rule()] + mock_agent.execute.side_effect = RuntimeError("agent failed") + processor.github_client.review_deployment_protection_rule.return_value = {"status": "success"} + + result = await processor.process(task) + + assert result.success is False + assert result.error == "agent failed" + processor.github_client.review_deployment_protection_rule.assert_called_once() + call_kwargs = processor.github_client.review_deployment_protection_rule.call_args.kwargs + assert call_kwargs["state"] == "approved" + assert "fallback" in call_kwargs["comment"].lower() + + +@pytest.mark.asyncio +async def test_exception_without_callback_skips_approval(processor, mock_agent): + task = MagicMock(spec=Task) + task.repo_full_name = "owner/repo" + task.installation_id = 123 + task.payload = { + "environment": "production", + "deployment": {"id": 456}, + "deployment_callback_url": None, + "repository": {}, + } + processor.rule_provider.get_rules.return_value = [_make_deployment_rule()] + mock_agent.execute.side_effect = RuntimeError("agent failed") + + result = await processor.process(task) + + assert result.success is False + processor.github_client.review_deployment_protection_rule.assert_not_called() + + +@pytest.mark.asyncio +async def test_validation_rejects_invalid_callback_url(processor, mock_agent): + """Invalid callback URL prevents API calls even when rules exist and violations are found.""" + task = MagicMock(spec=Task) + task.repo_full_name = "owner/repo" + task.installation_id = 123 + task.payload = { + "environment": "production", + "deployment": {"id": 456}, + "deployment_callback_url": "not-a-valid-url", + "repository": {}, + } + processor.rule_provider.get_rules.return_value = [_make_deployment_rule()] + mock_agent.execute.return_value = MagicMock( + data={ + "evaluation_result": MagicMock( + violations=[ + { + "rule_description": "Deploy only during business hours", + "message": "Deployment outside allowed time", + } + ] + ) + } + ) + + result = await processor.process(task) + + assert result.success is False + assert len(result.violations) == 1 + processor.github_client.review_deployment_protection_rule.assert_not_called() + + +def test_is_valid_callback_url(): + assert DeploymentProtectionRuleProcessor._is_valid_callback_url("https://api.github.com/callback") is True + assert DeploymentProtectionRuleProcessor._is_valid_callback_url("http://localhost/callback") is True + assert DeploymentProtectionRuleProcessor._is_valid_callback_url("") is False + assert DeploymentProtectionRuleProcessor._is_valid_callback_url(None) is False + assert DeploymentProtectionRuleProcessor._is_valid_callback_url("ftp://invalid") is False + + +def test_is_valid_environment(): + assert DeploymentProtectionRuleProcessor._is_valid_environment("production") is True + assert DeploymentProtectionRuleProcessor._is_valid_environment("") is False + assert DeploymentProtectionRuleProcessor._is_valid_environment(None) is False + + +@pytest.mark.asyncio +async def test_timeout_triggers_fallback_approval(processor, mock_agent, task): + """When agent exceeds AGENT_TIMEOUT_SECONDS, fallback approval is triggered.""" + processor.rule_provider.get_rules.return_value = [_make_deployment_rule()] + 
+    processor.github_client.review_deployment_protection_rule.return_value = {"status": "success"}
+
+    with patch(
+        "src.event_processors.deployment_protection_rule.execute_with_timeout",
+        side_effect=TimeoutError(f"Agent execution timed out after {AGENT_TIMEOUT_SECONDS}s"),
+    ):
+        result = await processor.process(task)
+
+    assert result.success is False
+    assert "timed out" in result.error.lower()
+    processor.github_client.review_deployment_protection_rule.assert_called_once()
+    call_kwargs = processor.github_client.review_deployment_protection_rule.call_args.kwargs
+    assert call_kwargs["state"] == "approved"
+    assert "fallback" in call_kwargs["comment"].lower()
+
+
+@pytest.mark.asyncio
+async def test_retry_exhaustion_returns_failure(processor, mock_agent, task):
+    """When review_deployment_protection_rule returns None and retries exhaust, process returns failure."""
+    processor.rule_provider.get_rules.return_value = [_make_deployment_rule()]
+    mock_agent.execute.side_effect = RuntimeError("agent failed")
+    processor.github_client.review_deployment_protection_rule.return_value = None
+
+    result = await processor.process(task)
+
+    assert result.success is False
+    assert "agent failed" in result.error
+    assert "fallback" in result.error.lower()
+    assert processor.github_client.review_deployment_protection_rule.call_count >= 1
+
+
+@pytest.mark.asyncio
+async def test_time_based_violations_forwarded_to_scheduler(processor, mock_agent, task):
+    """Violations matching time-based keywords are forwarded to the deployment scheduler."""
+    mock_scheduler = MagicMock(add_pending_deployment=AsyncMock())
+    with patch(
+        "src.event_processors.deployment_protection_rule.get_deployment_scheduler",
+        return_value=mock_scheduler,
+    ):
+        processor.rule_provider.get_rules.return_value = [_make_deployment_rule()]
+        mock_agent.execute.return_value = MagicMock(
+            data={
+                "evaluation_result": MagicMock(
+                    violations=[
+                        {"rule_description": "Deploy only during business hours", "message": "Outside allowed time"}
+                    ]
+                )
+            }
+        )
+
+        result = await processor.process(task)
+
+    assert result.success is False
+    assert len(result.violations) == 1
+    mock_scheduler.add_pending_deployment.assert_called_once()
+    call_args = mock_scheduler.add_pending_deployment.call_args[0][0]
+    assert call_args["deployment_id"] == 456
+    assert call_args["repo"] == "owner/repo"
+    assert len(call_args["time_based_violations"]) == 1
+    assert "business hours" in call_args["time_based_violations"][0]["rule_description"]
+
+
+@pytest.mark.asyncio
+async def test_happy_path_with_violations_rejects(processor, mock_agent, task):
+    """When agent returns violations, deployment is rejected via review API."""
+    processor.rule_provider.get_rules.return_value = [_make_deployment_rule()]
+    mock_agent.execute.return_value = MagicMock(
+        data={
+            "evaluation_result": MagicMock(
+                violations=[{"rule_description": "No deployment on weekends", "message": "Deployment blocked"}]
+            )
+        }
+    )
+    processor.github_client.review_deployment_protection_rule.return_value = {"status": "success"}
+
+    result = await processor.process(task)
+
+    assert result.success is False
+    assert len(result.violations) == 1
+    processor.github_client.review_deployment_protection_rule.assert_called_once()
+    call_kwargs = processor.github_client.review_deployment_protection_rule.call_args.kwargs
+    assert call_kwargs["state"] == "rejected"
+    assert "weekends" in call_kwargs["comment"].lower()
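
Note: the diff above relies on two shared helpers that are imported but not shown, src.core.utils.retry.retry_async and src.core.utils.timeout.execute_with_timeout. A minimal sketch consistent with the call sites in this diff (signatures inferred here only; the real helpers in src/core/utils may differ):

import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def execute_with_timeout(coro: Awaitable[T], timeout: float, timeout_message: str = "") -> T:
    # Sketch: wrap an awaitable in asyncio.wait_for and re-raise timeouts with a readable message.
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except TimeoutError as exc:
        raise TimeoutError(timeout_message or f"Timed out after {timeout}s") from exc


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 30.0,
    exceptions: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    # Sketch: call func up to max_retries times with exponential backoff capped at max_delay,
    # re-raising the last error once attempts are exhausted.
    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        try:
            return await func()
        except exceptions:
            if attempt == max_retries:
                raise
            await asyncio.sleep(delay)
            delay = min(delay * 2, max_delay)
    raise RuntimeError("retry_async: unreachable")  # keeps type checkers satisfied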