From 56ccb1532398a0da4cc7600b7f141022321908cd Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 4 Feb 2026 21:17:30 -0300 Subject: [PATCH] feat: add processor plugin system Adds support for third-party processor plugins via plugin discovery: - PluginType.PROCESSOR for external processor plugins - ProcessorRegistry discovers and loads processor plugins - processor_types.py with plugin-injected type union - PluginRegistry uses RLock for nested imports Demo processors: - RegexFilterProcessor (preprocess stage) - SemanticDedupProcessor (postprocess stage) --- demo/data_designer_demo_processors/README.md | 61 +++++ .../notebooks/demo_processors.py | 120 +++++++++ .../pyproject.toml | 25 ++ .../data_designer_demo_processors/__init__.py | 4 + .../download_model.py | 19 ++ .../regex_filter/__init__.py | 7 + .../regex_filter/config.py | 20 ++ .../regex_filter/impl.py | 47 ++++ .../regex_filter/plugin.py | 10 + .../semantic_dedup/__init__.py | 7 + .../semantic_dedup/config.py | 28 ++ .../semantic_dedup/impl.py | 74 ++++++ .../semantic_dedup/plugin.py | 10 + .../tests/__init__.py | 2 + .../tests/test_regex_filter.py | 56 ++++ .../tests/test_semantic_dedup.py | 70 +++++ docs/plugins/overview.md | 22 +- .../data_designer/config/config_builder.py | 3 +- .../config/data_designer_config.py | 2 +- .../data_designer/config/processor_types.py | 17 ++ .../src/data_designer/config/processors.py | 4 - .../src/data_designer/plugin_manager.py | 37 +++ .../src/data_designer/plugins/plugin.py | 3 + .../src/data_designer/plugins/registry.py | 2 +- .../engine/processing/processors/registry.py | 11 +- .../src/data_designer/engine/validation.py | 3 +- plans/processor-plugins.md | 246 ++++++++++++++++++ 27 files changed, 898 insertions(+), 12 deletions(-) create mode 100644 demo/data_designer_demo_processors/README.md create mode 100644 demo/data_designer_demo_processors/notebooks/demo_processors.py create mode 100644 demo/data_designer_demo_processors/pyproject.toml create mode 100644 
demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py create mode 100644 demo/data_designer_demo_processors/tests/__init__.py create mode 100644 demo/data_designer_demo_processors/tests/test_regex_filter.py create mode 100644 demo/data_designer_demo_processors/tests/test_semantic_dedup.py create mode 100644 packages/data-designer-config/src/data_designer/config/processor_types.py create mode 100644 plans/processor-plugins.md diff --git a/demo/data_designer_demo_processors/README.md b/demo/data_designer_demo_processors/README.md new file mode 100644 index 00000000..847ff0ea --- /dev/null +++ b/demo/data_designer_demo_processors/README.md @@ -0,0 +1,61 @@ +# Data Designer Demo Processors + +Demo processor plugins demonstrating PRE_GENERATION and POST_GENERATION stages. 
+ +## Installation + +```bash +uv pip install -e demo/data_designer_demo_processors +``` + +## Processors + +### RegexFilterProcessor (PRE_GENERATION) + +Filters seed data rows based on regex pattern matching. + +```python +from data_designer.config.config_builder import DataDesignerConfigBuilder +from data_designer_demo_processors.regex_filter import RegexFilterProcessorConfig + +builder = DataDesignerConfigBuilder(model_configs=[...]) +builder.add_processor(RegexFilterProcessorConfig( + name="filter_emails", + column="email", + pattern=r"@company\.com$", + invert=False, # Keep only matching rows +)) +``` + +### SemanticDedupProcessor (POST_GENERATION) + +Removes semantically similar rows using sentence embeddings. + +```python +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessorConfig + +builder.add_processor(SemanticDedupProcessorConfig( + name="dedup_responses", + column="response", + similarity_threshold=0.9, # Remove rows with >90% similarity + model_name="all-MiniLM-L6-v2", +)) +``` + +## Pre-downloading the Embedding Model + +The semantic dedup processor downloads the embedding model on first use. To pre-download: + +```bash +download-semantic-dedup-model +``` + +## Entry Points + +The package registers plugins via entry points: + +```toml +[project.entry-points."data_designer.plugins"] +regex-filter = "data_designer_demo_processors.regex_filter.plugin:regex_filter_plugin" +semantic-dedup = "data_designer_demo_processors.semantic_dedup.plugin:semantic_dedup_plugin" +``` diff --git a/demo/data_designer_demo_processors/notebooks/demo_processors.py b/demo/data_designer_demo_processors/notebooks/demo_processors.py new file mode 100644 index 00000000..5ce55641 --- /dev/null +++ b/demo/data_designer_demo_processors/notebooks/demo_processors.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Demo: Processor Plugins with PRE_GENERATION and POST_GENERATION stages. + +This notebook demonstrates: +1. RegexFilterProcessor (PRE_GENERATION) - filters seed data before generation +2. SemanticDedupProcessor (POST_GENERATION) - deduplicates final dataset + +Run cells with `#%%` markers in VS Code or PyCharm. +""" + +# %% Imports +import tempfile +from pathlib import Path + +import pandas as pd +from data_designer_demo_processors.regex_filter import RegexFilterProcessorConfig +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessorConfig + +import data_designer.config as dd +from data_designer.interface import DataDesigner + +# %% Create seed data with some rows we want to filter out +seed_data = pd.DataFrame( + { + "topic": [ + "Python programming", + "Machine learning", + "SPAM: Buy now!", # Will be filtered by regex + "Data science", + "SPAM: Click here", # Will be filtered by regex + "Natural language processing", + "Computer vision", + ], + "difficulty": ["beginner", "advanced", "N/A", "intermediate", "N/A", "advanced", "advanced"], + } +) + +print("Seed data before PRE_GENERATION filtering:") +print(seed_data) +print(f"Total rows: {len(seed_data)}") + +# %% Setup temporary directory and save seed data +output_dir = Path(tempfile.mkdtemp()) +seed_path = output_dir / "seed.parquet" +seed_data.to_parquet(seed_path, index=False) + +# %% Build the Data Designer configuration (uses default openai-text model) +config_builder = dd.DataDesignerConfigBuilder() + +# Add seed dataset +config_builder.with_seed_dataset(dd.LocalFileSeedSource(path=str(seed_path))) + +# Add LLM column to generate explanations +config_builder.add_column( + dd.LLMTextColumnConfig( + name="explanation", + prompt="""Write a brief one-sentence explanation of the topic: {{ topic }} +Difficulty level: {{ difficulty }} + +Keep it concise and educational.""", + model_alias="openai-text", + ) +) + +# Add PRE_GENERATION processor to 
filter out spam rows +config_builder.add_processor( + RegexFilterProcessorConfig( + name="filter_spam", + column="topic", + pattern=r"^SPAM:", + invert=True, # Keep rows that do NOT match (i.e., filter out spam) + ) +) + +# Add POST_GENERATION processor to deduplicate similar explanations +config_builder.add_processor( + SemanticDedupProcessorConfig( + name="dedup_explanations", + column="explanation", + similarity_threshold=0.85, + ) +) + +print("Configuration created successfully!") +processor_configs = config_builder.get_processor_configs() +print(f"Processors configured: {[p.name for p in processor_configs]}") + +# %% Run preview to test with a few records +data_designer = DataDesigner() + +print("\nRunning preview (3 records)...") +preview = data_designer.preview(config_builder, num_records=3) + +print("\nPreview dataset:") +print(preview.dataset) + +# %% Run full generation +print("\nRunning full generation...") +results = data_designer.create( + config_builder, + num_records=5, + dataset_name="processor-demo", +) + +# Load the final dataset +final_dataset = results.load_dataset() + +print("\nFinal dataset after all processors:") +print(final_dataset) +print(f"\nTotal rows in final dataset: {len(final_dataset)}") + +# %% Summary +print("\n" + "=" * 60) +print("DEMO SUMMARY") +print("=" * 60) +print(f"Original seed rows: {len(seed_data)}") +print("After PRE_GENERATION (regex filter): Expected ~5 rows (SPAM removed)") +print(f"After POST_GENERATION (semantic dedup): {len(final_dataset)} rows") diff --git a/demo/data_designer_demo_processors/pyproject.toml b/demo/data_designer_demo_processors/pyproject.toml new file mode 100644 index 00000000..49cbda2c --- /dev/null +++ b/demo/data_designer_demo_processors/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "data-designer-demo-processors" +version = "0.1.0" +description = "Demo processor plugins for Data Designer showing PRE_GENERATION and POST_GENERATION stages" +readme = "README.md" +requires-python = ">=3.11" 
+dependencies = [ + "data-designer-config", + "data-designer-engine", + "sentence-transformers>=2.2.0", +] + +[project.entry-points."data_designer.plugins"] +regex-filter = "data_designer_demo_processors.regex_filter.plugin:regex_filter_plugin" +semantic-dedup = "data_designer_demo_processors.semantic_dedup.plugin:semantic_dedup_plugin" + +[project.scripts] +download-semantic-dedup-model = "data_designer_demo_processors.download_model:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/data_designer_demo_processors"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py new file mode 100644 index 00000000..955d001d --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Demo processor plugins for Data Designer.""" diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py new file mode 100644 index 00000000..72ad60e5 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Pre-download the semantic dedup embedding model.""" + +DEFAULT_MODEL = "all-MiniLM-L6-v2" + + +def main(): + """Download the embedding model to cache.""" + from sentence_transformers import SentenceTransformer + + print(f"Downloading model: {DEFAULT_MODEL}") + SentenceTransformer(DEFAULT_MODEL) + print("Model downloaded successfully!") + + +if __name__ == "__main__": + main() diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py new file mode 100644 index 00000000..b486d99f --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer_demo_processors.regex_filter.config import RegexFilterProcessorConfig +from data_designer_demo_processors.regex_filter.impl import RegexFilterProcessor + +__all__ = ["RegexFilterProcessorConfig", "RegexFilterProcessor"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py new file mode 100644 index 00000000..da7e6916 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal + +from pydantic import Field + +from data_designer.config.processors import ProcessorConfig + + +class RegexFilterProcessorConfig(ProcessorConfig): + """Filter rows based on regex matching on a column. 
+ + This processor filters seed data during the preprocess stage. + """ + + processor_type: Literal["regex-filter"] = "regex-filter" + column: str = Field(description="Column to apply regex filter on") + pattern: str = Field(description="Regex pattern to match") + invert: bool = Field(default=False, description="If True, keep rows that do NOT match") diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py new file mode 100644 index 00000000..98dd8555 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import re +from typing import TYPE_CHECKING + +from data_designer.engine.processing.processors.base import Processor +from data_designer_demo_processors.regex_filter.config import RegexFilterProcessorConfig + +if TYPE_CHECKING: + import pandas as pd + +logger = logging.getLogger(__name__) + + +class RegexFilterProcessor(Processor[RegexFilterProcessorConfig]): + """Filters rows based on regex matching on a specified column. + + Runs during preprocess to filter seed data before generation. + """ + + def preprocess(self, data: pd.DataFrame) -> pd.DataFrame: + column = self.config.column + pattern = self.config.pattern + invert = self.config.invert + + if column not in data.columns: + logger.warning(f"⚠️ Column '{column}' not found in dataset. 
Skipping regex filter.") + return data + + compiled = re.compile(pattern) + mask = data[column].astype(str).apply(lambda x: bool(compiled.search(x))) + + if invert: + mask = ~mask + + original_count = len(data) + data = data[mask].reset_index(drop=True) + filtered_count = original_count - len(data) + + action = "excluded" if not invert else "kept only non-matching" + logger.info(f"🔍 Regex filter: {filtered_count} rows {action} (pattern: {pattern!r} on column '{column}')") + + return data diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py new file mode 100644 index 00000000..bc091b5d --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer.plugins.plugin import Plugin, PluginType + +regex_filter_plugin = Plugin( + config_qualified_name="data_designer_demo_processors.regex_filter.config.RegexFilterProcessorConfig", + impl_qualified_name="data_designer_demo_processors.regex_filter.impl.RegexFilterProcessor", + plugin_type=PluginType.PROCESSOR, +) diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py new file mode 100644 index 00000000..993b7eb2 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from data_designer_demo_processors.semantic_dedup.config import SemanticDedupProcessorConfig +from data_designer_demo_processors.semantic_dedup.impl import SemanticDedupProcessor + +__all__ = ["SemanticDedupProcessorConfig", "SemanticDedupProcessor"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py new file mode 100644 index 00000000..04ecb1ab --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal + +from pydantic import Field + +from data_designer.config.processors import ProcessorConfig + + +class SemanticDedupProcessorConfig(ProcessorConfig): + """Remove semantically similar rows using embeddings. + + This processor deduplicates the final dataset during the postprocess stage. 
+ """ + + processor_type: Literal["semantic-dedup"] = "semantic-dedup" + column: str = Field(description="Column to compute embeddings on for deduplication") + similarity_threshold: float = Field( + default=0.9, + ge=0.0, + le=1.0, + description="Cosine similarity threshold above which rows are considered duplicates", + ) + model_name: str = Field( + default="all-MiniLM-L6-v2", + description="Sentence-transformers model name for computing embeddings", + ) diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py new file mode 100644 index 00000000..699367ba --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import numpy as np +import transformers.utils.logging as transformers_logging +from sentence_transformers import SentenceTransformer + +from data_designer.engine.processing.processors.base import Processor +from data_designer_demo_processors.semantic_dedup.config import SemanticDedupProcessorConfig + +if TYPE_CHECKING: + import pandas as pd + +logger = logging.getLogger(__name__) + + +class SemanticDedupProcessor(Processor[SemanticDedupProcessorConfig]): + """Removes semantically similar rows using embeddings. + + Runs during postprocess to deduplicate the final generated dataset. 
+ """ + + def _initialize(self) -> None: + # Suppress sentence-transformers/transformers logging noise + transformers_logging.set_verbosity_error() + transformers_logging.disable_progress_bar() + + self._model = SentenceTransformer(self.config.model_name) + + def postprocess(self, data: pd.DataFrame) -> pd.DataFrame: + column = self.config.column + threshold = self.config.similarity_threshold + + if column not in data.columns: + logger.warning(f"⚠️ Column '{column}' not found in dataset. Skipping semantic dedup.") + return data + + if len(data) == 0: + return data + + texts = data[column].astype(str).tolist() + embeddings = self._model.encode(texts, show_progress_bar=False, convert_to_numpy=True) + + # Normalize embeddings for cosine similarity + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + norms[norms == 0] = 1 # Avoid division by zero + embeddings = embeddings / norms + + # Find duplicates using greedy approach: keep first occurrence, remove similar ones + keep_indices = [] + for i in range(len(embeddings)): + is_duplicate = False + for kept_idx in keep_indices: + similarity = np.dot(embeddings[i], embeddings[kept_idx]) + if similarity >= threshold: + is_duplicate = True + break + if not is_duplicate: + keep_indices.append(i) + + original_count = len(data) + data = data.iloc[keep_indices].reset_index(drop=True) + removed_count = original_count - len(data) + + logger.info( + f"🧹 Semantic dedup: removed {removed_count} similar rows (threshold: {threshold}, column: '{column}')" + ) + + return data diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py new file mode 100644 index 00000000..cd49c760 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & 
AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer.plugins.plugin import Plugin, PluginType + +semantic_dedup_plugin = Plugin( + config_qualified_name="data_designer_demo_processors.semantic_dedup.config.SemanticDedupProcessorConfig", + impl_qualified_name="data_designer_demo_processors.semantic_dedup.impl.SemanticDedupProcessor", + plugin_type=PluginType.PROCESSOR, +) diff --git a/demo/data_designer_demo_processors/tests/__init__.py b/demo/data_designer_demo_processors/tests/__init__.py new file mode 100644 index 00000000..52a7a9da --- /dev/null +++ b/demo/data_designer_demo_processors/tests/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 diff --git a/demo/data_designer_demo_processors/tests/test_regex_filter.py b/demo/data_designer_demo_processors/tests/test_regex_filter.py new file mode 100644 index 00000000..6ef25f76 --- /dev/null +++ b/demo/data_designer_demo_processors/tests/test_regex_filter.py @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import Mock + +import pandas as pd +import pytest +from data_designer_demo_processors.regex_filter import RegexFilterProcessor, RegexFilterProcessorConfig + + +@pytest.fixture +def stub_resource_provider(): + return Mock() + + +def test_regex_filter_keeps_matching_rows(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="text", pattern=r"hello") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye", "say hello", "nothing"]}) + result = processor.process(df) + + assert len(result) == 2 + assert list(result["text"]) == ["hello world", "say hello"] + + +def test_regex_filter_invert_keeps_non_matching(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="text", pattern=r"hello", invert=True) + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye", "say hello", "nothing"]}) + result = processor.process(df) + + assert len(result) == 2 + assert list(result["text"]) == ["goodbye", "nothing"] + + +def test_regex_filter_missing_column_returns_unchanged(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="missing", pattern=r"hello") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye"]}) + result = processor.process(df) + + assert len(result) == 2 + + +def test_regex_filter_complex_pattern(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="email", pattern=r"^\w+@\w+\.\w+$") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"email": ["user@example.com", "invalid", "other@test.org", "bad@"]}) + result = processor.process(df) + + assert len(result) == 2 
+ assert list(result["email"]) == ["user@example.com", "other@test.org"] diff --git a/demo/data_designer_demo_processors/tests/test_semantic_dedup.py b/demo/data_designer_demo_processors/tests/test_semantic_dedup.py new file mode 100644 index 00000000..e247eb2c --- /dev/null +++ b/demo/data_designer_demo_processors/tests/test_semantic_dedup.py @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import Mock + +import pandas as pd +import pytest +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessor, SemanticDedupProcessorConfig + + +@pytest.fixture +def stub_resource_provider(): + return Mock() + + +@pytest.mark.slow +def test_semantic_dedup_removes_similar_rows(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame( + { + "text": [ + "The cat sat on the mat", + "A cat was sitting on the mat", # Very similar to first + "Dogs like to play fetch", + "The dog enjoys playing fetch", # Very similar to third + "Quantum physics is fascinating", + ] + } + ) + result = processor.process(df) + + # Should keep only dissimilar rows + assert len(result) < len(df) + assert len(result) >= 3 # At least 3 distinct topics + + +@pytest.mark.slow +def test_semantic_dedup_missing_column_returns_unchanged(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="missing", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello", "world"]}) + result = processor.process(df) + + assert len(result) == 2 + + +@pytest.mark.slow +def test_semantic_dedup_empty_dataframe(stub_resource_provider): + config = 
SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": []}) + result = processor.process(df) + + assert len(result) == 0 + + +@pytest.mark.slow +def test_semantic_dedup_low_threshold_removes_more(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.5) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["apple", "orange", "banana", "grape", "lemon"]}) + result = processor.process(df) + + # With low threshold, fruit words might be considered similar + assert len(result) <= len(df) diff --git a/docs/plugins/overview.md b/docs/plugins/overview.md index 0016120f..b38ad6a2 100644 --- a/docs/plugins/overview.md +++ b/docs/plugins/overview.md @@ -7,9 +7,10 @@ Plugins are Python packages that extend Data Designer's capabilities without modifying the core library. Similar to [VS Code extensions](https://marketplace.visualstudio.com/vscode) and [Pytest plugins](https://docs.pytest.org/en/stable/reference/plugin_list.html), the plugin system empowers you to build specialized extensions for your specific use cases and share them with the community. -**Current capabilities**: Data Designer currently supports plugins for column generators (the column types you pass to the config builder's [add_column](../code_reference/config_builder.md#data_designer.config.config_builder.DataDesignerConfigBuilder.add_column) method). +**Current capabilities**: Data Designer supports plugins for: -**Coming soon**: Plugin support for processors, validators, and more! 
+- **Column generators**: Custom column types you pass to the config builder's [add_column](../code_reference/config_builder.md#data_designer.config.config_builder.DataDesignerConfigBuilder.add_column) method +- **Processors**: Custom transformations that run at `PRE_GENERATION`, `POST_BATCH`, or `POST_GENERATION` stages ## How do you use plugins? @@ -27,9 +28,17 @@ Creating a plugin involves three main steps: ### 1. Implement the Plugin Components +For **column generator plugins**: + - Create a task class inheriting from `ColumnGenerator` - Create a config class inheriting from `SingleColumnConfig` -- Instantiate a `Plugin` object connecting them +- Instantiate a `Plugin` object with `plugin_type=PluginType.COLUMN_GENERATOR` + +For **processor plugins**: + +- Create a task class inheriting from `Processor` +- Create a config class inheriting from `ProcessorConfig` +- Instantiate a `Plugin` object with `plugin_type=PluginType.PROCESSOR` ### 2. Package Your Plugin @@ -37,6 +46,13 @@ Creating a plugin involves three main steps: - Register your plugin using entry points - Define dependencies (including `data-designer`) +**Example entry point configuration in `pyproject.toml`:** + +```toml +[project.entry-points."data_designer.plugins"] +my-processor = "my_package.processor.plugin:my_processor_plugin" +``` + ### 3. 
Share Your Plugin - Publish to PyPI or another package index diff --git a/packages/data-designer-config/src/data_designer/config/config_builder.py b/packages/data-designer-config/src/data_designer/config/config_builder.py index 7bf791eb..a893e327 100644 --- a/packages/data-designer-config/src/data_designer/config/config_builder.py +++ b/packages/data-designer-config/src/data_designer/config/config_builder.py @@ -27,7 +27,8 @@ from data_designer.config.exportable_config import ExportableConfigBase from data_designer.config.mcp import ToolConfig from data_designer.config.models import ModelConfig, load_model_configs -from data_designer.config.processors import ProcessorConfigT, ProcessorType, get_processor_config_from_kwargs +from data_designer.config.processor_types import ProcessorConfigT +from data_designer.config.processors import ProcessorType, get_processor_config_from_kwargs from data_designer.config.sampler_constraints import ( ColumnConstraintT, ColumnInequalityConstraint, diff --git a/packages/data-designer-config/src/data_designer/config/data_designer_config.py b/packages/data-designer-config/src/data_designer/config/data_designer_config.py index 5a0a9613..e65a67de 100644 --- a/packages/data-designer-config/src/data_designer/config/data_designer_config.py +++ b/packages/data-designer-config/src/data_designer/config/data_designer_config.py @@ -12,7 +12,7 @@ from data_designer.config.exportable_config import ExportableConfigBase from data_designer.config.mcp import ToolConfig from data_designer.config.models import ModelConfig -from data_designer.config.processors import ProcessorConfigT +from data_designer.config.processor_types import ProcessorConfigT from data_designer.config.sampler_constraints import ColumnConstraintT from data_designer.config.seed import SeedConfig diff --git a/packages/data-designer-config/src/data_designer/config/processor_types.py b/packages/data-designer-config/src/data_designer/config/processor_types.py new file mode 100644 index 
00000000..a90f582d --- /dev/null +++ b/packages/data-designer-config/src/data_designer/config/processor_types.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from typing_extensions import TypeAlias + +from data_designer.config.processors import ( + DropColumnsProcessorConfig, + SchemaTransformProcessorConfig, +) +from data_designer.plugin_manager import PluginManager + +plugin_manager = PluginManager() + +ProcessorConfigT: TypeAlias = DropColumnsProcessorConfig | SchemaTransformProcessorConfig +ProcessorConfigT = plugin_manager.inject_into_processor_config_type_union(ProcessorConfigT) diff --git a/packages/data-designer-config/src/data_designer/config/processors.py b/packages/data-designer-config/src/data_designer/config/processors.py index 21d94b78..3dba7b29 100644 --- a/packages/data-designer-config/src/data_designer/config/processors.py +++ b/packages/data-designer-config/src/data_designer/config/processors.py @@ -9,7 +9,6 @@ from typing import Any, Literal from pydantic import Field, field_validator -from typing_extensions import TypeAlias from data_designer.config.base import ConfigBase from data_designer.config.errors import InvalidConfigError @@ -129,6 +128,3 @@ def validate_template(cls, v: dict[str, Any]) -> dict[str, Any]: if "not JSON serializable" in str(e): raise InvalidConfigError("Template must be JSON serializable") return v - - -ProcessorConfigT: TypeAlias = DropColumnsProcessorConfig | SchemaTransformProcessorConfig diff --git a/packages/data-designer-config/src/data_designer/plugin_manager.py b/packages/data-designer-config/src/data_designer/plugin_manager.py index 768e5314..7d204c02 100644 --- a/packages/data-designer-config/src/data_designer/plugin_manager.py +++ b/packages/data-designer-config/src/data_designer/plugin_manager.py @@ -76,3 +76,40 @@ def 
inject_into_seed_source_type_union(self, seed_source_type: type[TypeAlias]) """ seed_source_type = self._plugin_registry.add_plugin_types_to_union(seed_source_type, PluginType.SEED_READER) return seed_source_type + + def get_processor_plugins(self) -> list[Plugin]: + """Get all processor plugins. + + Returns: + A list of all processor plugins. + """ + return self._plugin_registry.get_plugins(PluginType.PROCESSOR) + + def get_processor_plugin_if_exists(self, plugin_name: str) -> Plugin | None: + """Get a processor plugin by name if it exists. + + Args: + plugin_name: The name of the plugin to retrieve. + + Returns: + The plugin if found, otherwise None. + """ + if self._plugin_registry.plugin_exists(plugin_name): + plugin = self._plugin_registry.get_plugin(plugin_name) + if plugin.plugin_type == PluginType.PROCESSOR: + return plugin + return None + + def inject_into_processor_config_type_union(self, processor_config_type: type[TypeAlias]) -> type[TypeAlias]: + """Inject plugins into the processor config type. + + Args: + processor_config_type: The processor config type to inject plugins into. + + Returns: + The processor config type with plugins injected. 
+ """ + processor_config_type = self._plugin_registry.add_plugin_types_to_union( + processor_config_type, PluginType.PROCESSOR + ) + return processor_config_type diff --git a/packages/data-designer-config/src/data_designer/plugins/plugin.py b/packages/data-designer-config/src/data_designer/plugins/plugin.py index d8c47792..a961062a 100644 --- a/packages/data-designer-config/src/data_designer/plugins/plugin.py +++ b/packages/data-designer-config/src/data_designer/plugins/plugin.py @@ -20,6 +20,7 @@ class PluginType(str, Enum): COLUMN_GENERATOR = "column-generator" SEED_READER = "seed-reader" + PROCESSOR = "processor" @property def discriminator_field(self) -> str: @@ -27,6 +28,8 @@ def discriminator_field(self) -> str: return "column_type" elif self == PluginType.SEED_READER: return "seed_type" + elif self == PluginType.PROCESSOR: + return "processor_type" else: raise ValueError(f"Invalid plugin type: {self.value}") diff --git a/packages/data-designer-config/src/data_designer/plugins/registry.py b/packages/data-designer-config/src/data_designer/plugins/registry.py index b544e146..7ed777f3 100644 --- a/packages/data-designer-config/src/data_designer/plugins/registry.py +++ b/packages/data-designer-config/src/data_designer/plugins/registry.py @@ -23,7 +23,7 @@ class PluginRegistry: _instance = None _plugins_discovered = False - _lock = threading.Lock() + _lock = threading.RLock() _plugins: dict[str, Plugin] = {} diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py index 9a9b463e..2c583304 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py @@ -13,13 +13,22 @@ from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor from 
data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor from data_designer.engine.registry.base import TaskRegistry +from data_designer.plugins.plugin import PluginType +from data_designer.plugins.registry import PluginRegistry class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ... -def create_default_processor_registry() -> ProcessorRegistry: +def create_default_processor_registry(with_plugins: bool = True) -> ProcessorRegistry: registry = ProcessorRegistry() registry.register(ProcessorType.SCHEMA_TRANSFORM, SchemaTransformProcessor, SchemaTransformProcessorConfig, False) registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False) + if with_plugins: + for plugin in PluginRegistry().get_plugins(PluginType.PROCESSOR): + registry.register( + plugin.name, + plugin.impl_cls, + plugin.config_cls, + ) return registry diff --git a/packages/data-designer-engine/src/data_designer/engine/validation.py b/packages/data-designer-engine/src/data_designer/engine/validation.py index 0483d3a0..127ccf50 100644 --- a/packages/data-designer-engine/src/data_designer/engine/validation.py +++ b/packages/data-designer-engine/src/data_designer/engine/validation.py @@ -15,7 +15,8 @@ from rich.panel import Panel from data_designer.config.column_types import ColumnConfigT, DataDesignerColumnType -from data_designer.config.processors import ProcessorConfigT, ProcessorType +from data_designer.config.processor_types import ProcessorConfigT +from data_designer.config.processors import ProcessorType from data_designer.config.utils.constants import RICH_CONSOLE_THEME from data_designer.config.utils.misc import ( can_run_data_designer_locally, diff --git a/plans/processor-plugins.md b/plans/processor-plugins.md new file mode 100644 index 00000000..583d3b55 --- /dev/null +++ b/plans/processor-plugins.md @@ -0,0 +1,246 @@ +# Plan: Processor Plugins with Global Stages + +Created: 2026-02-03 +Status: Iterating 
and Refining + +## Goal + +Extend the processor system to support global preprocessing (before generation) and postprocessing (after generation) stages, and enable third-party processor plugins via the existing plugin discovery mechanism. + +## Success Criteria + +- [x] `PRE_GENERATION` stage runs once on full seed data before batching/generation +- [x] `POST_GENERATION` stage runs once on final dataset after all batches complete +- [x] `PluginType.PROCESSOR` enables external processor plugins +- [x] ProcessorRegistry loads plugins from entry points +- [x] Demo plugin package demonstrates both preprocessing and postprocessing +- [x] Existing `POST_BATCH` behavior unchanged + +## Implementation Steps + +### Step 1: Extend BuildStage Support + +Update processor configuration to accept new stages. + +- [x] Add `PRE_GENERATION` and `POST_GENERATION` to `SUPPORTED_STAGES` in processors.py +- [x] Add unit tests verifying `ProcessorConfig` accepts the new stage values + +**Suggestion**: Check if `BuildStage` enum already has these values defined elsewhere before adding. + +### Step 2: Update Dataset Builder for Global Stages + +Implement the actual execution of processors at the new stages. 
+ +- [x] Add `_run_pre_generation_processors()` method + - Load full seed dataset before batch loop + - Apply PRE_GENERATION processors sequentially + - Replace the seed reader with an in-memory version containing processed data + - **Suggestion**: Look for existing in-memory seed reader implementations (e.g., `DataFrameSeedReader`) + +- [x] Add `_run_post_generation_processors()` method + - Load the final combined dataset after all batches complete + - Apply POST_GENERATION processors sequentially + - Rewrite the final dataset with processed results + - **Suggestion**: Check how existing artifact storage handles dataset loading/writing + +- [x] Integrate calls into the `build()` method at appropriate points +- [x] Add integration tests for both flows + +### Step 3: Add Processor Plugin Support + +Enable third-party processor plugins through the existing plugin system. + +- [x] Add `PluginType.PROCESSOR` to the plugin types enum +- [x] Update `discriminator_field` property to return `"processor_type"` for processors +- [x] Update `ProcessorRegistry` to discover and load processor plugins + - **Suggestion**: Follow the pattern used for column generator plugins + - Use string keys for plugin processors (not enum values) + +- [x] Inject plugin processor configs into the `ProcessorConfigT` type union + - Follow the existing `_types` pattern used for columns and seed sources + +**Follow the `_types` Module Pattern**: + +The codebase separates base classes from type unions with plugin injection: +- `column_configs.py` (base) → `column_types.py` (union + injection) +- `seed_source.py` (base) → `seed_source_types.py` (union + injection) + +Do the same for processors: +- [x] Keep `processors.py` with base classes and concrete configs +- [x] Create `processor_types.py` for `ProcessorConfigT` with plugin injection +- [x] Plugin configs import from `processors.py` (no circular dependency) + +**Threading Note**: + +If you encounter deadlocks during plugin discovery with 
nested imports, the `PluginRegistry` may need a reentrant lock (`RLock`) instead of `Lock`. + +### Step 4: Create Demo Plugin Package + +Create a separate package demonstrating both processor types. + +- [x] Create package structure under `demo/data_designer_demo_processors/` +- [x] Implement `RegexFilterProcessor` (PRE_GENERATION) + - Config: column, pattern, invert flag + - Filters rows based on regex matching +- [x] Implement `SemanticDedupProcessor` (POST_GENERATION) + - Config: column, similarity_threshold, model_name + - Uses embeddings to find and remove similar rows + - **Suggestion**: Use sentence-transformers with a small model like `all-MiniLM-L6-v2` + +- [x] Configure entry points in `pyproject.toml` under `data_designer.plugins` +- [x] Add unit tests for each processor +- [x] Add README with installation and usage examples + +**Logging Suppression** (for sentence-transformers): + +Sentence-transformers emits progress bars and warnings when loading models. Suppress them: + +- Use `transformers.utils.logging.set_verbosity_error()` to suppress info/warning messages +- Use `transformers.utils.logging.disable_progress_bar()` to suppress progress bars +- Pass `show_progress_bar=False` to `model.encode()` for batch encoding + +### Step 5: Demo Notebook + +Create a simple, short demo that tests all features end-to-end. + +- [x] Use `#%%` cell markers for IDE compatibility +- [x] Keep the demo minimal - just enough to verify the feature works +- [x] Include sample seed data with rows to filter (PRE_GENERATION test) +- [x] Add an LLM column to generate content, use the `openai-text` model +- [x] Configure both PRE_GENERATION and POST_GENERATION processors +- [x] **Run the demo and fix any issues** - don't just write it, execute it +- [x] Verify the output shows filtering and deduplication working + +**Important**: The demo must actually run successfully. Test it before considering this step complete. 
+ +**API Notes**: Check the docs for correct Data Designer API usage. + +### Step 6: Documentation + +Update existing documentation to cover new capabilities. + +- [x] Update processor concepts doc with new stages table +- [x] Update plugins overview to mention processor plugins +- [x] Include example entry point configuration + +## Testing Strategy + +- Write tests alongside implementation, not as a separate step +- Use mocks for external dependencies (seed readers, artifact storage) +- For plugin registry tests, create actual mock classes (not Mock objects) to satisfy type validation + +## Risks & Considerations + +- **Memory usage**: POST_GENERATION holds full dataset in memory +- **Seed data mutation**: PRE_GENERATION modifies seed data before batching +- **Model download**: Embedding models download on first use; perform pre-download on uv install + +## Files Modified + +Core: +- `packages/data-designer-config/src/data_designer/config/processor_types.py` (new) +- `packages/data-designer-config/src/data_designer/config/processors.py` +- `packages/data-designer-config/src/data_designer/config/data_designer_config.py` +- `packages/data-designer-config/src/data_designer/config/config_builder.py` +- `packages/data-designer-config/src/data_designer/plugin_manager.py` +- `packages/data-designer-config/src/data_designer/plugins/plugin.py` +- `packages/data-designer-config/src/data_designer/plugins/registry.py` +- `packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py` +- `packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py` +- `packages/data-designer-engine/src/data_designer/engine/validation.py` + +Demo: +- `demo/data_designer_demo_processors/` (new package) + +Docs: +- `docs/concepts/processors.md` +- `docs/plugins/overview.md` + +--- + +## Iterating and Refining + +### Issue 1: Preview does not apply PRE_GENERATION / POST_GENERATION processors + +**Problem**: `build_preview()` and 
`process_preview()` do not call the new global-stage processors. This means users previewing their data pipeline won't see the effects of filtering or deduplication until they run a full build. + +**Investigation**: +- `build()` calls `_run_pre_generation_processors()` before generation and `_run_post_generation_processors()` after +- `build_preview()` skips both +- `process_preview()` only applies `POST_BATCH` processors + +**Fix**: +- [x] Add `_run_pre_generation_processors()` call to `build_preview()` before `_initialize_generators()` +- [x] Update `process_preview()` to also run `POST_GENERATION` processors after `POST_BATCH` +- [x] Add tests for preview with global-stage processors + +### Issue 2: Is RLock necessary in PluginRegistry? + +**Question**: The plan suggested changing `Lock` to `RLock` in `PluginRegistry`. Is this actually needed? + +**Investigation**: + +The `PluginRegistry` singleton uses a lock in three places: +- `__new__`: double-checked locking for singleton creation +- `__init__`: protects `_discover()` call +- `reset()`: resets singleton state + +The potential deadlock scenario: +1. `PluginRegistry.__init__` acquires lock +2. `_discover()` calls `ep.load()` to load a plugin module +3. The plugin imports `data_designer.config.config_builder` (or any module using the config API) +4. That imports `column_types.py`, `processor_types.py`, `seed_source_types.py` +5. Each of those calls `PluginManager()` → `PluginRegistry()` at module level +6. `PluginRegistry().__init__` tries to acquire lock again (same thread) +7. With `Lock`: deadlock (same thread blocked). With `RLock`: succeeds. + +Import chain that triggers this: +``` +plugin.py → data_designer.config.config_builder + → data_designer.config.column_types (calls PluginManager()) + → data_designer.config.processor_types (calls PluginManager()) + → data_designer.config.seed_source_types (calls PluginManager()) +``` + +**Conclusion**: YES, `RLock` is necessary. 
Any third-party plugin that imports from the `data_designer.config` public API (e.g., `DataDesignerConfigBuilder`, `DataDesignerConfig`) would trigger this re-entry. Using a regular `Lock` would cause a deadlock. + +### Iteration 3: Callback-based Processor Design + +**Date**: 2026-02-04 + +**Issue**: The `build_stage` config field determines when a processor runs, but processors are inherently tied to specific stages. A `SemanticDedupProcessor` running at PRE_GENERATION doesn't make sense - it's meant to deduplicate final outputs. This creates a semantic mismatch where config controls something that's really a property of the processor class. + +**Root Cause**: Original design treated stage as configuration rather than as part of the processor's interface. This: +- Polluted config with fields that aren't user-configurable in practice +- Made single-stage limitation implicit (processors can only run at one stage) +- Created inconsistent method signatures (`current_batch_number` only meaningful for POST_BATCH) + +**Resolution**: Changed from stage-based config to callback-based design: + +1. **Removed `build_stage` from `ProcessorConfig`** - Processors no longer declare when they run via config + +2. **Added callback methods to `Processor` base class**: + - `preprocess(data)` - Called at PRE_GENERATION on seed data + - `process_after_batch(data, *, batch_number)` - Called at POST_BATCH for each batch + - `postprocess(data)` - Called at POST_GENERATION on final dataset + +3. **Changed `_processors` from `dict[BuildStage, list]` to `list[Processor]`** - All processors in flat list, callbacks called on all + +4. **Updated built-in processors**: + - `DropColumnsProcessor` → overrides `process_after_batch()` + - `SchemaTransformProcessor` → overrides `process_after_batch()` + +5. 
**Updated demo processors**: + - `RegexFilterProcessor` → overrides `preprocess()` + - `SemanticDedupProcessor` → overrides `postprocess()` + +**Benefits**: +- Self-documenting: Looking at a processor class shows what stages it handles +- Multi-stage capable: One processor can implement multiple callbacks +- Clean signatures: Each callback has only relevant parameters +- Simpler config: No `build_stage` field to confuse users + +**Verification**: +- All 1360 tests pass +- Config tests updated to remove `build_stage` parameter +- Builder tests updated for new callback pattern