Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions src/core/utils/event_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
Event filtering for GitHub webhooks.

Centralized logic to skip rule evaluation on irrelevant events:
branch deletions, closed/merged PRs, archived repos, etc.
"""

from dataclasses import dataclass
from typing import Any

import structlog

from src.core.models import EventType, WebhookEvent

logger = structlog.get_logger()

NULL_SHA = "0000000000000000000000000000000000000000"

PR_ACTIONS_PROCESS = frozenset({"opened", "synchronize", "reopened"})


@dataclass(frozen=True)
class FilterResult:
    """Result of event filter check.

    Immutable value object returned by should_process_event and the
    per-event-type filter helpers in this module.
    """

    # True when the event should proceed to rule evaluation; False to skip it.
    should_process: bool
    # Human-readable explanation for skipped events; empty when should_process is True.
    reason: str = ""


def should_process_event(event: WebhookEvent) -> FilterResult:
    """
    Determine if an event should trigger rule evaluation.

    Args:
        event: Parsed webhook event carrying the raw payload and metadata.

    Returns:
        FilterResult with should_process=True to process, False to skip.
        Filtered events are logged with structured fields for observability.
    """
    payload = event.payload
    event_type = event.event_type

    result = _apply_filters(event_type, payload)
    if not result.should_process:
        # Boundary log carries operation / subject_ids / decision per the
        # structured-logging guidelines, so skips can be correlated downstream.
        logger.info(
            "event_filtered",
            operation="should_process_event",
            subject_ids={"repo": event.repo_full_name, "delivery_id": event.delivery_id},
            decision="skip",
            # hasattr guard keeps this safe if event_type arrives as a plain string.
            event_type=event_type.value if hasattr(event_type, "value") else str(event_type),
            reason=result.reason,
        )
    return result


def _apply_filters(event_type: EventType, payload: dict[str, Any]) -> FilterResult:
    """Run the archived-repo check, then dispatch to the per-type filter."""
    if _is_repo_archived(payload):
        return FilterResult(should_process=False, reason="Repository is archived")

    # Event types without a dedicated filter are always processed.
    type_filters = {
        EventType.PULL_REQUEST: _filter_pull_request,
        EventType.PUSH: _filter_push,
    }
    type_filter = type_filters.get(event_type)
    if type_filter is None:
        return FilterResult(should_process=True)
    return type_filter(payload)


def _filter_pull_request(payload: dict[str, Any]) -> FilterResult:
    """Skip PR events that are not actionable (wrong action, closed, merged, draft)."""
    action = payload.get("action")
    if action not in PR_ACTIONS_PROCESS:
        return FilterResult(should_process=False, reason=f"PR action '{action}' not processed")

    pr = payload.get("pull_request", {})
    state = pr.get("state", "")

    # Ordered guard checks: first failing condition determines the skip reason.
    guards = (
        (state != "open", f"PR state '{state}' not open"),
        (bool(pr.get("merged")), "PR already merged"),
        (bool(pr.get("draft")), "PR is draft"),
    )
    for failed, reason in guards:
        if failed:
            return FilterResult(should_process=False, reason=reason)

    return FilterResult(should_process=True)


def _filter_push(payload: dict[str, Any]) -> FilterResult:
    """Skip pushes for branch deletions or pushes without a valid head SHA."""
    if payload.get("deleted"):
        return FilterResult(should_process=False, reason="Branch deletion event")

    head_sha = payload.get("after")
    # GitHub sends the all-zero SHA for deletions; an empty/missing SHA is
    # likewise not a real commit to evaluate.
    if head_sha and head_sha != NULL_SHA:
        return FilterResult(should_process=True)
    return FilterResult(should_process=False, reason="No valid commit SHA (deleted or empty push)")


def _is_repo_archived(payload: dict[str, Any]) -> bool:
repo = payload.get("repository", {})
return isinstance(repo, dict) and bool(repo.get("archived"))
14 changes: 14 additions & 0 deletions src/event_processors/pull_request/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,20 @@ async def process(self, task: Task) -> ProcessingResult:
error="No installation ID found",
)

if pr_data.get("state") == "closed" or pr_data.get("merged") or pr_data.get("draft"):
logger.info(
"pr_skipped_invalid_state",
state=pr_data.get("state"),
merged=pr_data.get("merged"),
draft=pr_data.get("draft"),
)
return ProcessingResult(
success=True,
violations=[],
api_calls_made=0,
processing_time_ms=int((time.time() - start_time) * 1000),
)

try:
logger.info("=" * 80)
logger.info(f"🚀 Processing PR event for {repo_full_name}")
Expand Down
10 changes: 10 additions & 0 deletions src/event_processors/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from src.agents import get_agent
from src.core.models import Severity, Violation
from src.core.utils.event_filter import NULL_SHA
from src.event_processors.base import BaseEventProcessor, ProcessingResult
from src.integrations.github.check_runs import CheckRunManager
from src.tasks.task_queue import Task
Expand Down Expand Up @@ -37,6 +38,15 @@ async def process(self, task: Task) -> ProcessingResult:
logger.info(f" Commits: {len(commits)}")
logger.info("=" * 80)

if payload.get("deleted") or not payload.get("after") or payload.get("after") == NULL_SHA:
logger.info("push_skipped_deleted_or_empty")
return ProcessingResult(
success=True,
violations=[],
api_calls_made=0,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DRY / Constant: Please import NULL_SHA from src.core.utils.event_filter instead of hardcoding the "0000..." string here. This ensures consistency if we ever need to update how null SHAs are handled.

processing_time_ms=int((time.time() - start_time) * 1000),
)
Comment on lines +41 to +48
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Unstructured log for the new skip path; missing required guideline fields

logger.info("push_skipped_deleted_or_empty") is a bare string with no structured fields. Guidelines require operation, subject_ids, decision, and latency_ms at processing boundaries.

♻️ Add structured fields
-            logger.info("push_skipped_deleted_or_empty")
+            logger.info(
+                "push_skipped_deleted_or_empty",
+                operation="process_push",
+                subject_ids={"repo": task.repo_full_name, "ref": ref},
+                decision="skip",
+                latency_ms=int((time.time() - start_time) * 1000),
+            )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/event_processors/push.py` around lines 41 - 48, The info log emitted when
skipping deleted/empty pushes is unstructured; update the logger.info call
inside the push skip branch (the block guarded by payload.get("deleted") or not
payload.get("after") or payload.get("after") == NULL_SHA) to emit structured
fields including operation (e.g., "push.process"), subject_ids (derive from
payload — e.g., payload.get("repository", {}).get("id") or
payload.get("ref")/commit id), decision ("skipped_deleted_or_empty"), and
latency_ms (use the computed int((time.time() - start_time) * 1000) value
already used for ProcessingResult.processing_time_ms). Keep the ProcessingResult
return as-is but ensure the log fields match those names exactly so callers can
correlate logs and the returned processing_time_ms.


event_data = {
"push": {
"ref": ref,
Expand Down
5 changes: 5 additions & 0 deletions src/webhooks/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import structlog

from src.core.models import EventType, WebhookEvent
from src.core.utils.event_filter import should_process_event
from src.tasks.task_queue import TaskQueue, task_queue

logger = structlog.get_logger()
Expand Down Expand Up @@ -41,6 +42,10 @@ async def dispatch(self, event: WebhookEvent) -> dict[str, Any]:
log.warning("handler_not_found")
return {"status": "skipped", "reason": f"No handler for event type {event_type}"}

filter_result = should_process_event(event)
if not filter_result.should_process:
return {"status": "filtered", "reason": filter_result.reason, "event_type": event_type}

# Offload to TaskQueue for background execution (delivery_id so each webhook delivery is processed)
success = await self.queue.enqueue(handler, event_type, event.payload, event, delivery_id=event.delivery_id)

Expand Down
9 changes: 0 additions & 9 deletions src/webhooks/handlers/pull_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,6 @@ async def handle(self, event: WebhookEvent) -> WebhookResponse:
pr_number=event.payload.get("pull_request", {}).get("number"),
action=event.payload.get("action"),
)

# Filter relevant actions to reduce noise (optional but good practice)
action = event.payload.get("action")
if action not in ["opened", "synchronize", "reopened", "edited"]:
log.info("pr_action_ignored", action=action)
return WebhookResponse(
status="ignored", detail=f"PR action '{action}' is not processed", event_type=EventType.PULL_REQUEST
)

log.info("pr_handler_invoked")

try:
Expand Down
58 changes: 55 additions & 3 deletions tests/integration/webhooks/test_webhook_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def fresh_queue() -> TaskQueue:

@pytest.fixture
def valid_pr_payload() -> dict[str, object]:
"""Valid pull request webhook payload."""
"""Valid pull request webhook payload (open PR, passes event filter)."""
return {
"action": "opened",
"sender": {"login": "octocat", "id": 1, "type": "User"},
Expand All @@ -47,7 +47,14 @@ def valid_pr_payload() -> dict[str, object]:
"private": False,
"html_url": "https://github.com/octocat/watchflow",
},
"pull_request": {"number": 42, "title": "Test PR", "body": "Test body"},
"pull_request": {
"number": 42,
"title": "Test PR",
"body": "Test body",
"state": "open",
"merged": False,
"draft": False,
},
}


Expand Down Expand Up @@ -157,7 +164,9 @@ async def test_multiple_event_types_flow(
"html_url": "https://github.com/octocat/watchflow",
},
"ref": "refs/heads/main",
"commits": [],
"deleted": False,
"after": "abc123def456",
"commits": [{"id": "abc123def456"}],
}

with patch("src.webhooks.router.dispatcher", fresh_dispatcher):
Expand Down Expand Up @@ -223,6 +232,49 @@ async def test_handler_exception_doesnt_break_flow(
# Handler was called and exception was caught
assert failing_handler.called

@pytest.mark.asyncio
@respx.mock
async def test_filtered_event_not_dispatched(
self,
app: FastAPI,
fresh_dispatcher: WebhookDispatcher,
fresh_queue: TaskQueue,
valid_headers: dict[str, str],
) -> None:
"""Test that filtered events (e.g. branch deletion) are not dispatched."""
mock_handler = AsyncMock()
fresh_dispatcher.register_handler("push", mock_handler)
await fresh_queue.start_workers()

deleted_branch_payload = {
"sender": {"login": "octocat", "id": 1, "type": "User"},
"repository": {
"id": 123456,
"name": "watchflow",
"full_name": "octocat/watchflow",
"private": False,
"html_url": "https://github.com/octocat/watchflow",
},
"ref": "refs/heads/feature",
"deleted": True,
"after": "0000000000000000000000000000000000000000",
"commits": [],
}

with patch("src.webhooks.router.dispatcher", fresh_dispatcher):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
response = await client.post(
"/webhooks/github",
json=deleted_branch_payload,
headers={**valid_headers, "X-GitHub-Event": "push"},
)

assert response.status_code == 200
await asyncio.sleep(0.1)
await fresh_queue.queue.join()

assert mock_handler.call_count == 0

@pytest.mark.asyncio
@respx.mock
async def test_no_handler_registered_flow(
Expand Down
133 changes: 133 additions & 0 deletions tests/unit/core/test_event_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
from src.core.models import EventType, WebhookEvent
from src.core.utils.event_filter import should_process_event


def _make_event(event_type: EventType, payload: dict) -> WebhookEvent:
    """Build a minimal WebhookEvent for exercising the event filter."""
    event = WebhookEvent(event_type=event_type, payload=payload)
    return event


def _actionable_pr_payload(action: str) -> dict:
    """Payload for an open, unmerged, non-draft PR with the given action."""
    return {
        "action": action,
        "repository": {"full_name": "owner/repo"},
        "pull_request": {"state": "open", "merged": False, "draft": False},
    }


def test_pull_request_opened_processes():
    event = _make_event(EventType.PULL_REQUEST, _actionable_pr_payload("opened"))
    assert should_process_event(event).should_process is True


def test_pull_request_synchronize_processes():
    event = _make_event(EventType.PULL_REQUEST, _actionable_pr_payload("synchronize"))
    assert should_process_event(event).should_process is True


def test_pull_request_reopened_processes():
    event = _make_event(EventType.PULL_REQUEST, _actionable_pr_payload("reopened"))
    assert should_process_event(event).should_process is True


def test_pull_request_closed_action_filtered():
    payload = {
        "action": "closed",
        "repository": {"full_name": "owner/repo"},
        "pull_request": {"state": "closed", "merged": True},
    }
    result = should_process_event(_make_event(EventType.PULL_REQUEST, payload))
    assert not result.should_process
    assert any(part in result.reason for part in ("closed", "not processed"))


def test_pull_request_merged_filtered():
    payload = {
        "action": "opened",
        "repository": {"full_name": "owner/repo"},
        "pull_request": {"state": "closed", "merged": True, "draft": False},
    }
    result = should_process_event(_make_event(EventType.PULL_REQUEST, payload))
    assert not result.should_process
    assert any(part in result.reason for part in ("merged", "not open"))


def test_pull_request_draft_filtered():
    payload = {
        "action": "opened",
        "repository": {"full_name": "owner/repo"},
        "pull_request": {"state": "open", "merged": False, "draft": True},
    }
    result = should_process_event(_make_event(EventType.PULL_REQUEST, payload))
    assert not result.should_process
    assert "draft" in result.reason


def test_push_valid_processes():
    payload = {
        "repository": {"full_name": "owner/repo"},
        "ref": "refs/heads/main",
        "deleted": False,
        "after": "abc123",
        "commits": [{}],
    }
    event = _make_event(EventType.PUSH, payload)
    assert should_process_event(event).should_process is True


def test_push_deleted_branch_filtered():
    payload = {
        "repository": {"full_name": "owner/repo"},
        "ref": "refs/heads/feature",
        "deleted": True,
        "after": "0000000000000000000000000000000000000000",
    }
    result = should_process_event(_make_event(EventType.PUSH, payload))
    assert result.should_process is False
    assert any(part in result.reason for part in ("deletion", "Branch"))


def test_push_null_sha_filtered():
    payload = {
        "repository": {"full_name": "owner/repo"},
        "ref": "refs/heads/main",
        "deleted": False,
        "after": "0000000000000000000000000000000000000000",
    }
    event = _make_event(EventType.PUSH, payload)
    assert should_process_event(event).should_process is False


def test_push_empty_after_filtered():
    payload = {
        "repository": {"full_name": "owner/repo"},
        "ref": "refs/heads/main",
        "deleted": False,
        "after": "",
    }
    event = _make_event(EventType.PUSH, payload)
    assert should_process_event(event).should_process is False


def test_archived_repo_filtered():
    payload = {
        "repository": {"full_name": "owner/repo", "archived": True},
        "action": "opened",
        "pull_request": {"state": "open", "merged": False, "draft": False},
    }
    result = should_process_event(_make_event(EventType.PULL_REQUEST, payload))
    assert result.should_process is False
    assert "archived" in result.reason


def test_other_event_types_process():
    payload = {"repository": {"full_name": "owner/repo"}}
    other_types = (EventType.CHECK_RUN, EventType.DEPLOYMENT, EventType.DEPLOYMENT_STATUS)
    results = [should_process_event(_make_event(evt, payload)) for evt in other_types]
    assert all(r.should_process for r in results)
Loading