From 76813654e3f231bad5816b0024e832f4d3cfdf59 Mon Sep 17 00:00:00 2001 From: naaa760 Date: Tue, 17 Feb 2026 19:56:45 +0530 Subject: [PATCH 1/4] feat: add event filtering to skip irrelevant GitHub events --- src/core/utils/event_filter.py | 93 ++++++++++++ src/event_processors/push.py | 9 ++ src/webhooks/dispatcher.py | 5 + src/webhooks/handlers/pull_request.py | 9 -- .../integration/webhooks/test_webhook_flow.py | 58 +++++++- tests/unit/core/test_event_filter.py | 135 ++++++++++++++++++ .../test_pull_request_processor.py | 22 +++ .../event_processors/test_push_processor.py | 20 +++ 8 files changed, 339 insertions(+), 12 deletions(-) create mode 100644 src/core/utils/event_filter.py create mode 100644 tests/unit/core/test_event_filter.py diff --git a/src/core/utils/event_filter.py b/src/core/utils/event_filter.py new file mode 100644 index 0000000..47dd6e9 --- /dev/null +++ b/src/core/utils/event_filter.py @@ -0,0 +1,93 @@ +""" +Event filtering for GitHub webhooks. + +Centralized logic to skip rule evaluation on irrelevant events: +branch deletions, closed/merged PRs, archived repos, etc. +""" + +from dataclasses import dataclass + +import structlog + +from src.core.models import EventType, WebhookEvent + +logger = structlog.get_logger() + +NULL_SHA = "0000000000000000000000000000000000000000" + +PR_ACTIONS_PROCESS = frozenset({"opened", "synchronize", "reopened"}) + + +@dataclass +class FilterResult: + """Result of event filter check.""" + + should_process: bool + reason: str = "" + + +def should_process_event(event: WebhookEvent) -> FilterResult: + """ + Determine if an event should trigger rule evaluation. + + Returns FilterResult with should_process=True to process, False to skip. + Logs filtered events for observability.
+ """ + payload = event.payload + event_type = event.event_type + + result = _apply_filters(event_type, payload) + if not result.should_process: + logger.info( + "event_filtered", + event_type=event_type.value if hasattr(event_type, "value") else str(event_type), + repo=event.repo_full_name, + reason=result.reason, + ) + return result + + +def _apply_filters(event_type: EventType, payload: dict) -> FilterResult: + if _is_repo_archived(payload): + return FilterResult(should_process=False, reason="Repository is archived") + + if event_type == EventType.PULL_REQUEST: + return _filter_pull_request(payload) + if event_type == EventType.PUSH: + return _filter_push(payload) + return FilterResult(should_process=True) + + +def _filter_pull_request(payload: dict) -> FilterResult: + action = payload.get("action") + if action not in PR_ACTIONS_PROCESS: + return FilterResult(should_process=False, reason=f"PR action '{action}' not processed") + + pr = payload.get("pull_request", {}) + state = pr.get("state", "") + if state != "open": + return FilterResult(should_process=False, reason=f"PR state '{state}' not open") + + if pr.get("merged"): + return FilterResult(should_process=False, reason="PR already merged") + + if pr.get("draft"): + return FilterResult(should_process=False, reason="PR is draft") + + return FilterResult(should_process=True) + + +def _filter_push(payload: dict) -> FilterResult: + if payload.get("deleted"): + return FilterResult(should_process=False, reason="Branch deletion event") + + after = payload.get("after") + if not after or after == NULL_SHA: + return FilterResult(should_process=False, reason="No valid commit SHA (deleted or empty push)") + + return FilterResult(should_process=True) + + +def _is_repo_archived(payload: dict) -> bool: + repo = payload.get("repository", {}) + return isinstance(repo, dict) and bool(repo.get("archived")) diff --git a/src/event_processors/push.py b/src/event_processors/push.py index 2e77bf8..78672bd 100644 --- 
a/src/event_processors/push.py +++ b/src/event_processors/push.py @@ -37,6 +37,15 @@ async def process(self, task: Task) -> ProcessingResult: logger.info(f" Commits: {len(commits)}") logger.info("=" * 80) + if payload.get("deleted") or not payload.get("after") or payload.get("after") == "0000000000000000000000000000000000000000": + logger.info("push_skipped_deleted_or_empty") + return ProcessingResult( + success=True, + violations=[], + api_calls_made=0, + processing_time_ms=int((time.time() - start_time) * 1000), + ) + event_data = { "push": { "ref": ref, diff --git a/src/webhooks/dispatcher.py b/src/webhooks/dispatcher.py index 541b14a..acf3d0a 100644 --- a/src/webhooks/dispatcher.py +++ b/src/webhooks/dispatcher.py @@ -4,6 +4,7 @@ import structlog from src.core.models import EventType, WebhookEvent +from src.core.utils.event_filter import should_process_event from src.tasks.task_queue import TaskQueue, task_queue logger = structlog.get_logger() @@ -41,6 +42,10 @@ async def dispatch(self, event: WebhookEvent) -> dict[str, Any]: log.warning("handler_not_found") return {"status": "skipped", "reason": f"No handler for event type {event_type}"} + filter_result = should_process_event(event) + if not filter_result.should_process: + return {"status": "filtered", "reason": filter_result.reason, "event_type": event_type} + # Offload to TaskQueue for background execution (delivery_id so each webhook delivery is processed) success = await self.queue.enqueue(handler, event_type, event.payload, event, delivery_id=event.delivery_id) diff --git a/src/webhooks/handlers/pull_request.py b/src/webhooks/handlers/pull_request.py index fdb9fdd..b64d33b 100644 --- a/src/webhooks/handlers/pull_request.py +++ b/src/webhooks/handlers/pull_request.py @@ -33,15 +33,6 @@ async def handle(self, event: WebhookEvent) -> WebhookResponse: pr_number=event.payload.get("pull_request", {}).get("number"), action=event.payload.get("action"), ) - - # Filter relevant actions to reduce noise (optional but 
good practice) - action = event.payload.get("action") - if action not in ["opened", "synchronize", "reopened", "edited"]: - log.info("pr_action_ignored", action=action) - return WebhookResponse( - status="ignored", detail=f"PR action '{action}' is not processed", event_type=EventType.PULL_REQUEST - ) - log.info("pr_handler_invoked") try: diff --git a/tests/integration/webhooks/test_webhook_flow.py b/tests/integration/webhooks/test_webhook_flow.py index 937b2e9..fe15cab 100644 --- a/tests/integration/webhooks/test_webhook_flow.py +++ b/tests/integration/webhooks/test_webhook_flow.py @@ -36,7 +36,7 @@ def fresh_queue() -> TaskQueue: @pytest.fixture def valid_pr_payload() -> dict[str, object]: - """Valid pull request webhook payload.""" + """Valid pull request webhook payload (open PR, passes event filter).""" return { "action": "opened", "sender": {"login": "octocat", "id": 1, "type": "User"}, @@ -47,7 +47,14 @@ def valid_pr_payload() -> dict[str, object]: "private": False, "html_url": "https://github.com/octocat/watchflow", }, - "pull_request": {"number": 42, "title": "Test PR", "body": "Test body"}, + "pull_request": { + "number": 42, + "title": "Test PR", + "body": "Test body", + "state": "open", + "merged": False, + "draft": False, + }, } @@ -157,7 +164,9 @@ async def test_multiple_event_types_flow( "html_url": "https://github.com/octocat/watchflow", }, "ref": "refs/heads/main", - "commits": [], + "deleted": False, + "after": "abc123def456", + "commits": [{"id": "abc123def456"}], } with patch("src.webhooks.router.dispatcher", fresh_dispatcher): @@ -223,6 +232,49 @@ async def test_handler_exception_doesnt_break_flow( # Handler was called and exception was caught assert failing_handler.called + @pytest.mark.asyncio + @respx.mock + async def test_filtered_event_not_dispatched( + self, + app: FastAPI, + fresh_dispatcher: WebhookDispatcher, + fresh_queue: TaskQueue, + valid_headers: dict[str, str], + ) -> None: + """Test that filtered events (e.g. 
branch deletion) are not dispatched.""" + mock_handler = AsyncMock() + fresh_dispatcher.register_handler("push", mock_handler) + await fresh_queue.start_workers() + + deleted_branch_payload = { + "sender": {"login": "octocat", "id": 1, "type": "User"}, + "repository": { + "id": 123456, + "name": "watchflow", + "full_name": "octocat/watchflow", + "private": False, + "html_url": "https://github.com/octocat/watchflow", + }, + "ref": "refs/heads/feature", + "deleted": True, + "after": "0000000000000000000000000000000000000000", + "commits": [], + } + + with patch("src.webhooks.router.dispatcher", fresh_dispatcher): + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + response = await client.post( + "/webhooks/github", + json=deleted_branch_payload, + headers={**valid_headers, "X-GitHub-Event": "push"}, + ) + + assert response.status_code == 200 + await asyncio.sleep(0.1) + await fresh_queue.queue.join() + + assert mock_handler.call_count == 0 + @pytest.mark.asyncio @respx.mock async def test_no_handler_registered_flow( diff --git a/tests/unit/core/test_event_filter.py b/tests/unit/core/test_event_filter.py new file mode 100644 index 0000000..773dd71 --- /dev/null +++ b/tests/unit/core/test_event_filter.py @@ -0,0 +1,135 @@ +import pytest + +from src.core.models import EventType, WebhookEvent +from src.core.utils.event_filter import FilterResult, should_process_event + + +def _make_event(event_type: EventType, payload: dict) -> WebhookEvent: + return WebhookEvent(event_type=event_type, payload=payload) + + +def test_pull_request_opened_processes(): + payload = { + "action": "opened", + "repository": {"full_name": "owner/repo"}, + "pull_request": {"state": "open", "merged": False, "draft": False}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is True + + +def test_pull_request_synchronize_processes(): + payload = { + "action": "synchronize", + "repository": 
{"full_name": "owner/repo"}, + "pull_request": {"state": "open", "merged": False, "draft": False}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is True + + +def test_pull_request_reopened_processes(): + payload = { + "action": "reopened", + "repository": {"full_name": "owner/repo"}, + "pull_request": {"state": "open", "merged": False, "draft": False}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is True + + +def test_pull_request_closed_action_filtered(): + payload = { + "action": "closed", + "repository": {"full_name": "owner/repo"}, + "pull_request": {"state": "closed", "merged": True}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is False + assert "closed" in result.reason or "not processed" in result.reason + + +def test_pull_request_merged_filtered(): + payload = { + "action": "opened", + "repository": {"full_name": "owner/repo"}, + "pull_request": {"state": "closed", "merged": True, "draft": False}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is False + assert "merged" in result.reason or "not open" in result.reason + + +def test_pull_request_draft_filtered(): + payload = { + "action": "opened", + "repository": {"full_name": "owner/repo"}, + "pull_request": {"state": "open", "merged": False, "draft": True}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is False + assert "draft" in result.reason + + +def test_push_valid_processes(): + payload = { + "repository": {"full_name": "owner/repo"}, + "ref": "refs/heads/main", + "deleted": False, + "after": "abc123", + "commits": [{}], + } + result = should_process_event(_make_event(EventType.PUSH, payload)) + assert result.should_process is True + + +def 
test_push_deleted_branch_filtered(): + payload = { + "repository": {"full_name": "owner/repo"}, + "ref": "refs/heads/feature", + "deleted": True, + "after": "0000000000000000000000000000000000000000", + } + result = should_process_event(_make_event(EventType.PUSH, payload)) + assert result.should_process is False + assert "deletion" in result.reason or "Branch" in result.reason + + +def test_push_null_sha_filtered(): + payload = { + "repository": {"full_name": "owner/repo"}, + "ref": "refs/heads/main", + "deleted": False, + "after": "0000000000000000000000000000000000000000", + } + result = should_process_event(_make_event(EventType.PUSH, payload)) + assert result.should_process is False + + +def test_push_empty_after_filtered(): + payload = { + "repository": {"full_name": "owner/repo"}, + "ref": "refs/heads/main", + "deleted": False, + "after": "", + } + result = should_process_event(_make_event(EventType.PUSH, payload)) + assert result.should_process is False + + +def test_archived_repo_filtered(): + payload = { + "repository": {"full_name": "owner/repo", "archived": True}, + "action": "opened", + "pull_request": {"state": "open", "merged": False, "draft": False}, + } + result = should_process_event(_make_event(EventType.PULL_REQUEST, payload)) + assert result.should_process is False + assert "archived" in result.reason + + +def test_other_event_types_process(): + payload = {"repository": {"full_name": "owner/repo"}} + for evt in (EventType.CHECK_RUN, EventType.DEPLOYMENT, EventType.DEPLOYMENT_STATUS): + result = should_process_event(_make_event(evt, payload)) + assert result.should_process is True diff --git a/tests/unit/event_processors/test_pull_request_processor.py b/tests/unit/event_processors/test_pull_request_processor.py index 23962fb..c299832 100644 --- a/tests/unit/event_processors/test_pull_request_processor.py +++ b/tests/unit/event_processors/test_pull_request_processor.py @@ -56,6 +56,28 @@ async def test_process_success(processor, mock_agent): 
processor.check_run_manager.create_check_run.assert_awaited_once() +@pytest.mark.asyncio +async def test_process_closed_pr_skipped(processor): + task = MagicMock(spec=Task) + task.repo_full_name = "owner/repo" + task.installation_id = 1 + task.payload = { + "action": "closed", + "pull_request": { + "number": 1, + "state": "closed", + "merged": True, + "head": {"sha": "sha123"}, + }, + } + + result = await processor.process(task) + + assert result.success is True + assert result.violations == [] + processor.enricher.enrich_event_data.assert_not_called() + + @pytest.mark.asyncio async def test_process_with_violations(processor, mock_agent): task = MagicMock(spec=Task) diff --git a/tests/unit/event_processors/test_push_processor.py b/tests/unit/event_processors/test_push_processor.py index 5f02641..b36c224 100644 --- a/tests/unit/event_processors/test_push_processor.py +++ b/tests/unit/event_processors/test_push_processor.py @@ -86,6 +86,26 @@ async def test_process_success_no_violations(processor, task, mock_rule_provider assert call_args["violations"] == [] +@pytest.mark.asyncio +async def test_process_deleted_branch_skipped(processor, mock_rule_provider): + task = MagicMock(spec=Task) + task.repo_full_name = "owner/repo" + task.installation_id = 123 + task.payload = { + "ref": "refs/heads/feature", + "deleted": True, + "after": "0000000000000000000000000000000000000000", + "commits": [], + } + + result = await processor.process(task) + + assert result.success is True + assert result.violations == [] + processor.engine_agent.execute.assert_not_called() + processor.check_run_manager.create_check_run.assert_not_awaited() + + @pytest.mark.asyncio async def test_process_with_violations(processor, task, mock_rule_provider): # Setup rules From 99989b5309dd0514abb49b899aea88187b2f7283 Mon Sep 17 00:00:00 2001 From: naaa760 Date: Tue, 17 Feb 2026 19:57:38 +0530 Subject: [PATCH 2/4] update --- src/event_processors/pull_request/processor.py | 14 ++++++++++++++ 1 file changed, 
14 insertions(+) diff --git a/src/event_processors/pull_request/processor.py b/src/event_processors/pull_request/processor.py index ccbc86a..501180f 100644 --- a/src/event_processors/pull_request/processor.py +++ b/src/event_processors/pull_request/processor.py @@ -48,6 +48,20 @@ async def process(self, task: Task) -> ProcessingResult: error="No installation ID found", ) + if pr_data.get("state") == "closed" or pr_data.get("merged") or pr_data.get("draft"): + logger.info( + "pr_skipped_invalid_state", + state=pr_data.get("state"), + merged=pr_data.get("merged"), + draft=pr_data.get("draft"), + ) + return ProcessingResult( + success=True, + violations=[], + api_calls_made=0, + processing_time_ms=int((time.time() - start_time) * 1000), + ) + try: logger.info("=" * 80) logger.info(f"🚀 Processing PR event for {repo_full_name}") From f171cd5e916af83c57395ba2cf23294c8cfcaa7f Mon Sep 17 00:00:00 2001 From: naaa760 Date: Fri, 20 Feb 2026 17:45:40 +0530 Subject: [PATCH 3/4] update --- src/event_processors/push.py | 6 +++++- tests/unit/core/test_event_filter.py | 4 +--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/event_processors/push.py b/src/event_processors/push.py index 78672bd..1bb5e3e 100644 --- a/src/event_processors/push.py +++ b/src/event_processors/push.py @@ -37,7 +37,11 @@ async def process(self, task: Task) -> ProcessingResult: logger.info(f" Commits: {len(commits)}") logger.info("=" * 80) - if payload.get("deleted") or not payload.get("after") or payload.get("after") == "0000000000000000000000000000000000000000": + if ( + payload.get("deleted") + or not payload.get("after") + or payload.get("after") == "0000000000000000000000000000000000000000" + ): logger.info("push_skipped_deleted_or_empty") return ProcessingResult( success=True, diff --git a/tests/unit/core/test_event_filter.py b/tests/unit/core/test_event_filter.py index 773dd71..007ccb5 100644 --- a/tests/unit/core/test_event_filter.py +++ b/tests/unit/core/test_event_filter.py @@ 
-1,7 +1,5 @@ -import pytest - from src.core.models import EventType, WebhookEvent -from src.core.utils.event_filter import FilterResult, should_process_event +from src.core.utils.event_filter import should_process_event def _make_event(event_type: EventType, payload: dict) -> WebhookEvent: From e49625952e1f4d86b723feb49643d47f2c0af8a5 Mon Sep 17 00:00:00 2001 From: naaa760 Date: Mon, 23 Feb 2026 17:08:09 +0530 Subject: [PATCH 4/4] fix: address code review feedback for event filtering --- src/core/utils/event_filter.py | 11 ++++++----- src/event_processors/push.py | 7 ++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/src/core/utils/event_filter.py b/src/core/utils/event_filter.py index 47dd6e9..2c8d959 100644 --- a/src/core/utils/event_filter.py +++ b/src/core/utils/event_filter.py @@ -6,6 +6,7 @@ """ from dataclasses import dataclass +from typing import Any import structlog @@ -18,7 +19,7 @@ PR_ACTIONS_PROCESS = frozenset({"opened", "synchronize", "reopened"}) -@dataclass +@dataclass(frozen=True) class FilterResult: """Result of event filter check.""" @@ -47,7 +48,7 @@ def should_process_event(event: WebhookEvent) -> FilterResult: return result -def _apply_filters(event_type: EventType, payload: dict) -> FilterResult: +def _apply_filters(event_type: EventType, payload: dict[str, Any]) -> FilterResult: if _is_repo_archived(payload): return FilterResult(should_process=False, reason="Repository is archived") @@ -58,7 +59,7 @@ def _apply_filters(event_type: EventType, payload: dict) -> FilterResult: return FilterResult(should_process=True) -def _filter_pull_request(payload: dict) -> FilterResult: +def _filter_pull_request(payload: dict[str, Any]) -> FilterResult: action = payload.get("action") if action not in PR_ACTIONS_PROCESS: return FilterResult(should_process=False, reason=f"PR action '{action}' not processed") @@ -77,7 +78,7 @@ def _filter_pull_request(payload: dict) -> FilterResult: return FilterResult(should_process=True) -def
_filter_push(payload: dict) -> FilterResult: +def _filter_push(payload: dict[str, Any]) -> FilterResult: if payload.get("deleted"): return FilterResult(should_process=False, reason="Branch deletion event") @@ -88,6 +89,6 @@ def _filter_push(payload: dict) -> FilterResult: return FilterResult(should_process=True) -def _is_repo_archived(payload: dict) -> bool: +def _is_repo_archived(payload: dict[str, Any]) -> bool: repo = payload.get("repository", {}) return isinstance(repo, dict) and bool(repo.get("archived")) diff --git a/src/event_processors/push.py b/src/event_processors/push.py index 1bb5e3e..a720741 100644 --- a/src/event_processors/push.py +++ b/src/event_processors/push.py @@ -4,6 +4,7 @@ from src.agents import get_agent from src.core.models import Severity, Violation +from src.core.utils.event_filter import NULL_SHA from src.event_processors.base import BaseEventProcessor, ProcessingResult from src.integrations.github.check_runs import CheckRunManager from src.tasks.task_queue import Task @@ -37,11 +38,7 @@ async def process(self, task: Task) -> ProcessingResult: logger.info(f" Commits: {len(commits)}") logger.info("=" * 80) - if ( - payload.get("deleted") - or not payload.get("after") - or payload.get("after") == "0000000000000000000000000000000000000000" - ): + if payload.get("deleted") or not payload.get("after") or payload.get("after") == NULL_SHA: logger.info("push_skipped_deleted_or_empty") return ProcessingResult( success=True,