
FastAPI Worker Pipeline

Python 3.10+ | License: MIT | Code style: ruff

Template Method pattern for building data processing pipelines with FastAPI

A lightweight, zero-dependency library that provides a clean abstraction for building data processing workers using the Get-Process-Save pattern.

The Problem

Building data pipelines often leads to code like this:

# Without a pattern - scattered concerns, hard to test
def process_document(doc_id):
    # Fetch
    doc = db.get(doc_id)
    if not doc:
        return {"error": "not found"}

    # Process (mixed with error handling)
    try:
        result = heavy_computation(doc)
    except Exception as e:
        db.mark_failed(doc_id, str(e))
        return {"error": str(e)}

    # Save (duplicated error handling)
    try:
        db.save(doc_id, result)
    except Exception as e:
        return {"error": str(e)}

    return {"status": "success"}

Problems:

  • Fetch, process, and save logic are tangled
  • Error handling is duplicated
  • Hard to test individual phases
  • No clear contract for workers

The Solution

Get-Process-Save pattern with Template Method:

from worker_pipeline import PipelineHandler, ProcessingResult

class DocumentProcessor(PipelineHandler[ProcessedDoc]):
    def fetch_data(self, doc_id, requirements):
        return self.db.get(doc_id)

    def process(self, data):
        # Pure processing - no I/O!
        result = heavy_computation(data)
        return ProcessingResult.ok(result)

    def save(self, doc_id, result):
        return self.db.save(doc_id, result.data)

# Usage
handler = DocumentProcessor()
result = handler.execute(123)  # {"status": "success", ...}

Benefits:

  • Clear separation of concerns
  • Each phase is independently testable
  • Standardized error handling
  • Type-safe with generics

Installation

pip install fastapi-worker-pipeline

With SQLAlchemy support:

pip install fastapi-worker-pipeline[sqlalchemy]

Quick Start

1. Define Your Handler

from dataclasses import dataclass
from worker_pipeline import PipelineHandler, ProcessingResult, DataRequirements

@dataclass
class NormalizedUser:
    id: int
    email: str

class UserNormalizer(PipelineHandler[NormalizedUser]):
    def __init__(self, db):
        self.db = db

    def get_data_requirements(self):
        """Define what data is needed."""
        return DataRequirements(required_fields=["id", "email"])

    def fetch_data(self, user_id, requirements):
        """GET: Fetch from database."""
        return self.db.get_user(user_id)

    def process(self, data):
        """PROCESS: Transform data (no I/O!)."""
        normalized = NormalizedUser(
            id=data["id"],
            email=data["email"].lower().strip()
        )
        return ProcessingResult.ok(normalized)

    def save(self, user_id, result):
        """SAVE: Persist results."""
        return self.db.update_user(user_id, result.data)

2. Execute

handler = UserNormalizer(db)

# Process single item
result = handler.execute(123)
# {"status": "success", "item_id": 123}

# Process multiple
for user_id in user_ids:
    handler.execute(user_id)

3. Integrate with FastAPI

from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

@app.post("/users/{user_id}/normalize")
def normalize_user(user_id: int):
    handler = UserNormalizer(db)
    return handler.execute(user_id)

@app.post("/users/{user_id}/normalize-async")
def normalize_user_async(user_id: int, tasks: BackgroundTasks):
    handler = UserNormalizer(db)
    tasks.add_task(handler.execute, user_id)
    return {"status": "queued"}

Core Concepts

The Three Phases

┌─────────────────────────────────────────────────────────────┐
│                     execute(item_id)                        │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────┐     ┌───────────┐     ┌──────────┐            │
│  │  GET    │ ──▶ │  PROCESS  │ ──▶ │   SAVE   │            │
│  │         │     │           │     │          │            │
│  │ fetch   │     │ transform │     │ persist  │            │
│  │ data    │     │ (no I/O)  │     │ results  │            │
│  └─────────┘     └───────────┘     └──────────┘            │
│                                                             │
└─────────────────────────────────────────────────────────────┘
  1. GET (fetch_data): Retrieve data from storage
  2. PROCESS (process): Transform data - NO I/O ALLOWED
  3. SAVE (save): Persist results to storage
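
Conceptually, execute() is the template method that runs these three phases in order and routes failures to the error hooks. The function below is only a sketch of that flow, not the library's actual implementation; it uses the methods documented in the API reference, and the failure status value is assumed for illustration.

# Illustrative sketch of the execute() flow -- not the library's actual code
def run_pipeline(handler, item_id):
    requirements = handler.get_data_requirements()

    # GET: fetch errors go to the fetch hook
    try:
        data = handler.fetch_data(item_id, requirements)
    except Exception as error:
        return handler.on_fetch_error(item_id, error)

    # PROCESS: pure transformation, no I/O
    try:
        result = handler.process(data)
    except Exception as error:
        return handler.on_process_error(item_id, data, error)
    if not result.success:
        # "failed" is an assumed status value for this sketch
        return {"status": "failed", "item_id": item_id, "error": result.error}

    # SAVE: persistence errors go to the save hook
    try:
        handler.save(item_id, result)
    except Exception as error:
        return handler.on_save_error(item_id, result, error)

    return {"status": "success", "item_id": item_id}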

ProcessingResult

Type-safe container for processing outcomes:

# Success
result = ProcessingResult.ok(data, processing_time=150)
result.success  # True
result.data     # Your data
result.metadata # {"processing_time": 150}

# Failure
result = ProcessingResult.fail("Invalid format", line=42)
result.success  # False
result.error    # "Invalid format"
result.metadata # {"line": 42}

DataRequirements

Configure what data is needed:

def get_data_requirements(self):
    return DataRequirements(
        required_fields=["id", "email"],      # Must be present
        optional_fields=["phone"],            # Nice to have
        custom={"include_history": True}      # Custom options
    )
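
Per the API reference, DataRequirements also exposes validate(data), which returns the names of any missing required fields. One possible way to use it inside fetch_data (this placement is just a suggestion, not a library requirement):

def fetch_data(self, user_id, requirements):
    data = self.db.get_user(user_id)

    # validate() returns the required field names missing from the data
    missing = requirements.validate(data)
    if missing:
        # raising here lets the pipeline's fetch error handling take over
        raise ValueError(f"missing required fields: {missing}")

    return data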

Custom Error Handling

Override hooks for custom error behavior:

class MyHandler(PipelineHandler[T]):
    def on_fetch_error(self, item_id, error):
        # Log to monitoring service
        sentry.capture_exception(error)
        return {"status": "fetch_failed", "item_id": item_id}

    def on_process_error(self, item_id, data, error):
        # Custom retry logic
        return {"status": "retry_needed", "item_id": item_id}

    def on_save_error(self, item_id, result, error):
        # Send to dead letter queue
        dlq.send(item_id, result)
        return {"status": "queued_for_retry"}

SQLAlchemy Integration

from worker_pipeline.contrib.sqlalchemy import SQLAlchemyUnitOfWork

class MyHandler(PipelineHandler[MyResult]):
    def __init__(self, session_factory):
        self.session_factory = session_factory

    def save(self, item_id, result):
        with SQLAlchemyUnitOfWork(self.session_factory) as uow:
            uow.session.add(MyModel(**result.data))
            uow.commit()
        return True

Examples

See the examples/ directory for complete implementations.

Run examples:

python -m examples.etl_example
python -m examples.ml_inference

API Reference

PipelineHandler[T]

Method | Type | Description
fetch_data(item_id, requirements) | Abstract | Retrieve data for processing
process(data) | Abstract | Transform data (no I/O)
save(item_id, result) | Abstract | Persist results
get_data_requirements() | Hook | Define data requirements
on_fetch_error(item_id, error) | Hook | Handle fetch errors
on_process_error(item_id, data, error) | Hook | Handle process errors
on_save_error(item_id, result, error) | Hook | Handle save errors
execute(item_id) | Template | Run the pipeline (DO NOT OVERRIDE)

ProcessingResult[T]

Method/Attribute | Description
ok(data, **metadata) | Create success result
fail(error, **metadata) | Create failure result
success | Boolean success status
data | Result data (on success)
error | Error message (on failure)
metadata | Additional metadata dict

DataRequirements

Attribute/Method | Description
required_fields | List of required field names
optional_fields | List of optional field names
custom | Dict of custom requirements
to_dict() | Convert to kwargs dict
validate(data) | Return list of missing fields

Why This Pattern?

Testability

Test each phase independently:

def test_processing():
    handler = MyHandler()
    data = {"email": "  JOHN@EXAMPLE.COM  "}

    result = handler.process(data)

    assert result.success
    assert result.data.email == "john@example.com"
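
The fetch and save phases are just as easy to isolate. For example, fetch_data from the Quick Start handler can be exercised against a stub database; the FakeDB class below is purely illustrative.

class FakeDB:
    def get_user(self, user_id):
        # canned record standing in for a real database row
        return {"id": user_id, "email": "  JOHN@EXAMPLE.COM  "}

def test_fetching():
    handler = UserNormalizer(FakeDB())
    requirements = handler.get_data_requirements()

    data = handler.fetch_data(123, requirements)

    assert data["id"] == 123
    assert "email" in data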

Separation of Concerns

  • fetch_data: Database/API calls
  • process: Pure business logic
  • save: Persistence logic

Idempotent Retries

Safe to retry failed items:

for item_id in failed_items:
    handler.execute(item_id)  # Safe - get-process-save is atomic

Framework Agnostic

Works with any web framework:

  • FastAPI
  • Flask
  • Django
  • Standalone scripts (see the sketch below)
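
For instance, a plain script can drive the same UserNormalizer from the Quick Start with no web framework at all; the list of ids here is a stand-in for whatever feeds your worker.

# standalone worker script -- no web framework involved
handler = UserNormalizer(db)

for user_id in [1, 2, 3]:  # stand-in for your real work queue
    result = handler.execute(user_id)
    print(result)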

Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Make your changes
  4. Run tests: pytest
  5. Run linting: ruff check .
  6. Submit a pull request

License

MIT License - see LICENSE for details.
