From ee97e334ce3c98f65c9f47d1fc62f55799c46b78 Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 25 Feb 2026 15:00:25 -0800 Subject: [PATCH 1/2] Add sample for cancellation --- nexus_cancel/README.md | 50 ++++++++++++++++++ nexus_cancel/__init__.py | 0 nexus_cancel/caller/__init__.py | 0 nexus_cancel/caller/app.py | 43 +++++++++++++++ nexus_cancel/caller/workflows.py | 69 +++++++++++++++++++++++++ nexus_cancel/handler/__init__.py | 0 nexus_cancel/handler/service_handler.py | 27 ++++++++++ nexus_cancel/handler/worker.py | 48 +++++++++++++++++ nexus_cancel/handler/workflows.py | 49 ++++++++++++++++++ nexus_cancel/service.py | 35 +++++++++++++ 10 files changed, 321 insertions(+) create mode 100644 nexus_cancel/README.md create mode 100644 nexus_cancel/__init__.py create mode 100644 nexus_cancel/caller/__init__.py create mode 100644 nexus_cancel/caller/app.py create mode 100644 nexus_cancel/caller/workflows.py create mode 100644 nexus_cancel/handler/__init__.py create mode 100644 nexus_cancel/handler/service_handler.py create mode 100644 nexus_cancel/handler/worker.py create mode 100644 nexus_cancel/handler/workflows.py create mode 100644 nexus_cancel/service.py diff --git a/nexus_cancel/README.md b/nexus_cancel/README.md new file mode 100644 index 00000000..2f7f5703 --- /dev/null +++ b/nexus_cancel/README.md @@ -0,0 +1,50 @@ +# Nexus Cancellation + +This sample shows how a caller workflow can fan out multiple Nexus operations concurrently, take the first result, and cancel the rest using `WAIT_REQUESTED` cancellation semantics. + +With `WAIT_REQUESTED`, the caller proceeds once the handler has received the cancel request β€” it does not wait for the handler to finish processing the cancellation. + +Start a Temporal server. (See the main samples repo [README](../README.md)). + +Run the following: + +``` +temporal operator namespace create --namespace nexus-cancel-handler-namespace +temporal operator namespace create --namespace nexus-cancel-caller-namespace + +temporal operator nexus endpoint create \ + --name nexus-cancel-endpoint \ + --target-namespace nexus-cancel-handler-namespace \ + --target-task-queue nexus-cancel-handler-task-queue +``` + +Next, in separate terminal windows: + +## Nexus Handler Worker + +```bash +uv run nexus_cancel/handler/worker.py +``` + +## Nexus Caller App + +```bash +uv run nexus_cancel/caller/app.py +``` + +## Expected Output + +On the caller side, you should see a greeting in whichever language completed first: +``` +Hello Nexus πŸ‘‹ +``` + +On the handler side, you should see cancellation log messages for the remaining operations: +``` +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +HelloHandlerWorkflow was cancelled successfully. +``` + +The caller workflow returns before all handler workflows have completed their cancellation cleanup. This demonstrates `WAIT_REQUESTED` semantics: the caller didn't wait for the handler workflows to finish, but still guaranteed that all handlers received the cancellation request. diff --git a/nexus_cancel/__init__.py b/nexus_cancel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/caller/__init__.py b/nexus_cancel/caller/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/caller/app.py b/nexus_cancel/caller/app.py new file mode 100644 index 00000000..bb74e9e0 --- /dev/null +++ b/nexus_cancel/caller/app.py @@ -0,0 +1,43 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_cancel.caller.workflows import HelloCallerWorkflow + +NAMESPACE = "nexus-cancel-caller-namespace" +TASK_QUEUE = "nexus-cancel-caller-task-queue" + + +async def execute_caller_workflow( + client: Optional[Client] = None, +) -> str: + if client is None: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloCallerWorkflow], + ): + return await client.execute_workflow( + HelloCallerWorkflow.run, + "Nexus", + id=f"hello-caller-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(execute_caller_workflow()) + print(result) + except KeyboardInterrupt: + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_cancel/caller/workflows.py b/nexus_cancel/caller/workflows.py new file mode 100644 index 00000000..0bb1931c --- /dev/null +++ b/nexus_cancel/caller/workflows.py @@ -0,0 +1,69 @@ +""" +Caller workflow that demonstrates Nexus operation cancellation. + +Fans out 5 concurrent Nexus hello operations (one per language), takes the first +result, and cancels the rest using WAIT_REQUESTED cancellation semantics. +""" + +import asyncio +from datetime import timedelta + +from temporalio import workflow +from temporalio.exceptions import CancelledError, NexusOperationError + +with workflow.unsafe.imports_passed_through(): + from nexus_cancel.service import HelloInput, Language, NexusService + +NEXUS_ENDPOINT = "nexus-cancel-endpoint" + + +@workflow.defn +class HelloCallerWorkflow: + def __init__(self) -> None: + self.nexus_client = workflow.create_nexus_client( + service=NexusService, + endpoint=NEXUS_ENDPOINT, + ) + + @workflow.run + async def run(self, message: str) -> str: + # Fan out 5 concurrent Nexus calls, one per language. + # Each task starts and awaits its own operation so all race concurrently. + async def run_operation(language: Language): + handle = await self.nexus_client.start_operation( + NexusService.hello, + HelloInput(name=message, language=language), + schedule_to_close_timeout=timedelta(seconds=10), + cancellation_type=workflow.NexusOperationCancellationType.WAIT_REQUESTED, + ) + return await handle + + tasks = [asyncio.create_task(run_operation(lang)) for lang in Language] + + # Wait for the first operation to complete + workflow.logger.info( + f"Started {len(tasks)} operations, waiting for first to complete..." + ) + done, pending = await workflow.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + + # Get the result from the first completed operation + result = await done.pop() + workflow.logger.info(f"First operation completed with: {result.message}") + + # Cancel all remaining operations + workflow.logger.info(f"Cancelling {len(pending)} remaining operations...") + for task in pending: + task.cancel() + + # Wait for all cancellations to be acknowledged. + # If the workflow completes before cancellation requests are delivered, + # the server drops them. Waiting ensures all handlers receive the + # cancellation. + for task in pending: + try: + await task + except (NexusOperationError, CancelledError): + # Expected: the operation was cancelled + workflow.logger.info("Operation was cancelled") + + return result.message \ No newline at end of file diff --git a/nexus_cancel/handler/__init__.py b/nexus_cancel/handler/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nexus_cancel/handler/service_handler.py b/nexus_cancel/handler/service_handler.py new file mode 100644 index 00000000..92868510 --- /dev/null +++ b/nexus_cancel/handler/service_handler.py @@ -0,0 +1,27 @@ +""" +Nexus service handler for the cancellation sample. + +The hello operation is backed by a workflow, using the Nexus request ID as the +workflow ID for idempotency across retries. +""" + +from __future__ import annotations + +import nexusrpc +from temporalio import nexus + +from nexus_cancel.handler.workflows import HelloHandlerWorkflow +from nexus_cancel.service import HelloInput, HelloOutput, NexusService + + +@nexusrpc.handler.service_handler(service=NexusService) +class NexusServiceHandler: + @nexus.workflow_run_operation + async def hello( + self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput + ) -> nexus.WorkflowHandle[HelloOutput]: + return await ctx.start_workflow( + HelloHandlerWorkflow.run, + input, + id=ctx.request_id, + ) diff --git a/nexus_cancel/handler/worker.py b/nexus_cancel/handler/worker.py new file mode 100644 index 00000000..e29df355 --- /dev/null +++ b/nexus_cancel/handler/worker.py @@ -0,0 +1,48 @@ +""" +Worker for the handler namespace that processes Nexus operations and workflows. +""" + +import asyncio +import logging +from typing import Optional + +from temporalio.client import Client +from temporalio.envconfig import ClientConfig +from temporalio.worker import Worker + +from nexus_cancel.handler.service_handler import NexusServiceHandler +from nexus_cancel.handler.workflows import HelloHandlerWorkflow + +interrupt_event = asyncio.Event() + +NAMESPACE = "nexus-cancel-handler-namespace" +TASK_QUEUE = "nexus-cancel-handler-task-queue" + + +async def main(client: Optional[Client] = None): + logging.basicConfig(level=logging.INFO) + + if not client: + config = ClientConfig.load_client_connect_config() + config.setdefault("target_host", "localhost:7233") + config.setdefault("namespace", NAMESPACE) + client = await Client.connect(**config) + + async with Worker( + client, + task_queue=TASK_QUEUE, + workflows=[HelloHandlerWorkflow], + nexus_service_handlers=[NexusServiceHandler()], + ): + logging.info("Worker started, ctrl+c to exit") + await interrupt_event.wait() + logging.info("Shutting down") + + +if __name__ == "__main__": + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(main()) + except KeyboardInterrupt: + interrupt_event.set() + loop.run_until_complete(loop.shutdown_asyncgens()) diff --git a/nexus_cancel/handler/workflows.py b/nexus_cancel/handler/workflows.py new file mode 100644 index 00000000..d799c62b --- /dev/null +++ b/nexus_cancel/handler/workflows.py @@ -0,0 +1,49 @@ +""" +Handler workflow started by the hello Nexus operation. + +Demonstrates how to handle cancellation from the caller workflow using a +detached cancellation scope (asyncio.shield) for cleanup work. +""" + +import asyncio + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from nexus_cancel.service import HelloInput, HelloOutput, Language + +GREETINGS = { + Language.EN: "Hello {name} πŸ‘‹", + Language.FR: "Bonjour {name} πŸ‘‹", + Language.DE: "Hallo {name} πŸ‘‹", + Language.ES: "Β‘Hola! {name} πŸ‘‹", + Language.TR: "Merhaba {name} πŸ‘‹", +} + + +@workflow.defn +class HelloHandlerWorkflow: + @workflow.run + async def run(self, input: HelloInput) -> HelloOutput: + try: + # Sleep for a random duration to simulate work (0-5 seconds) + random_seconds = workflow.random().randint(0, 5) + workflow.logger.info(f"Working for {random_seconds} seconds...") + await asyncio.sleep(random_seconds) + + # Return a greeting based on the language + greeting = GREETINGS[input.language].format(name=input.name) + return HelloOutput(message=greeting) + + except asyncio.CancelledError: + # Perform cleanup in a detached cancellation scope. + # asyncio.shield prevents the cleanup work from being cancelled. + workflow.logger.info("Received cancellation request, performing cleanup...") + try: + cleanup_seconds = workflow.random().randint(0, 5) + await asyncio.shield(asyncio.sleep(cleanup_seconds)) + except asyncio.CancelledError: + pass + workflow.logger.info("HelloHandlerWorkflow was cancelled successfully.") + # Re-raise the cancellation error + raise diff --git a/nexus_cancel/service.py b/nexus_cancel/service.py new file mode 100644 index 00000000..454a32f7 --- /dev/null +++ b/nexus_cancel/service.py @@ -0,0 +1,35 @@ +""" +Nexus service definition for the cancellation sample. + +Defines a NexusService with a single `hello` operation that takes a name and +language, and returns a greeting message. +""" + +from dataclasses import dataclass +from enum import IntEnum + +import nexusrpc + + +class Language(IntEnum): + EN = 0 + FR = 1 + DE = 2 + ES = 3 + TR = 4 + + +@dataclass +class HelloInput: + name: str + language: Language + + +@dataclass +class HelloOutput: + message: str + + +@nexusrpc.service +class NexusService: + hello: nexusrpc.Operation[HelloInput, HelloOutput] From 046712d43feb1a003a8e012180c2bd38f5d2cedd Mon Sep 17 00:00:00 2001 From: Quinn Klassen Date: Wed, 25 Feb 2026 15:00:54 -0800 Subject: [PATCH 2/2] format --- nexus_cancel/caller/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus_cancel/caller/workflows.py b/nexus_cancel/caller/workflows.py index 0bb1931c..f8a2e1ff 100644 --- a/nexus_cancel/caller/workflows.py +++ b/nexus_cancel/caller/workflows.py @@ -66,4 +66,4 @@ async def run_operation(language: Language): # Expected: the operation was cancelled workflow.logger.info("Operation was cancelled") - return result.message \ No newline at end of file + return result.message