From 722bf3e227f84f534fe7c1e5146df7aba305a146 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Tue, 13 Jan 2026 14:45:38 +0000 Subject: [PATCH 1/5] Add basic tracing middleware and global control --- src/workflows/services/common_service.py | 32 ++++++++++++++ .../transport/middleware/otel_tracing.py | 42 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 src/workflows/transport/middleware/otel_tracing.py diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index de2ef70..d79f7fb 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -12,6 +12,13 @@ import workflows import workflows.logging +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware +from opentelemetry.sdk.resources import Resource, SERVICE_NAME + class Status(enum.Enum): """ @@ -185,6 +192,31 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) + + # Configure OTELTracing + resource = Resource.create({ + SERVICE_NAME: self._service_name, + }) + + self.log.debug("Configuring OTELTracing") + provider = TracerProvider(resource=resource) + trace.set_tracer_provider(provider) + + # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector + otlp_exporter = OTLPSpanExporter( + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", + timeout=10 + ) + span_processor = BatchSpanProcessor(otlp_exporter) + provider.add_span_processor(span_processor) + + # Add OTELTracingMiddleware to the transport layer + tracer = trace.get_tracer(__name__) + otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + self._transport.add_middleware(otel_middleware) + + self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + metrics = self._environment.get("metrics") if metrics: import prometheus_client diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py new file mode 100644 index 0000000..af0a1a1 --- /dev/null +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -0,0 +1,42 @@ +from opentelemetry import trace +from workflows.transport.middleware import BaseTransportMiddleware +from collections.abc import Callable +import functools +from opentelemetry.propagate import inject + +class OTELTracingMiddleware(BaseTransportMiddleware): + def __init__(self, tracer: trace.Tracer, service_name: str): + """ + Initialize the OpenTelemetry Tracing Middleware. + + :param tracer: An OpenTelemetry tracer instance used to create spans. + """ + self.tracer = tracer + self.service_name = service_name + + + def send(self, call_next: Callable, destination, message, **kwargs): + """ + Middleware for tracing the `send` operation + + :param call_next: The next middleware or the original `send` method. + :param destination: The destination service to which the message is being sent. + :param message: The message being sent. + :param kwargs: Additional arguments for the `send` method. + """ + + # Start a new span for the `send` operation + with self.tracer.start_as_current_span("transport.send") as span: + # Attributes we're interested in + span.set_attribute("service_name", self.service_name) + span.set_attribute("destination", destination) + span.set_attribute("message", str(message)) + + # Inject trace context into message headers + headers = kwargs.setdefault("headers", {}) + inject(headers) + kwargs["headers"] = headers + + # Call the next middleware or the original `send` method + return call_next(destination, message, **kwargs) + From 52cb04d756370e00294ff3ad17f38020191b374a Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:05:03 +0000 Subject: [PATCH 2/5] Instrument on subscribe and add dcid to span attributes --- src/workflows/recipe/__init__.py | 34 ++++++++++++++ .../transport/middleware/otel_tracing.py | 46 ++++++++----------- 2 files changed, 54 insertions(+), 26 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 0f1973f..5834bfe 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,6 +3,7 @@ import functools import logging from collections.abc import Callable +from opentelemetry import trace from typing import Any from workflows.recipe.recipe import Recipe @@ -69,6 +70,39 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) + print(rw) + logger.log(1, rw) + + # Extract and set DCID on the current span + span = trace.get_current_span() + dcid = None + + # Try multiple locations where DCID might be stored + top_level_params = {} + if isinstance(message, dict): + # Direct parameters (top-level or in recipe) + top_level_params = message.get("parameters", {}) + + # Payload parameters (most common location) + payload = message.get("payload", {}) + payload_params = {} + if isinstance(payload, dict): + payload_params = payload.get("parameters", {}) + + # Try all common locations + dcid = ( + top_level_params.get("ispyb_dcid") or + top_level_params.get("dcid") or + payload_params.get("ispyb_dcid") or + payload_params.get("dcid") or + payload.get("ispyb_dcid") or + payload.get("dcid") + ) + + if dcid: + span.set_attribute("dcid", dcid) + span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index af0a1a1..27e89db 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -2,7 +2,7 @@ from workflows.transport.middleware import BaseTransportMiddleware from collections.abc import Callable import functools -from opentelemetry.propagate import inject +from opentelemetry.propagate import inject, extract class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -14,29 +14,23 @@ def __init__(self, tracer: trace.Tracer, service_name: str): self.tracer = tracer self.service_name = service_name - - def send(self, call_next: Callable, destination, message, **kwargs): - """ - Middleware for tracing the `send` operation - - :param call_next: The next middleware or the original `send` method. - :param destination: The destination service to which the message is being sent. - :param message: The message being sent. - :param kwargs: Additional arguments for the `send` method. - """ - - # Start a new span for the `send` operation - with self.tracer.start_as_current_span("transport.send") as span: - # Attributes we're interested in - span.set_attribute("service_name", self.service_name) - span.set_attribute("destination", destination) - span.set_attribute("message", str(message)) + def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: + @functools.wraps(callback) + def wrapped_callback(header, message): + # Extract trace context from message headers + ctx = extract(header) if header else None - # Inject trace context into message headers - headers = kwargs.setdefault("headers", {}) - inject(headers) - kwargs["headers"] = headers - - # Call the next middleware or the original `send` method - return call_next(destination, message, **kwargs) - + # Start a new span with the extracted context + with self.tracer.start_as_current_span( + "transport.subscribe", + context=ctx + ) as span: + span.set_attribute("service_name", self.service_name) + span.set_attribute("channel", channel) + + + # Call the original callback + return callback(header, message) + + # Call the next middleware with the wrapped callback + return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file From cc9ee124f06ae3c98f556f9aafbbc35c81430f7c Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:10:45 +0000 Subject: [PATCH 3/5] Add spanid and traceid metadata to greylog --- src/workflows/recipe/__init__.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 5834bfe..abd5854 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -102,6 +102,20 @@ def unwrap_recipe(header, message): if dcid: span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + + # Extract span_id and trace_id for logging + span_context = span.get_span_context() + if span_context.is_valid: + span_id = format(span_context.span_id, '016x') + trace_id = format(span_context.trace_id, '032x') + + logger.info( + "Processing recipe message", + extra={ + "span_id": span_id, + "trace_id": trace_id, + } + ) if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): From f7cc6589b8af00e60544fae88e8919742b4e8499 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 13:29:04 +0000 Subject: [PATCH 4/5] Add recipe_id to spans --- src/workflows/recipe/__init__.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index abd5854..653ab61 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -70,13 +70,19 @@ def unwrap_recipe(header, message): message = mangle_for_receiving(message) if header.get("workflows-recipe") in {True, "True", "true", 1}: rw = RecipeWrapper(message=message, transport=transport_layer) - print(rw) - logger.log(1, rw) + logger.debug("RecipeWrapper created: %s", rw) - # Extract and set DCID on the current span + # Extract and set DCID and recipe_id on the current span span = trace.get_current_span() dcid = None + recipe_id = None + # Extract recipe ID from environment + if isinstance(message, dict): + environment = message.get("environment", {}) + if isinstance(environment, dict): + recipe_id = environment.get("ID") + # Try multiple locations where DCID might be stored top_level_params = {} if isinstance(message, dict): @@ -103,18 +109,28 @@ def unwrap_recipe(header, message): span.set_attribute("dcid", dcid) span.add_event("recipe.dcid_extracted", attributes={"dcid": dcid}) + if recipe_id: + span.set_attribute("recipe_id", recipe_id) + span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + # Extract span_id and trace_id for logging span_context = span.get_span_context() - if span_context.is_valid: + if span_context and span_context.is_valid: span_id = format(span_context.span_id, '016x') trace_id = format(span_context.trace_id, '032x') + log_extra = { + "span_id": span_id, + "trace_id": trace_id, + } + if dcid: + log_extra["dcid"] = dcid + if recipe_id: + log_extra["recipe_id"] = recipe_id + logger.info( "Processing recipe message", - extra={ - "span_id": span_id, - "trace_id": trace_id, - } + extra=log_extra ) if log_extender and rw.environment and rw.environment.get("ID"): From 42a7cb9dd946eb27f90139d43d4e48533fcbb645 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 13:38:42 +0000 Subject: [PATCH 5/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/workflows/recipe/__init__.py | 36 +++++++++---------- src/workflows/services/common_service.py | 32 ++++++++++------- .../transport/middleware/otel_tracing.py | 23 ++++++------ 3 files changed, 50 insertions(+), 41 deletions(-) diff --git a/src/workflows/recipe/__init__.py b/src/workflows/recipe/__init__.py index 653ab61..828f583 100644 --- a/src/workflows/recipe/__init__.py +++ b/src/workflows/recipe/__init__.py @@ -3,9 +3,10 @@ import functools import logging from collections.abc import Callable -from opentelemetry import trace from typing import Any +from opentelemetry import trace + from workflows.recipe.recipe import Recipe from workflows.recipe.validate import validate_recipe from workflows.recipe.wrapper import RecipeWrapper @@ -82,27 +83,27 @@ def unwrap_recipe(header, message): environment = message.get("environment", {}) if isinstance(environment, dict): recipe_id = environment.get("ID") - + # Try multiple locations where DCID might be stored top_level_params = {} if isinstance(message, dict): # Direct parameters (top-level or in recipe) top_level_params = message.get("parameters", {}) - + # Payload parameters (most common location) payload = message.get("payload", {}) payload_params = {} if isinstance(payload, dict): payload_params = payload.get("parameters", {}) - + # Try all common locations dcid = ( - top_level_params.get("ispyb_dcid") or - top_level_params.get("dcid") or - payload_params.get("ispyb_dcid") or - payload_params.get("dcid") or - payload.get("ispyb_dcid") or - payload.get("dcid") + top_level_params.get("ispyb_dcid") + or top_level_params.get("dcid") + or payload_params.get("ispyb_dcid") + or payload_params.get("dcid") + or payload.get("ispyb_dcid") + or payload.get("dcid") ) if dcid: @@ -111,13 +112,15 @@ def unwrap_recipe(header, message): if recipe_id: span.set_attribute("recipe_id", recipe_id) - span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id}) + span.add_event( + "recipe.id_extracted", attributes={"recipe_id": recipe_id} + ) # Extract span_id and trace_id for logging span_context = span.get_span_context() if span_context and span_context.is_valid: - span_id = format(span_context.span_id, '016x') - trace_id = format(span_context.trace_id, '032x') + span_id = format(span_context.span_id, "016x") + trace_id = format(span_context.trace_id, "032x") log_extra = { "span_id": span_id, @@ -128,11 +131,8 @@ def unwrap_recipe(header, message): if recipe_id: log_extra["recipe_id"] = recipe_id - logger.info( - "Processing recipe message", - extra=log_extra - ) - + logger.info("Processing recipe message", extra=log_extra) + if log_extender and rw.environment and rw.environment.get("ID"): with log_extender("recipe_ID", rw.environment["ID"]): return callback(rw, header, message.get("payload")) diff --git a/src/workflows/services/common_service.py b/src/workflows/services/common_service.py index d79f7fb..5aa8ee6 100644 --- a/src/workflows/services/common_service.py +++ b/src/workflows/services/common_service.py @@ -9,15 +9,15 @@ import time from typing import Any -import workflows -import workflows.logging - from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import SERVICE_NAME, Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + +import workflows +import workflows.logging from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware -from opentelemetry.sdk.resources import Resource, SERVICE_NAME class Status(enum.Enum): @@ -192,11 +192,13 @@ def start_transport(self): self.transport.subscription_callback_set_intercept( self._transport_interceptor ) - + # Configure OTELTracing - resource = Resource.create({ - SERVICE_NAME: self._service_name, - }) + resource = Resource.create( + { + SERVICE_NAME: self._service_name, + } + ) self.log.debug("Configuring OTELTracing") provider = TracerProvider(resource=resource) @@ -204,18 +206,22 @@ def start_transport(self): # Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector otlp_exporter = OTLPSpanExporter( - endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", - timeout=10 + endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", timeout=10 ) span_processor = BatchSpanProcessor(otlp_exporter) provider.add_span_processor(span_processor) # Add OTELTracingMiddleware to the transport layer tracer = trace.get_tracer(__name__) - otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name) + otel_middleware = OTELTracingMiddleware( + tracer, service_name=self._service_name + ) self._transport.add_middleware(otel_middleware) - self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name) + self.log.debug( + "OTELTracingMiddleware added to transport layer of %s", + self._service_name, + ) metrics = self._environment.get("metrics") if metrics: diff --git a/src/workflows/transport/middleware/otel_tracing.py b/src/workflows/transport/middleware/otel_tracing.py index 27e89db..453ff6c 100644 --- a/src/workflows/transport/middleware/otel_tracing.py +++ b/src/workflows/transport/middleware/otel_tracing.py @@ -1,8 +1,13 @@ +from __future__ import annotations + +import functools +from collections.abc import Callable + from opentelemetry import trace +from opentelemetry.propagate import extract + from workflows.transport.middleware import BaseTransportMiddleware -from collections.abc import Callable -import functools -from opentelemetry.propagate import inject, extract + class OTELTracingMiddleware(BaseTransportMiddleware): def __init__(self, tracer: trace.Tracer, service_name: str): @@ -19,18 +24,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int: def wrapped_callback(header, message): # Extract trace context from message headers ctx = extract(header) if header else None - + # Start a new span with the extracted context with self.tracer.start_as_current_span( - "transport.subscribe", - context=ctx + "transport.subscribe", context=ctx ) as span: span.set_attribute("service_name", self.service_name) span.set_attribute("channel", channel) - - + # Call the original callback return callback(header, message) - + # Call the next middleware with the wrapped callback - return call_next(channel, wrapped_callback, **kwargs) \ No newline at end of file + return call_next(channel, wrapped_callback, **kwargs)