Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions nexus_cancel/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Nexus Cancellation

This sample shows how a caller workflow can fan out multiple Nexus operations concurrently, take the first result, and cancel the rest using `WAIT_REQUESTED` cancellation semantics.

With `WAIT_REQUESTED`, the caller proceeds once the handler has received the cancel request — it does not wait for the handler to finish processing the cancellation.

Start a Temporal server. (See the main samples repo [README](../README.md)).

Run the following:

```
temporal operator namespace create --namespace nexus-cancel-handler-namespace
temporal operator namespace create --namespace nexus-cancel-caller-namespace

temporal operator nexus endpoint create \
--name nexus-cancel-endpoint \
--target-namespace nexus-cancel-handler-namespace \
--target-task-queue nexus-cancel-handler-task-queue
```

Next, in separate terminal windows:

## Nexus Handler Worker

```bash
uv run nexus_cancel/handler/worker.py
```

## Nexus Caller App

```bash
uv run nexus_cancel/caller/app.py
```

## Expected Output

On the caller side, you should see a greeting in whichever language completed first:
```
Hello Nexus 👋
```

On the handler side, you should see cancellation log messages for the remaining operations:
```
HelloHandlerWorkflow was cancelled successfully.
HelloHandlerWorkflow was cancelled successfully.
HelloHandlerWorkflow was cancelled successfully.
HelloHandlerWorkflow was cancelled successfully.
```

The caller workflow returns before all handler workflows have completed their cancellation cleanup. This demonstrates `WAIT_REQUESTED` semantics: the caller didn't wait for the handler workflows to finish, but still guaranteed that all handlers received the cancellation request.
Empty file added nexus_cancel/__init__.py
Empty file.
Empty file.
43 changes: 43 additions & 0 deletions nexus_cancel/caller/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import asyncio
import uuid
from typing import Optional

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_cancel.caller.workflows import HelloCallerWorkflow

NAMESPACE = "nexus-cancel-caller-namespace"
TASK_QUEUE = "nexus-cancel-caller-task-queue"


async def execute_caller_workflow(
client: Optional[Client] = None,
) -> str:
if client is None:
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
config.setdefault("namespace", NAMESPACE)
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloCallerWorkflow],
):
return await client.execute_workflow(
HelloCallerWorkflow.run,
"Nexus",
id=f"hello-caller-{uuid.uuid4()}",
task_queue=TASK_QUEUE,
)


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
result = loop.run_until_complete(execute_caller_workflow())
print(result)
except KeyboardInterrupt:
loop.run_until_complete(loop.shutdown_asyncgens())
69 changes: 69 additions & 0 deletions nexus_cancel/caller/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Caller workflow that demonstrates Nexus operation cancellation.

Fans out 5 concurrent Nexus hello operations (one per language), takes the first
result, and cancels the rest using WAIT_REQUESTED cancellation semantics.
"""

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import CancelledError, NexusOperationError

with workflow.unsafe.imports_passed_through():
from nexus_cancel.service import HelloInput, Language, NexusService

NEXUS_ENDPOINT = "nexus-cancel-endpoint"


@workflow.defn
class HelloCallerWorkflow:
def __init__(self) -> None:
self.nexus_client = workflow.create_nexus_client(
service=NexusService,
endpoint=NEXUS_ENDPOINT,
)

@workflow.run
async def run(self, message: str) -> str:
# Fan out 5 concurrent Nexus calls, one per language.
# Each task starts and awaits its own operation so all race concurrently.
async def run_operation(language: Language):
handle = await self.nexus_client.start_operation(
NexusService.hello,
HelloInput(name=message, language=language),
schedule_to_close_timeout=timedelta(seconds=10),
cancellation_type=workflow.NexusOperationCancellationType.WAIT_REQUESTED,
)
return await handle

tasks = [asyncio.create_task(run_operation(lang)) for lang in Language]

# Wait for the first operation to complete
workflow.logger.info(
f"Started {len(tasks)} operations, waiting for first to complete..."
)
done, pending = await workflow.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

# Get the result from the first completed operation
result = await done.pop()
workflow.logger.info(f"First operation completed with: {result.message}")

# Cancel all remaining operations
workflow.logger.info(f"Cancelling {len(pending)} remaining operations...")
for task in pending:
task.cancel()

# Wait for all cancellations to be acknowledged.
# If the workflow completes before cancellation requests are delivered,
# the server drops them. Waiting ensures all handlers receive the
# cancellation.
for task in pending:
try:
await task
except (NexusOperationError, CancelledError):
# Expected: the operation was cancelled
workflow.logger.info("Operation was cancelled")

return result.message
Empty file.
27 changes: 27 additions & 0 deletions nexus_cancel/handler/service_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
"""
Nexus service handler for the cancellation sample.

The hello operation is backed by a workflow, using the Nexus request ID as the
workflow ID for idempotency across retries.
"""

from __future__ import annotations

import nexusrpc
from temporalio import nexus

from nexus_cancel.handler.workflows import HelloHandlerWorkflow
from nexus_cancel.service import HelloInput, HelloOutput, NexusService


@nexusrpc.handler.service_handler(service=NexusService)
class NexusServiceHandler:
@nexus.workflow_run_operation
async def hello(
self, ctx: nexus.WorkflowRunOperationContext, input: HelloInput
) -> nexus.WorkflowHandle[HelloOutput]:
return await ctx.start_workflow(
HelloHandlerWorkflow.run,
input,
id=ctx.request_id,
)
48 changes: 48 additions & 0 deletions nexus_cancel/handler/worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
"""
Worker for the handler namespace that processes Nexus operations and workflows.
"""

import asyncio
import logging
from typing import Optional

from temporalio.client import Client
from temporalio.envconfig import ClientConfig
from temporalio.worker import Worker

from nexus_cancel.handler.service_handler import NexusServiceHandler
from nexus_cancel.handler.workflows import HelloHandlerWorkflow

interrupt_event = asyncio.Event()

NAMESPACE = "nexus-cancel-handler-namespace"
TASK_QUEUE = "nexus-cancel-handler-task-queue"


async def main(client: Optional[Client] = None):
logging.basicConfig(level=logging.INFO)

if not client:
config = ClientConfig.load_client_connect_config()
config.setdefault("target_host", "localhost:7233")
config.setdefault("namespace", NAMESPACE)
client = await Client.connect(**config)

async with Worker(
client,
task_queue=TASK_QUEUE,
workflows=[HelloHandlerWorkflow],
nexus_service_handlers=[NexusServiceHandler()],
):
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")


if __name__ == "__main__":
loop = asyncio.new_event_loop()
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
interrupt_event.set()
loop.run_until_complete(loop.shutdown_asyncgens())
49 changes: 49 additions & 0 deletions nexus_cancel/handler/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
"""
Handler workflow started by the hello Nexus operation.

Demonstrates how to handle cancellation from the caller workflow using a
detached cancellation scope (asyncio.shield) for cleanup work.
"""

import asyncio

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
from nexus_cancel.service import HelloInput, HelloOutput, Language

GREETINGS = {
Language.EN: "Hello {name} 👋",
Language.FR: "Bonjour {name} 👋",
Language.DE: "Hallo {name} 👋",
Language.ES: "¡Hola! {name} 👋",
Language.TR: "Merhaba {name} 👋",
}


@workflow.defn
class HelloHandlerWorkflow:
@workflow.run
async def run(self, input: HelloInput) -> HelloOutput:
try:
# Sleep for a random duration to simulate work (0-5 seconds)
random_seconds = workflow.random().randint(0, 5)
workflow.logger.info(f"Working for {random_seconds} seconds...")
await asyncio.sleep(random_seconds)

# Return a greeting based on the language
greeting = GREETINGS[input.language].format(name=input.name)
return HelloOutput(message=greeting)

except asyncio.CancelledError:
# Perform cleanup in a detached cancellation scope.
# asyncio.shield prevents the cleanup work from being cancelled.
workflow.logger.info("Received cancellation request, performing cleanup...")
try:
cleanup_seconds = workflow.random().randint(0, 5)
await asyncio.shield(asyncio.sleep(cleanup_seconds))
except asyncio.CancelledError:
pass
workflow.logger.info("HelloHandlerWorkflow was cancelled successfully.")
# Re-raise the cancellation error
raise
35 changes: 35 additions & 0 deletions nexus_cancel/service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Nexus service definition for the cancellation sample.

Defines a NexusService with a single `hello` operation that takes a name and
language, and returns a greeting message.
"""

from dataclasses import dataclass
from enum import IntEnum

import nexusrpc


class Language(IntEnum):
EN = 0
FR = 1
DE = 2
ES = 3
TR = 4


@dataclass
class HelloInput:
name: str
language: Language


@dataclass
class HelloOutput:
message: str


@nexusrpc.service
class NexusService:
hello: nexusrpc.Operation[HelloInput, HelloOutput]
Loading