From 5cc6f2151cacbd26e21d0a41adbfd4283ef9815f Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 4 Feb 2026 20:05:03 -0300 Subject: [PATCH 1/2] Refactor processors to use callback-based design Replace stage parameter with callback methods (preprocess, process_after_batch, postprocess). The builder now invokes these callbacks at appropriate stages: PRE_GENERATION, POST_BATCH, and POST_GENERATION. - Remove build_stage from ProcessorConfig - Add callback methods to Processor base class - Update DropColumns and SchemaTransform to use process_after_batch - Simplify ColumnWiseBuilder processor invocation --- docs/concepts/processors.md | 8 +- .../src/data_designer/config/processors.py | 24 +-- .../tests/config/test_processors.py | 43 +--- .../dataset_builders/column_wise_builder.py | 114 +++++++--- .../engine/processing/processors/base.py | 49 ++++- .../processing/processors/drop_columns.py | 21 +- .../processing/processors/schema_transform.py | 31 ++- .../test_column_wise_builder.py | 196 ++++++++++++++++-- .../processors/test_drop_columns.py | 40 ++-- .../processors/test_schema_transform.py | 35 +--- .../tests/engine/test_validation.py | 3 - .../tests/interface/test_data_designer.py | 7 +- 12 files changed, 389 insertions(+), 182 deletions(-) diff --git a/docs/concepts/processors.md b/docs/concepts/processors.md index 46773ecb..d03eefd0 100644 --- a/docs/concepts/processors.md +++ b/docs/concepts/processors.md @@ -13,7 +13,13 @@ Each processor: - Applies its transformation - Passes the result to the next processor (or to output) -Currently, processors run only at the `POST_BATCH` stage, i.e., after column generation completes for each batch. +Processors can run at three stages: + +| Stage | When it runs | Use cases | +|-------|--------------|-----------| +| `PRE_GENERATION` | Once, on full seed data before batching | Filter seed data, validate inputs, normalize data | +| `POST_BATCH` | After each batch completes (default) | Drop columns, transform schema per batch | +| `POST_GENERATION` | Once, on final dataset after all batches | Deduplicate, aggregate statistics, final cleanup | ## Processor Types diff --git a/packages/data-designer-config/src/data_designer/config/processors.py b/packages/data-designer-config/src/data_designer/config/processors.py index db7bb9ce..21d94b78 100644 --- a/packages/data-designer-config/src/data_designer/config/processors.py +++ b/packages/data-designer-config/src/data_designer/config/processors.py @@ -12,11 +12,8 @@ from typing_extensions import TypeAlias from data_designer.config.base import ConfigBase -from data_designer.config.dataset_builders import BuildStage from data_designer.config.errors import InvalidConfigError -SUPPORTED_STAGES = [BuildStage.POST_BATCH] - class ProcessorType(str, Enum): """Enumeration of available processor types. @@ -33,33 +30,22 @@ class ProcessorType(str, Enum): class ProcessorConfig(ConfigBase, ABC): """Abstract base class for all processor configuration types. - Processors are transformations that run before or after columns are generated. - They can modify, reshape, or augment the dataset before it's saved. + Processors are transformations that run at different stages of the generation + pipeline. They can modify, reshape, or augment the dataset. + + The processor implementation determines which stages it handles by overriding + the appropriate callback methods (preprocess, process_after_batch, postprocess). 
Attributes: name: Unique name of the processor, used to identify the processor in results and to name output artifacts on disk. - build_stage: The stage at which the processor runs. Currently only `POST_BATCH` - is supported, meaning processors run after each batch of columns is generated. """ name: str = Field( description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.", ) - build_stage: BuildStage = Field( - default=BuildStage.POST_BATCH, - description=f"The stage at which the processor will run. Supported stages: {', '.join(SUPPORTED_STAGES)}", - ) processor_type: str - @field_validator("build_stage") - def validate_build_stage(cls, v: BuildStage) -> BuildStage: - if v not in SUPPORTED_STAGES: - raise ValueError( - f"Invalid dataset builder stage: {v}. Only these stages are supported: {', '.join(SUPPORTED_STAGES)}" - ) - return v - def get_processor_config_from_kwargs(processor_type: ProcessorType, **kwargs: Any) -> ProcessorConfig: """Create a processor configuration from a processor type and keyword arguments. diff --git a/packages/data-designer-config/tests/config/test_processors.py b/packages/data-designer-config/tests/config/test_processors.py index b18814e6..e688be15 100644 --- a/packages/data-designer-config/tests/config/test_processors.py +++ b/packages/data-designer-config/tests/config/test_processors.py @@ -4,7 +4,6 @@ import pytest from pydantic import ValidationError -from data_designer.config.dataset_builders import BuildStage from data_designer.config.errors import InvalidConfigError from data_designer.config.processors import ( DropColumnsProcessorConfig, @@ -16,92 +15,64 @@ def test_drop_columns_processor_config_creation(): - config = DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] - ) + config = DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["col1", "col2"]) - assert config.build_stage == BuildStage.POST_BATCH assert config.column_names == ["col1", "col2"] assert config.processor_type == ProcessorType.DROP_COLUMNS assert isinstance(config, ProcessorConfig) def test_drop_columns_processor_config_validation(): - # Test unsupported stage raises error - with pytest.raises(ValidationError, match="Invalid dataset builder stage"): - DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.PRE_BATCH, column_names=["col1"] - ) - # Test missing required field raises error with pytest.raises(ValidationError, match="Field required"): - DropColumnsProcessorConfig(name="drop_columns_processor", build_stage=BuildStage.POST_BATCH) + DropColumnsProcessorConfig(name="drop_columns_processor") def test_drop_columns_processor_config_serialization(): - config = DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] - ) + config = DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["col1", "col2"]) # Serialize to dict config_dict = config.model_dump() - assert config_dict["build_stage"] == "post_batch" assert config_dict["column_names"] == ["col1", "col2"] # Deserialize from dict config_restored = DropColumnsProcessorConfig.model_validate(config_dict) - assert config_restored.build_stage == config.build_stage assert config_restored.column_names == config.column_names def test_schema_transform_processor_config_creation(): config = SchemaTransformProcessorConfig( name="output_format_processor", - 
build_stage=BuildStage.POST_BATCH, template={"text": "{{ col1 }}"}, ) - assert config.build_stage == BuildStage.POST_BATCH assert config.template == {"text": "{{ col1 }}"} assert config.processor_type == ProcessorType.SCHEMA_TRANSFORM assert isinstance(config, ProcessorConfig) def test_schema_transform_processor_config_validation(): - # Test unsupported stage raises error - with pytest.raises(ValidationError, match="Invalid dataset builder stage"): - SchemaTransformProcessorConfig( - name="schema_transform_processor", - build_stage=BuildStage.PRE_BATCH, - template={"text": "{{ col1 }}"}, - ) - # Test missing required field raises error with pytest.raises(ValidationError, match="Field required"): - SchemaTransformProcessorConfig(name="schema_transform_processor", build_stage=BuildStage.POST_BATCH) + SchemaTransformProcessorConfig(name="schema_transform_processor") # Test invalid template raises error with pytest.raises(InvalidConfigError, match="Template must be JSON serializable"): - SchemaTransformProcessorConfig( - name="schema_transform_processor", build_stage=BuildStage.POST_BATCH, template={"text": {1, 2, 3}} - ) + SchemaTransformProcessorConfig(name="schema_transform_processor", template={"text": {1, 2, 3}}) def test_schema_transform_processor_config_serialization(): config = SchemaTransformProcessorConfig( name="schema_transform_processor", - build_stage=BuildStage.POST_BATCH, template={"text": "{{ col1 }}"}, ) # Serialize to dict config_dict = config.model_dump() - assert config_dict["build_stage"] == "post_batch" assert config_dict["template"] == {"text": "{{ col1 }}"} # Deserialize from dict config_restored = SchemaTransformProcessorConfig.model_validate(config_dict) - assert config_restored.build_stage == config.build_stage assert config_restored.template == config.template @@ -110,7 +81,6 @@ def test_get_processor_config_from_kwargs(): config_drop_columns = get_processor_config_from_kwargs( ProcessorType.DROP_COLUMNS, name="drop_columns_processor", - build_stage=BuildStage.POST_BATCH, column_names=["col1"], ) assert isinstance(config_drop_columns, DropColumnsProcessorConfig) @@ -120,7 +90,6 @@ def test_get_processor_config_from_kwargs(): config_schema_transform = get_processor_config_from_kwargs( ProcessorType.SCHEMA_TRANSFORM, name="output_format_processor", - build_stage=BuildStage.POST_BATCH, template={"text": "{{ col1 }}"}, ) assert isinstance(config_schema_transform, SchemaTransformProcessorConfig) @@ -134,6 +103,6 @@ class UnknownProcessorType(str, Enum): UNKNOWN = "unknown" result = get_processor_config_from_kwargs( - UnknownProcessorType.UNKNOWN, name="unknown_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1"] + UnknownProcessorType.UNKNOWN, name="unknown_processor", column_names=["col1"] ) assert result is None diff --git a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py index 6434ba76..9b4f1f4f 100644 --- a/packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py +++ b/packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py @@ -16,12 +16,8 @@ from data_designer.config.column_types import ColumnConfigT from data_designer.config.config_builder import BuilderConfig from data_designer.config.data_designer_config import DataDesignerConfig -from data_designer.config.dataset_builders import BuildStage -from data_designer.config.processors 
import ( - DropColumnsProcessorConfig, - ProcessorConfig, - ProcessorType, -) +from data_designer.config.processors import DropColumnsProcessorConfig, ProcessorConfig, ProcessorType +from data_designer.config.seed_source import DataFrameSeedSource from data_designer.engine.column_generators.generators.base import ( ColumnGenerator, ColumnGeneratorWithModel, @@ -29,7 +25,7 @@ ) from data_designer.engine.column_generators.utils.generator_classification import column_type_is_model_generated from data_designer.engine.compiler import compile_data_designer_config -from data_designer.engine.dataset_builders.artifact_storage import SDG_CONFIG_FILENAME, ArtifactStorage +from data_designer.engine.dataset_builders.artifact_storage import SDG_CONFIG_FILENAME, ArtifactStorage, BatchStage from data_designer.engine.dataset_builders.errors import DatasetGenerationError, DatasetProcessingError from data_designer.engine.dataset_builders.multi_column_configs import MultiColumnConfig from data_designer.engine.dataset_builders.utils.concurrency import ConcurrentThreadExecutor @@ -41,6 +37,8 @@ from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor from data_designer.engine.registry.data_designer_registry import DataDesignerRegistry from data_designer.engine.resources.resource_provider import ResourceProvider +from data_designer.engine.resources.seed_reader import DataFrameSeedReader +from data_designer.engine.secret_resolver import PlaintextResolver from data_designer.lazy_heavy_imports import pd if TYPE_CHECKING: @@ -68,9 +66,7 @@ def __init__( self._data_designer_config = compile_data_designer_config(data_designer_config, resource_provider) self._column_configs = compile_dataset_builder_column_configs(self._data_designer_config) - self._processors: dict[BuildStage, list[Processor]] = self._initialize_processors( - self._data_designer_config.processors or [] - ) + self._processors: list[Processor] = self._initialize_processors(self._data_designer_config.processors or []) self._validate_column_configs() @property @@ -99,6 +95,7 @@ def build( ) -> Path: self._run_model_health_check_if_needed() self._run_mcp_tool_check_if_needed() + self._run_pre_generation_processors() self._write_builder_config() generators = self._initialize_generators() start_time = time.perf_counter() @@ -109,14 +106,14 @@ def build( for batch_idx in range(self.batch_manager.num_batches): logger.info(f"โณ Processing batch {batch_idx + 1} of {self.batch_manager.num_batches}") self._run_batch(generators, batch_mode="batch", group_id=group_id) - df_batch = self._run_processors( - stage=BuildStage.POST_BATCH, + df_batch = self._run_post_batch_processors( dataframe=self.batch_manager.get_current_batch(as_dataframe=True), - current_batch_number=batch_idx, + batch_number=batch_idx, ) self._write_processed_batch(df_batch) self.batch_manager.finish_batch(on_batch_complete) self.batch_manager.finish() + self._run_post_generation_processors() model_usage_stats = self._resource_provider.model_registry.get_model_usage_stats( time.perf_counter() - start_time @@ -128,6 +125,7 @@ def build( def build_preview(self, *, num_records: int) -> pd.DataFrame: self._run_model_health_check_if_needed() self._run_mcp_tool_check_if_needed() + self._run_pre_generation_processors() generators = self._initialize_generators() group_id = uuid.uuid4().hex @@ -145,11 +143,11 @@ def build_preview(self, *, num_records: int) -> pd.DataFrame: return dataset def process_preview(self, dataset: pd.DataFrame) -> pd.DataFrame: - return 
self._run_processors( - stage=BuildStage.POST_BATCH, + df = self._run_post_batch_processors( dataframe=dataset.copy(), - current_batch_number=None, # preview mode does not have a batch number + batch_number=None, # preview mode does not have a batch number ) + return self._run_post_generation_processors_on_df(df) def _initialize_generators(self) -> list[ColumnGenerator]: return [ @@ -292,20 +290,20 @@ def _validate_column_configs(self) -> None: ).can_generate_from_scratch: raise DatasetGenerationError("๐Ÿ›‘ The first column config must be a from-scratch column generator.") - def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> dict[BuildStage, list[Processor]]: + def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> list[Processor]: # Check columns marked for drop columns_to_drop = [config.name for config in self.single_column_configs if config.drop] - processors: dict[BuildStage, list[Processor]] = {stage: [] for stage in BuildStage} + processors: list[Processor] = [] for config in processor_configs: - processors[config.build_stage].append( + processors.append( self._registry.processors.get_for_config_type(type(config))( config=config, resource_provider=self._resource_provider, ) ) - # Manually included "drop columns" processor takes precedence (can e.g., pick stages other than post-batch) + # Manually included "drop columns" processor takes precedence if config.processor_type == ProcessorType.DROP_COLUMNS: for column in config.column_names: if column in columns_to_drop: @@ -313,12 +311,11 @@ def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> di # If there are still columns marked for drop, add the "drop columns" processor to drop them if len(columns_to_drop) > 0: - processors[BuildStage.POST_BATCH].append( # as post-batch by default + processors.append( DropColumnsProcessor( config=DropColumnsProcessorConfig( name="default_drop_columns_processor", column_names=columns_to_drop, - build_stage=BuildStage.POST_BATCH, ), resource_provider=self._resource_provider, ) @@ -326,18 +323,79 @@ def _initialize_processors(self, processor_configs: list[ProcessorConfig]) -> di return processors - def _run_processors( - self, stage: BuildStage, dataframe: pd.DataFrame, current_batch_number: int | None = None - ) -> pd.DataFrame: - for processor in self._processors[stage]: + def _run_pre_generation_processors(self) -> None: + """Run preprocess() on all processors for the full seed dataset.""" + if not self._processors: + return + if self._resource_provider.seed_reader is None: + return + + logger.info("โณ Running preprocess on seed data...") + seed_reader = self._resource_provider.seed_reader + conn = seed_reader.create_duckdb_connection() + df = conn.execute(f"SELECT * FROM '{seed_reader.get_dataset_uri()}'").fetchdf() + + original_len = len(df) + for processor in self._processors: try: - dataframe = processor.process(dataframe, current_batch_number=current_batch_number) + df = processor.preprocess(df) + except Exception as e: + raise DatasetProcessingError(f"๐Ÿ›‘ Failed in preprocess for processor {processor.name}: {e}") from e + + if len(df) != original_len: + new_source = DataFrameSeedSource(df=df) + new_reader = DataFrameSeedReader() + new_reader.attach(new_source, PlaintextResolver()) + self._resource_provider.seed_reader = new_reader + logger.info(f"โœ… Preprocess complete. 
Seed data now has {len(df)} rows.") + + def _run_post_batch_processors(self, dataframe: pd.DataFrame, batch_number: int | None) -> pd.DataFrame: + """Run process_after_batch() on all processors for a batch.""" + for processor in self._processors: + try: + if batch_number is not None: + dataframe = processor.process_after_batch(dataframe, batch_number=batch_number) + else: + # Preview mode - still call but with batch_number=0 + dataframe = processor.process_after_batch(dataframe, batch_number=0) except Exception as e: raise DatasetProcessingError( - f"๐Ÿ›‘ Failed to process dataset with processor {processor.name} in stage {stage}: {e}" + f"๐Ÿ›‘ Failed in process_after_batch for processor {processor.name}: {e}" ) from e return dataframe + def _run_post_generation_processors_on_df(self, df: pd.DataFrame) -> pd.DataFrame: + """Run postprocess() on all processors for a dataframe.""" + for processor in self._processors: + try: + df = processor.postprocess(df) + except Exception as e: + raise DatasetProcessingError(f"๐Ÿ›‘ Failed in postprocess for processor {processor.name}: {e}") from e + return df + + def _run_post_generation_processors(self) -> None: + """Run postprocess() on all processors for the final combined dataset.""" + if not self._processors: + return + + logger.info("โณ Running postprocess on final dataset...") + df = self.artifact_storage.load_dataset() + + original_len = len(df) + df = self._run_post_generation_processors_on_df(df) + + if len(df) != original_len: + # Rewrite the final dataset as a single file + import shutil + + shutil.rmtree(self.artifact_storage.final_dataset_path) + self.artifact_storage.write_batch_to_parquet_file( + batch_number=0, + dataframe=df, + batch_stage=BatchStage.FINAL_RESULT, + ) + logger.info(f"โœ… Postprocess complete. Final dataset has {len(df)} rows.") + def _worker_error_callback(self, exc: Exception, *, context: dict | None = None) -> None: """If a worker fails, we can handle the exception here.""" logger.warning( diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py index 8dd47132..ee6731b6 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py @@ -3,11 +3,54 @@ from __future__ import annotations -from abc import ABC, abstractmethod +from abc import ABC from data_designer.engine.configurable_task import ConfigurableTask, DataT, TaskConfigT class Processor(ConfigurableTask[TaskConfigT], ABC): - @abstractmethod - def process(self, data: DataT, *, current_batch_number: int | None = None) -> DataT: ... + """Base class for dataset processors. + + Processors transform data at different stages of the generation pipeline. + Override the callback methods for the stages you want to handle. + """ + + def preprocess(self, data: DataT) -> DataT: + """Called at PRE_GENERATION stage on seed data before batching. + + Override to filter or transform seed data before generation begins. + + Args: + data: The full seed dataset. + + Returns: + Transformed seed dataset. + """ + return data + + def process_after_batch(self, data: DataT, *, batch_number: int) -> DataT: + """Called at POST_BATCH stage after each batch is generated. + + Override to process each batch of generated data. + + Args: + data: The generated batch data. + batch_number: The current batch number (0-indexed). 
+ + Returns: + Transformed batch data. + """ + return data + + def postprocess(self, data: DataT) -> DataT: + """Called at POST_GENERATION stage on the final combined dataset. + + Override to transform the complete generated dataset. + + Args: + data: The final combined dataset. + + Returns: + Transformed final dataset. + """ + return data diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py index 98369a6b..45d07c47 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/drop_columns.py @@ -18,10 +18,14 @@ class DropColumnsProcessor(Processor[DropColumnsProcessorConfig]): - def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None) -> pd.DataFrame: + """Drops specified columns from the dataset after each batch.""" + + def process_after_batch(self, data: pd.DataFrame, *, batch_number: int) -> pd.DataFrame: logger.info(f"๐Ÿ™ˆ Dropping columns: {self.config.column_names}") - if current_batch_number is not None: # not in preview mode - self._save_dropped_columns_if_needed(data, current_batch_number) + self._save_dropped_columns(data, batch_number) + return self._drop_columns(data) + + def _drop_columns(self, data: pd.DataFrame) -> pd.DataFrame: for column in self.config.column_names: if column in data.columns: data.drop(columns=[column], inplace=True) @@ -29,14 +33,19 @@ def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None logger.warning(f"โš ๏ธ Cannot drop column: `{column}` not found in the dataset.") return data - def _save_dropped_columns_if_needed(self, data: pd.DataFrame, current_batch_number: int) -> None: + def _save_dropped_columns(self, data: pd.DataFrame, batch_number: int) -> None: + # Only save columns that actually exist + existing_columns = [col for col in self.config.column_names if col in data.columns] + if not existing_columns: + return + logger.debug("๐Ÿ“ฆ Saving dropped columns to dropped-columns directory") dropped_column_parquet_file_name = self.artifact_storage.create_batch_file_path( - batch_number=current_batch_number, + batch_number=batch_number, batch_stage=BatchStage.DROPPED_COLUMNS, ).name self.artifact_storage.write_parquet_file( parquet_file_name=dropped_column_parquet_file_name, - dataframe=data[self.config.column_names], + dataframe=data[existing_columns], batch_stage=BatchStage.DROPPED_COLUMNS, ) diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py index b84339e6..ed97ee53 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/schema_transform.py @@ -41,11 +41,23 @@ def escape_for_json_string(s: str) -> str: class SchemaTransformProcessor(WithJinja2UserTemplateRendering, Processor[SchemaTransformProcessorConfig]): + """Transforms dataset schema using Jinja2 templates after each batch.""" + @property def template_as_str(self) -> str: return json.dumps(self.config.template) - def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None) -> pd.DataFrame: + def process_after_batch(self, data: pd.DataFrame, *, batch_number: int) 
-> pd.DataFrame: + formatted_data = self._transform(data) + self.artifact_storage.write_batch_to_parquet_file( + batch_number=batch_number, + dataframe=formatted_data, + batch_stage=BatchStage.PROCESSORS_OUTPUTS, + subfolder=self.config.name, + ) + return data + + def _transform(self, data: pd.DataFrame) -> pd.DataFrame: self.prepare_jinja2_template_renderer(self.template_as_str, data.columns.to_list()) formatted_records = [] for record in data.to_dict(orient="records"): @@ -53,19 +65,4 @@ def process(self, data: pd.DataFrame, *, current_batch_number: int | None = None escaped = _json_escape_record(deserialized) rendered = self.render_template(escaped) formatted_records.append(json.loads(rendered)) - formatted_data = pd.DataFrame(formatted_records) - if current_batch_number is not None: - self.artifact_storage.write_batch_to_parquet_file( - batch_number=current_batch_number, - dataframe=formatted_data, - batch_stage=BatchStage.PROCESSORS_OUTPUTS, - subfolder=self.config.name, - ) - else: - self.artifact_storage.write_parquet_file( - parquet_file_name=f"{self.config.name}.parquet", - dataframe=formatted_data, - batch_stage=BatchStage.PROCESSORS_OUTPUTS, - ) - - return data + return pd.DataFrame(formatted_records) diff --git a/packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py b/packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py index 90291336..5b14c71f 100644 --- a/packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py +++ b/packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py @@ -10,7 +10,6 @@ from data_designer.config.column_configs import LLMTextColumnConfig, SamplerColumnConfig from data_designer.config.config_builder import DataDesignerConfigBuilder -from data_designer.config.dataset_builders import BuildStage from data_designer.config.processors import DropColumnsProcessorConfig from data_designer.config.run_config import RunConfig from data_designer.config.sampler_params import SamplerType, UUIDSamplerParams @@ -37,11 +36,7 @@ def stub_test_column_configs(): @pytest.fixture def stub_test_processor_configs(): - return [ - DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["column_to_drop"] - ) - ] + return [DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["column_to_drop"])] @pytest.fixture @@ -52,7 +47,6 @@ def stub_test_config_builder(stub_test_column_configs, stub_model_configs): config_builder.add_processor( processor_type="drop_columns", name="drop_columns_processor", - build_stage=BuildStage.POST_BATCH, column_names=["column_to_drop"], ) return config_builder @@ -169,6 +163,7 @@ def test_column_wise_dataset_builder_build_method_basic_flow( stub_resource_provider, ): stub_resource_provider.run_config = RunConfig(buffer_size=50) + stub_resource_provider.seed_reader = None # No seed data for this basic flow test stub_resource_provider.model_registry.run_health_check = Mock() stub_resource_provider.model_registry.get_model_usage_stats = Mock(return_value={"test": "stats"}) stub_resource_provider.model_registry.models = {} @@ -182,6 +177,7 @@ def test_column_wise_dataset_builder_build_method_basic_flow( stub_batch_manager.iter_current_batch.return_value = [(0, {"test": "data"})] stub_column_wise_builder.batch_manager = stub_batch_manager + stub_column_wise_builder._processors = [] # No processors for basic flow test result_path = 
stub_column_wise_builder.build(num_records=100) @@ -232,12 +228,9 @@ def test_column_wise_dataset_builder_validate_column_configs( def test_column_wise_dataset_builder_initialize_processors(stub_column_wise_builder): processors = stub_column_wise_builder._processors - assert processors.keys() == set(BuildStage) - assert len(processors[BuildStage.PRE_BATCH]) == 0 - assert len(processors[BuildStage.POST_BATCH]) == 1 - assert len(processors[BuildStage.PRE_GENERATION]) == 0 - assert len(processors[BuildStage.POST_GENERATION]) == 0 - assert processors[BuildStage.POST_BATCH][0].config.column_names == ["column_to_drop"] + assert isinstance(processors, list) + assert len(processors) == 1 + assert processors[0].config.column_names == ["column_to_drop"] def test_run_config_default_non_inference_max_parallel_workers() -> None: @@ -388,3 +381,180 @@ def test_fan_out_with_threads_uses_early_shutdown_settings_from_resource_provide assert call_kwargs["shutdown_error_rate"] == expected_rate assert call_kwargs["shutdown_error_window"] == shutdown_error_window assert call_kwargs["disable_early_shutdown"] == disable_early_shutdown + + +def test_run_pre_generation_processors_filters_seed_data(stub_resource_provider, stub_model_configs, tmp_path): + """Test that PRE_GENERATION processors are applied to seed data before generation.""" + from data_designer.config.seed_source import DataFrameSeedSource, LocalFileSeedSource + from data_designer.engine.processing.processors.base import Processor + from data_designer.engine.resources.seed_reader import DataFrameSeedReader + + # Set up seed reader with test data + seed_df = pd.DataFrame({"seed_id": [1, 2, 3, 4, 5], "value": ["a", "b", "c", "d", "e"]}) + seed_source = DataFrameSeedSource(df=seed_df) + seed_reader = DataFrameSeedReader() + seed_reader.attach(seed_source, Mock()) + stub_resource_provider.seed_reader = seed_reader + + # Create a mock processor that filters rows during preprocess + mock_processor = Mock(spec=Processor) + mock_processor.name = "filter_processor" + mock_processor.preprocess.return_value = seed_df[seed_df["seed_id"] > 2] + + # Write seed file to tmp_path + seed_path = tmp_path / "seed.parquet" + seed_df.to_parquet(seed_path, index=False) + + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.with_seed_dataset(LocalFileSeedSource(path=str(seed_path))) + config_builder.add_column(SamplerColumnConfig(name="uuid", sampler_type="uuid", params=UUIDSamplerParams())) + + builder = ColumnWiseDatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + builder._processors = [mock_processor] + + builder._run_pre_generation_processors() + + # Verify preprocess was called + mock_processor.preprocess.assert_called_once() + + # Verify seed reader was replaced + new_seed_reader = stub_resource_provider.seed_reader + assert isinstance(new_seed_reader, DataFrameSeedReader) + + # Verify the new seed data has fewer rows + conn = new_seed_reader.create_duckdb_connection() + result_df = conn.execute(f"SELECT * FROM '{new_seed_reader.get_dataset_uri()}'").fetchdf() + assert len(result_df) == 3 + + +def test_run_post_generation_processors_modifies_final_dataset(stub_resource_provider, stub_model_configs): + """Test that postprocess callbacks are applied to the final dataset.""" + from data_designer.engine.processing.processors.base import Processor + + # Create test parquet files + final_df = pd.DataFrame({"id": [1, 2, 3, 4, 5], "value": ["a", "b", "c", "d", "e"]}) + 
stub_resource_provider.artifact_storage.mkdir_if_needed(stub_resource_provider.artifact_storage.final_dataset_path) + final_df.to_parquet(stub_resource_provider.artifact_storage.final_dataset_path / "batch_00000.parquet", index=False) + + # Create a mock processor that filters rows during postprocess + mock_processor = Mock(spec=Processor) + mock_processor.name = "dedup_processor" + mock_processor.postprocess.return_value = final_df[final_df["id"] > 2] + + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.add_column(SamplerColumnConfig(name="id", sampler_type="uuid", params=UUIDSamplerParams())) + + builder = ColumnWiseDatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + builder._processors = [mock_processor] + + builder._run_post_generation_processors() + + # Verify postprocess was called + mock_processor.postprocess.assert_called_once() + + # Verify final dataset was rewritten with fewer rows + result_df = stub_resource_provider.artifact_storage.load_dataset() + assert len(result_df) == 3 + + +def test_run_pre_generation_processors_skips_when_no_seed_reader(stub_resource_provider, stub_model_configs): + """Test that preprocess is skipped when no seed reader is configured.""" + from data_designer.engine.processing.processors.base import Processor + + stub_resource_provider.seed_reader = None + + mock_processor = Mock(spec=Processor) + mock_processor.name = "filter_processor" + + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.add_column(SamplerColumnConfig(name="id", sampler_type="uuid", params=UUIDSamplerParams())) + + builder = ColumnWiseDatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + builder._processors = [mock_processor] + + builder._run_pre_generation_processors() + + # Preprocess should not be called when no seed reader + mock_processor.preprocess.assert_not_called() + + +def test_build_preview_runs_pre_generation_processors(stub_resource_provider, stub_model_configs, tmp_path): + """Test that build_preview runs PRE_GENERATION processors.""" + from data_designer.config.seed_source import DataFrameSeedSource, LocalFileSeedSource + from data_designer.engine.resources.seed_reader import DataFrameSeedReader + + # Set up seed reader with test data + seed_df = pd.DataFrame({"seed_id": [1, 2, 3, 4, 5], "text": ["a", "b", "c", "d", "e"]}) + seed_source = DataFrameSeedSource(df=seed_df) + seed_reader = DataFrameSeedReader() + seed_reader.attach(seed_source, Mock()) + stub_resource_provider.seed_reader = seed_reader + + # Write seed file to tmp_path + seed_path = tmp_path / "seed.parquet" + seed_df.to_parquet(seed_path, index=False) + + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.with_seed_dataset(LocalFileSeedSource(path=str(seed_path))) + config_builder.add_column(SamplerColumnConfig(name="uuid", sampler_type="uuid", params=UUIDSamplerParams())) + + builder = ColumnWiseDatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + + # Mock everything to isolate the test + builder._run_model_health_check_if_needed = Mock() + builder._run_mcp_tool_check_if_needed = Mock() + builder._run_pre_generation_processors = Mock() + builder._initialize_generators = Mock(return_value=[]) + builder.batch_manager.start = Mock() + builder._run_batch = Mock() + 
builder.batch_manager.get_current_batch = Mock(return_value=pd.DataFrame()) + builder.batch_manager.reset = Mock() + builder._resource_provider.model_registry.get_model_usage_stats = Mock(return_value={}) + + builder.build_preview(num_records=5) + + builder._run_pre_generation_processors.assert_called_once() + + +def test_process_preview_runs_both_callbacks(stub_resource_provider, stub_model_configs): + """Test that process_preview runs process_after_batch and postprocess callbacks.""" + from data_designer.engine.processing.processors.base import Processor + + config_builder = DataDesignerConfigBuilder(model_configs=stub_model_configs) + config_builder.add_column(SamplerColumnConfig(name="id", sampler_type="uuid", params=UUIDSamplerParams())) + + builder = ColumnWiseDatasetBuilder( + data_designer_config=config_builder.build(), + resource_provider=stub_resource_provider, + ) + + # Create a mock processor with both callbacks + mock_processor = Mock(spec=Processor) + mock_processor.name = "test_processor" + mock_processor.process_after_batch.side_effect = lambda df, **kwargs: df.assign(post_batch_applied=True) + mock_processor.postprocess.side_effect = lambda df: df.assign(post_gen_applied=True) + + builder._processors = [mock_processor] + + input_df = pd.DataFrame({"id": [1, 2, 3]}) + result = builder.process_preview(input_df) + + # Both callbacks should have been called + mock_processor.process_after_batch.assert_called_once() + mock_processor.postprocess.assert_called_once() + + # Result should have both columns added + assert "post_batch_applied" in result.columns + assert "post_gen_applied" in result.columns diff --git a/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py b/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py index 53da3e4a..7ebf68df 100644 --- a/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py +++ b/packages/data-designer-engine/tests/engine/processing/processors/test_drop_columns.py @@ -8,7 +8,6 @@ import pytest -from data_designer.config.dataset_builders import BuildStage from data_designer.config.processors import DropColumnsProcessorConfig from data_designer.engine.dataset_builders.artifact_storage import BatchStage from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor @@ -20,9 +19,7 @@ @pytest.fixture def stub_processor_config(): - return DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["col1", "col2"] - ) + return DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["col1", "col2"]) @pytest.fixture @@ -84,34 +81,34 @@ def stub_empty_dataframe(): ), ], ) -def test_process_scenarios( +def test_process_after_batch_scenarios( stub_processor, stub_sample_dataframe, test_case, column_names, expected_result, expected_warning ): stub_processor.config.column_names = column_names if expected_warning: with patch("data_designer.engine.processing.processors.drop_columns.logger") as mock_logger: - result = stub_processor.process(stub_sample_dataframe.copy()) + result = stub_processor.process_after_batch(stub_sample_dataframe.copy(), batch_number=0) pd.testing.assert_frame_equal(result, pd.DataFrame(expected_result)) mock_logger.warning.assert_called_once_with(expected_warning) else: - result = stub_processor.process(stub_sample_dataframe.copy()) + result = stub_processor.process_after_batch(stub_sample_dataframe.copy(), batch_number=0) 
pd.testing.assert_frame_equal(result, pd.DataFrame(expected_result)) -def test_process_logging(stub_processor, stub_sample_dataframe): +def test_process_after_batch_logging(stub_processor, stub_sample_dataframe): with patch("data_designer.engine.processing.processors.drop_columns.logger") as mock_logger: - stub_processor.process(stub_sample_dataframe.copy()) + stub_processor.process_after_batch(stub_sample_dataframe.copy(), batch_number=0) mock_logger.info.assert_called_once_with("๐Ÿ™ˆ Dropping columns: ['col1', 'col2']") -def test_save_dropped_columns_without_preview(stub_processor, stub_sample_dataframe): +def test_save_dropped_columns(stub_processor, stub_sample_dataframe): stub_processor.config.column_names = ["col1", "col2"] with patch("data_designer.engine.processing.processors.drop_columns.logger") as mock_logger: - stub_processor.process(stub_sample_dataframe.copy(), current_batch_number=0) + stub_processor.process_after_batch(stub_sample_dataframe.copy(), batch_number=0) stub_processor.artifact_storage.write_parquet_file.assert_called_once() call_args = stub_processor.artifact_storage.write_parquet_file.call_args @@ -126,24 +123,19 @@ def test_save_dropped_columns_without_preview(stub_processor, stub_sample_datafr mock_logger.debug.assert_called_once_with("๐Ÿ“ฆ Saving dropped columns to dropped-columns directory") -def test_save_dropped_columns_with_preview(stub_processor, stub_sample_dataframe): - stub_processor.config.column_names = ["col1", "col2"] - - stub_processor.process(stub_sample_dataframe.copy()) - stub_processor.artifact_storage.write_parquet_file.assert_not_called() - - def test_save_dropped_columns_with_nonexistent_columns(stub_processor, stub_sample_dataframe): + """When columns don't exist, no file is written but warnings are logged.""" stub_processor.config.column_names = ["nonexistent1", "nonexistent2"] with patch("data_designer.engine.processing.processors.drop_columns.logger"): - with pytest.raises(KeyError): - stub_processor.process(stub_sample_dataframe.copy(), current_batch_number=0) + stub_processor.process_after_batch(stub_sample_dataframe.copy(), batch_number=0) + # No file is written for nonexistent columns + stub_processor.artifact_storage.write_parquet_file.assert_not_called() -def test_process_inplace_modification(stub_processor, stub_sample_dataframe): +def test_process_after_batch_inplace_modification(stub_processor, stub_sample_dataframe): original_df = stub_sample_dataframe.copy() - result = stub_processor.process(original_df) + result = stub_processor.process_after_batch(original_df, batch_number=0) assert result is original_df @@ -152,11 +144,11 @@ def test_process_inplace_modification(stub_processor, stub_sample_dataframe): assert "col3" in result.columns -def test_process_empty_dataframe(stub_processor, stub_empty_dataframe): +def test_process_after_batch_empty_dataframe(stub_processor, stub_empty_dataframe): stub_processor.config.column_names = ["col1"] with patch("data_designer.engine.processing.processors.drop_columns.logger") as mock_logger: - result = stub_processor.process(stub_empty_dataframe) + result = stub_processor.process_after_batch(stub_empty_dataframe, batch_number=0) pd.testing.assert_frame_equal(result, stub_empty_dataframe) mock_logger.warning.assert_called_once_with("โš ๏ธ Cannot drop column: `col1` not found in the dataset.") diff --git a/packages/data-designer-engine/tests/engine/processing/processors/test_schema_transform.py b/packages/data-designer-engine/tests/engine/processing/processors/test_schema_transform.py index 
520d67da..fa918a75 100644 --- a/packages/data-designer-engine/tests/engine/processing/processors/test_schema_transform.py +++ b/packages/data-designer-engine/tests/engine/processing/processors/test_schema_transform.py @@ -9,7 +9,6 @@ import pytest -from data_designer.config.dataset_builders import BuildStage from data_designer.config.processors import SchemaTransformProcessorConfig from data_designer.engine.dataset_builders.artifact_storage import BatchStage from data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor @@ -23,7 +22,6 @@ @pytest.fixture def stub_processor_config() -> SchemaTransformProcessorConfig: return SchemaTransformProcessorConfig( - build_stage=BuildStage.POST_BATCH, template={"text": "{{ col1 }}", "value": "{{ col2 }}"}, name="test_schema_transform", ) @@ -53,18 +51,18 @@ def stub_simple_dataframe() -> pd.DataFrame: ) -def test_process_returns_original_dataframe( +def test_process_after_batch_returns_original_dataframe( stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame ) -> None: - result = stub_processor.process(stub_sample_dataframe, current_batch_number=0) + result = stub_processor.process_after_batch(stub_sample_dataframe, batch_number=0) pd.testing.assert_frame_equal(result, stub_sample_dataframe) -def test_process_writes_formatted_output_to_parquet( +def test_process_after_batch_writes_formatted_output_to_parquet( stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame ) -> None: # Process the dataframe - result = stub_processor.process(stub_sample_dataframe, current_batch_number=0) + result = stub_processor.process_after_batch(stub_sample_dataframe, batch_number=0) # Verify the original dataframe is returned pd.testing.assert_frame_equal(result, stub_sample_dataframe) @@ -97,20 +95,7 @@ def test_process_writes_formatted_output_to_parquet( assert json.loads(actual) == json.loads(expected), f"Row {i} mismatch: {actual} != {expected}" -def test_process_without_batch_number_does_not_write( - stub_processor: SchemaTransformProcessor, stub_sample_dataframe: pd.DataFrame -) -> None: - # Process without batch number (preview mode) - result = stub_processor.process(stub_sample_dataframe, current_batch_number=None) - - # Verify the original dataframe is returned - pd.testing.assert_frame_equal(result, stub_sample_dataframe) - - # Verify write_batch_to_parquet_file was NOT called - stub_processor.artifact_storage.write_batch_to_parquet_file.assert_not_called() - - -def test_process_with_json_serialized_values(stub_processor: SchemaTransformProcessor) -> None: +def test_process_after_batch_with_json_serialized_values(stub_processor: SchemaTransformProcessor) -> None: # Test with JSON-serialized values in dataframe df_with_json = pd.DataFrame( { @@ -120,7 +105,7 @@ def test_process_with_json_serialized_values(stub_processor: SchemaTransformProc ) # Process the dataframe - stub_processor.process(df_with_json, current_batch_number=0) + stub_processor.process_after_batch(df_with_json, batch_number=0) written_dataframe: pd.DataFrame = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args.kwargs[ "dataframe" ] @@ -136,7 +121,7 @@ def test_process_with_json_serialized_values(stub_processor: SchemaTransformProc assert first_output["value"] == '{"nested": "value1"}' -def test_process_with_special_characters_in_llm_output(stub_processor: SchemaTransformProcessor) -> None: +def test_process_after_batch_with_special_characters_in_llm_output(stub_processor: SchemaTransformProcessor) -> 
None: """Test that LLM outputs with special characters are properly escaped for JSON. This addresses GitHub issue #227 where SchemaTransformProcessor fails with JSONDecodeError @@ -155,7 +140,7 @@ def test_process_with_special_characters_in_llm_output(stub_processor: SchemaTra ) # Process should not raise JSONDecodeError - stub_processor.process(df_with_special_chars, current_batch_number=0) + stub_processor.process_after_batch(df_with_special_chars, batch_number=0) written_dataframe: pd.DataFrame = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args.kwargs[ "dataframe" ] @@ -172,7 +157,7 @@ def test_process_with_special_characters_in_llm_output(stub_processor: SchemaTra assert outputs[3]["text"] == "Tab\there" -def test_process_with_mixed_special_characters(stub_processor: SchemaTransformProcessor) -> None: +def test_process_after_batch_with_mixed_special_characters(stub_processor: SchemaTransformProcessor) -> None: """Test complex LLM output with multiple types of special characters.""" df_complex = pd.DataFrame( { @@ -183,7 +168,7 @@ def test_process_with_mixed_special_characters(stub_processor: SchemaTransformPr } ) - stub_processor.process(df_complex, current_batch_number=0) + stub_processor.process_after_batch(df_complex, batch_number=0) written_dataframe: pd.DataFrame = stub_processor.artifact_storage.write_batch_to_parquet_file.call_args.kwargs[ "dataframe" ] diff --git a/packages/data-designer-engine/tests/engine/test_validation.py b/packages/data-designer-engine/tests/engine/test_validation.py index c0cc4bc0..97f795b5 100644 --- a/packages/data-designer-engine/tests/engine/test_validation.py +++ b/packages/data-designer-engine/tests/engine/test_validation.py @@ -12,7 +12,6 @@ Score, ValidationColumnConfig, ) -from data_designer.config.dataset_builders import BuildStage from data_designer.config.models import ImageContext, ModalityDataType from data_designer.config.processors import ( DropColumnsProcessorConfig, @@ -104,12 +103,10 @@ DropColumnsProcessorConfig( name="drop_columns_processor", column_names=["inexistent_column"], - build_stage=BuildStage.POST_BATCH, ), SchemaTransformProcessorConfig( name="schema_transform_processor_invalid_reference", template={"text": "{{ invalid_reference }}"}, - build_stage=BuildStage.POST_BATCH, ), ] ALLOWED_REFERENCE = [c.name for c in COLUMNS] diff --git a/packages/data-designer/tests/interface/test_data_designer.py b/packages/data-designer/tests/interface/test_data_designer.py index 84636e59..692bfc13 100644 --- a/packages/data-designer/tests/interface/test_data_designer.py +++ b/packages/data-designer/tests/interface/test_data_designer.py @@ -12,7 +12,6 @@ from data_designer.config.column_configs import SamplerColumnConfig from data_designer.config.config_builder import DataDesignerConfigBuilder -from data_designer.config.dataset_builders import BuildStage from data_designer.config.errors import InvalidConfigError from data_designer.config.models import ModelProvider from data_designer.config.processors import DropColumnsProcessorConfig @@ -323,11 +322,7 @@ def test_preview_with_dropped_columns( SamplerColumnConfig(name="uniform", sampler_type="uniform", params={"low": 1, "high": 100}) ) - config_builder.add_processor( - DropColumnsProcessorConfig( - name="drop_columns_processor", build_stage=BuildStage.POST_BATCH, column_names=["category"] - ) - ) + config_builder.add_processor(DropColumnsProcessorConfig(name="drop_columns_processor", column_names=["category"])) data_designer = DataDesigner( 
artifact_path=stub_artifact_path, From 56ccb1532398a0da4cc7600b7f141022321908cd Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Wed, 4 Feb 2026 21:17:30 -0300 Subject: [PATCH 2/2] feat: add processor plugin system Adds support for third-party processor plugins via plugin discovery: - PluginType.PROCESSOR for external processor plugins - ProcessorRegistry discovers and loads processor plugins - processor_types.py with plugin-injected type union - PluginRegistry uses RLock for nested imports Demo processors: - RegexFilterProcessor (preprocess stage) - SemanticDedupProcessor (postprocess stage) --- demo/data_designer_demo_processors/README.md | 61 +++++ .../notebooks/demo_processors.py | 120 +++++++++ .../pyproject.toml | 25 ++ .../data_designer_demo_processors/__init__.py | 4 + .../download_model.py | 19 ++ .../regex_filter/__init__.py | 7 + .../regex_filter/config.py | 20 ++ .../regex_filter/impl.py | 47 ++++ .../regex_filter/plugin.py | 10 + .../semantic_dedup/__init__.py | 7 + .../semantic_dedup/config.py | 28 ++ .../semantic_dedup/impl.py | 74 ++++++ .../semantic_dedup/plugin.py | 10 + .../tests/__init__.py | 2 + .../tests/test_regex_filter.py | 56 ++++ .../tests/test_semantic_dedup.py | 70 +++++ docs/plugins/overview.md | 22 +- .../data_designer/config/config_builder.py | 3 +- .../config/data_designer_config.py | 2 +- .../data_designer/config/processor_types.py | 17 ++ .../src/data_designer/config/processors.py | 4 - .../src/data_designer/plugin_manager.py | 37 +++ .../src/data_designer/plugins/plugin.py | 3 + .../src/data_designer/plugins/registry.py | 2 +- .../engine/processing/processors/registry.py | 11 +- .../src/data_designer/engine/validation.py | 3 +- plans/processor-plugins.md | 246 ++++++++++++++++++ 27 files changed, 898 insertions(+), 12 deletions(-) create mode 100644 demo/data_designer_demo_processors/README.md create mode 100644 demo/data_designer_demo_processors/notebooks/demo_processors.py create mode 100644 demo/data_designer_demo_processors/pyproject.toml create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py create mode 100644 demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py create mode 100644 demo/data_designer_demo_processors/tests/__init__.py create mode 100644 demo/data_designer_demo_processors/tests/test_regex_filter.py create mode 100644 demo/data_designer_demo_processors/tests/test_semantic_dedup.py create mode 100644 packages/data-designer-config/src/data_designer/config/processor_types.py create mode 100644 plans/processor-plugins.md diff --git a/demo/data_designer_demo_processors/README.md 
b/demo/data_designer_demo_processors/README.md new file mode 100644 index 00000000..847ff0ea --- /dev/null +++ b/demo/data_designer_demo_processors/README.md @@ -0,0 +1,61 @@ +# Data Designer Demo Processors + +Demo processor plugins demonstrating PRE_GENERATION and POST_GENERATION stages. + +## Installation + +```bash +uv pip install -e demo/data_designer_demo_processors +``` + +## Processors + +### RegexFilterProcessor (PRE_GENERATION) + +Filters seed data rows based on regex pattern matching. + +```python +from data_designer.config.config_builder import DataDesignerConfigBuilder +from data_designer_demo_processors.regex_filter import RegexFilterProcessorConfig + +builder = DataDesignerConfigBuilder(model_configs=[...]) +builder.add_processor(RegexFilterProcessorConfig( + name="filter_emails", + column="email", + pattern=r"@company\.com$", + invert=False, # Keep only matching rows +)) +``` + +### SemanticDedupProcessor (POST_GENERATION) + +Removes semantically similar rows using sentence embeddings. + +```python +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessorConfig + +builder.add_processor(SemanticDedupProcessorConfig( + name="dedup_responses", + column="response", + similarity_threshold=0.9, # Remove rows with >90% similarity + model_name="all-MiniLM-L6-v2", +)) +``` + +## Pre-downloading the Embedding Model + +The semantic dedup processor downloads the embedding model on first use. To pre-download: + +```bash +download-semantic-dedup-model +``` + +## Entry Points + +The package registers plugins via entry points: + +```toml +[project.entry-points."data_designer.plugins"] +regex-filter = "data_designer_demo_processors.regex_filter.plugin:regex_filter_plugin" +semantic-dedup = "data_designer_demo_processors.semantic_dedup.plugin:semantic_dedup_plugin" +``` diff --git a/demo/data_designer_demo_processors/notebooks/demo_processors.py b/demo/data_designer_demo_processors/notebooks/demo_processors.py new file mode 100644 index 00000000..5ce55641 --- /dev/null +++ b/demo/data_designer_demo_processors/notebooks/demo_processors.py @@ -0,0 +1,120 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Demo: Processor Plugins with PRE_GENERATION and POST_GENERATION stages. + +This notebook demonstrates: +1. RegexFilterProcessor (PRE_GENERATION) - filters seed data before generation +2. SemanticDedupProcessor (POST_GENERATION) - deduplicates final dataset + +Run cells with `#%%` markers in VS Code or PyCharm. 
+""" + +# %% Imports +import tempfile +from pathlib import Path + +import pandas as pd +from data_designer_demo_processors.regex_filter import RegexFilterProcessorConfig +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessorConfig + +import data_designer.config as dd +from data_designer.interface import DataDesigner + +# %% Create seed data with some rows we want to filter out +seed_data = pd.DataFrame( + { + "topic": [ + "Python programming", + "Machine learning", + "SPAM: Buy now!", # Will be filtered by regex + "Data science", + "SPAM: Click here", # Will be filtered by regex + "Natural language processing", + "Computer vision", + ], + "difficulty": ["beginner", "advanced", "N/A", "intermediate", "N/A", "advanced", "advanced"], + } +) + +print("Seed data before PRE_GENERATION filtering:") +print(seed_data) +print(f"Total rows: {len(seed_data)}") + +# %% Setup temporary directory and save seed data +output_dir = Path(tempfile.mkdtemp()) +seed_path = output_dir / "seed.parquet" +seed_data.to_parquet(seed_path, index=False) + +# %% Build the Data Designer configuration (uses default openai-text model) +config_builder = dd.DataDesignerConfigBuilder() + +# Add seed dataset +config_builder.with_seed_dataset(dd.LocalFileSeedSource(path=str(seed_path))) + +# Add LLM column to generate explanations +config_builder.add_column( + dd.LLMTextColumnConfig( + name="explanation", + prompt="""Write a brief one-sentence explanation of the topic: {{ topic }} +Difficulty level: {{ difficulty }} + +Keep it concise and educational.""", + model_alias="openai-text", + ) +) + +# Add PRE_GENERATION processor to filter out spam rows +config_builder.add_processor( + RegexFilterProcessorConfig( + name="filter_spam", + column="topic", + pattern=r"^SPAM:", + invert=True, # Keep rows that do NOT match (i.e., filter out spam) + ) +) + +# Add POST_GENERATION processor to deduplicate similar explanations +config_builder.add_processor( + SemanticDedupProcessorConfig( + name="dedup_explanations", + column="explanation", + similarity_threshold=0.85, + ) +) + +print("Configuration created successfully!") +processor_configs = config_builder.get_processor_configs() +print(f"Processors configured: {[p.name for p in processor_configs]}") + +# %% Run preview to test with a few records +data_designer = DataDesigner() + +print("\nRunning preview (3 records)...") +preview = data_designer.preview(config_builder, num_records=3) + +print("\nPreview dataset:") +print(preview.dataset) + +# %% Run full generation +print("\nRunning full generation...") +results = data_designer.create( + config_builder, + num_records=5, + dataset_name="processor-demo", +) + +# Load the final dataset +final_dataset = results.load_dataset() + +print("\nFinal dataset after all processors:") +print(final_dataset) +print(f"\nTotal rows in final dataset: {len(final_dataset)}") + +# %% Summary +print("\n" + "=" * 60) +print("DEMO SUMMARY") +print("=" * 60) +print(f"Original seed rows: {len(seed_data)}") +print("After PRE_GENERATION (regex filter): Expected ~5 rows (SPAM removed)") +print(f"After POST_GENERATION (semantic dedup): {len(final_dataset)} rows") diff --git a/demo/data_designer_demo_processors/pyproject.toml b/demo/data_designer_demo_processors/pyproject.toml new file mode 100644 index 00000000..49cbda2c --- /dev/null +++ b/demo/data_designer_demo_processors/pyproject.toml @@ -0,0 +1,25 @@ +[project] +name = "data-designer-demo-processors" +version = "0.1.0" +description = "Demo processor plugins for Data Designer showing 
PRE_GENERATION and POST_GENERATION stages" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "data-designer-config", + "data-designer-engine", + "sentence-transformers>=2.2.0", +] + +[project.entry-points."data_designer.plugins"] +regex-filter = "data_designer_demo_processors.regex_filter.plugin:regex_filter_plugin" +semantic-dedup = "data_designer_demo_processors.semantic_dedup.plugin:semantic_dedup_plugin" + +[project.scripts] +download-semantic-dedup-model = "data_designer_demo_processors.download_model:main" + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build.targets.wheel] +packages = ["src/data_designer_demo_processors"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py new file mode 100644 index 00000000..955d001d --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Demo processor plugins for Data Designer.""" diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py new file mode 100644 index 00000000..72ad60e5 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/download_model.py @@ -0,0 +1,19 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +"""Pre-download the semantic dedup embedding model.""" + +DEFAULT_MODEL = "all-MiniLM-L6-v2" + + +def main(): + """Download the embedding model to cache.""" + from sentence_transformers import SentenceTransformer + + print(f"Downloading model: {DEFAULT_MODEL}") + SentenceTransformer(DEFAULT_MODEL) + print("Model downloaded successfully!") + + +if __name__ == "__main__": + main() diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py new file mode 100644 index 00000000..b486d99f --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer_demo_processors.regex_filter.config import RegexFilterProcessorConfig +from data_designer_demo_processors.regex_filter.impl import RegexFilterProcessor + +__all__ = ["RegexFilterProcessorConfig", "RegexFilterProcessor"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py new file mode 100644 index 00000000..da7e6916 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/config.py @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal + +from pydantic import Field + +from data_designer.config.processors import ProcessorConfig + + +class RegexFilterProcessorConfig(ProcessorConfig): + """Filter rows based on regex matching on a column. + + This processor filters seed data during the preprocess stage. + """ + + processor_type: Literal["regex-filter"] = "regex-filter" + column: str = Field(description="Column to apply regex filter on") + pattern: str = Field(description="Regex pattern to match") + invert: bool = Field(default=False, description="If True, keep rows that do NOT match") diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py new file mode 100644 index 00000000..98dd8555 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/impl.py @@ -0,0 +1,47 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +import re +from typing import TYPE_CHECKING + +from data_designer.engine.processing.processors.base import Processor +from data_designer_demo_processors.regex_filter.config import RegexFilterProcessorConfig + +if TYPE_CHECKING: + import pandas as pd + +logger = logging.getLogger(__name__) + + +class RegexFilterProcessor(Processor[RegexFilterProcessorConfig]): + """Filters rows based on regex matching on a specified column. + + Runs during preprocess to filter seed data before generation. + """ + + def preprocess(self, data: pd.DataFrame) -> pd.DataFrame: + column = self.config.column + pattern = self.config.pattern + invert = self.config.invert + + if column not in data.columns: + logger.warning(f"โš ๏ธ Column '{column}' not found in dataset. Skipping regex filter.") + return data + + compiled = re.compile(pattern) + mask = data[column].astype(str).apply(lambda x: bool(compiled.search(x))) + + if invert: + mask = ~mask + + original_count = len(data) + data = data[mask].reset_index(drop=True) + filtered_count = original_count - len(data) + + action = "excluded" if not invert else "kept only non-matching" + logger.info(f"๐Ÿ” Regex filter: {filtered_count} rows {action} (pattern: {pattern!r} on column '{column}')") + + return data diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py new file mode 100644 index 00000000..bc091b5d --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/regex_filter/plugin.py @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from data_designer.plugins.plugin import Plugin, PluginType + +regex_filter_plugin = Plugin( + config_qualified_name="data_designer_demo_processors.regex_filter.config.RegexFilterProcessorConfig", + impl_qualified_name="data_designer_demo_processors.regex_filter.impl.RegexFilterProcessor", + plugin_type=PluginType.PROCESSOR, +) diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py new file mode 100644 index 00000000..993b7eb2 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/__init__.py @@ -0,0 +1,7 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer_demo_processors.semantic_dedup.config import SemanticDedupProcessorConfig +from data_designer_demo_processors.semantic_dedup.impl import SemanticDedupProcessor + +__all__ = ["SemanticDedupProcessorConfig", "SemanticDedupProcessor"] diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py new file mode 100644 index 00000000..04ecb1ab --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/config.py @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from typing import Literal + +from pydantic import Field + +from data_designer.config.processors import ProcessorConfig + + +class SemanticDedupProcessorConfig(ProcessorConfig): + """Remove semantically similar rows using embeddings. + + This processor deduplicates the final dataset during the postprocess stage. + """ + + processor_type: Literal["semantic-dedup"] = "semantic-dedup" + column: str = Field(description="Column to compute embeddings on for deduplication") + similarity_threshold: float = Field( + default=0.9, + ge=0.0, + le=1.0, + description="Cosine similarity threshold above which rows are considered duplicates", + ) + model_name: str = Field( + default="all-MiniLM-L6-v2", + description="Sentence-transformers model name for computing embeddings", + ) diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py new file mode 100644 index 00000000..699367ba --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/impl.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING + +import numpy as np +import transformers.utils.logging as transformers_logging +from sentence_transformers import SentenceTransformer + +from data_designer.engine.processing.processors.base import Processor +from data_designer_demo_processors.semantic_dedup.config import SemanticDedupProcessorConfig + +if TYPE_CHECKING: + import pandas as pd + +logger = logging.getLogger(__name__) + + +class SemanticDedupProcessor(Processor[SemanticDedupProcessorConfig]): + """Removes semantically similar rows using embeddings. + + Runs during postprocess to deduplicate the final generated dataset. + """ + + def _initialize(self) -> None: + # Suppress sentence-transformers/transformers logging noise + transformers_logging.set_verbosity_error() + transformers_logging.disable_progress_bar() + + self._model = SentenceTransformer(self.config.model_name) + + def postprocess(self, data: pd.DataFrame) -> pd.DataFrame: + column = self.config.column + threshold = self.config.similarity_threshold + + if column not in data.columns: + logger.warning(f"โš ๏ธ Column '{column}' not found in dataset. Skipping semantic dedup.") + return data + + if len(data) == 0: + return data + + texts = data[column].astype(str).tolist() + embeddings = self._model.encode(texts, show_progress_bar=False, convert_to_numpy=True) + + # Normalize embeddings for cosine similarity + norms = np.linalg.norm(embeddings, axis=1, keepdims=True) + norms[norms == 0] = 1 # Avoid division by zero + embeddings = embeddings / norms + + # Find duplicates using greedy approach: keep first occurrence, remove similar ones + keep_indices = [] + for i in range(len(embeddings)): + is_duplicate = False + for kept_idx in keep_indices: + similarity = np.dot(embeddings[i], embeddings[kept_idx]) + if similarity >= threshold: + is_duplicate = True + break + if not is_duplicate: + keep_indices.append(i) + + original_count = len(data) + data = data.iloc[keep_indices].reset_index(drop=True) + removed_count = original_count - len(data) + + logger.info( + f"๐Ÿงน Semantic dedup: removed {removed_count} similar rows (threshold: {threshold}, column: '{column}')" + ) + + return data diff --git a/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py new file mode 100644 index 00000000..cd49c760 --- /dev/null +++ b/demo/data_designer_demo_processors/src/data_designer_demo_processors/semantic_dedup/plugin.py @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from data_designer.plugins.plugin import Plugin, PluginType + +semantic_dedup_plugin = Plugin( + config_qualified_name="data_designer_demo_processors.semantic_dedup.config.SemanticDedupProcessorConfig", + impl_qualified_name="data_designer_demo_processors.semantic_dedup.impl.SemanticDedupProcessor", + plugin_type=PluginType.PROCESSOR, +) diff --git a/demo/data_designer_demo_processors/tests/__init__.py b/demo/data_designer_demo_processors/tests/__init__.py new file mode 100644 index 00000000..52a7a9da --- /dev/null +++ b/demo/data_designer_demo_processors/tests/__init__.py @@ -0,0 +1,2 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 diff --git a/demo/data_designer_demo_processors/tests/test_regex_filter.py b/demo/data_designer_demo_processors/tests/test_regex_filter.py new file mode 100644 index 00000000..6ef25f76 --- /dev/null +++ b/demo/data_designer_demo_processors/tests/test_regex_filter.py @@ -0,0 +1,56 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import Mock + +import pandas as pd +import pytest +from data_designer_demo_processors.regex_filter import RegexFilterProcessor, RegexFilterProcessorConfig + + +@pytest.fixture +def stub_resource_provider(): + return Mock() + + +def test_regex_filter_keeps_matching_rows(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="text", pattern=r"hello") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye", "say hello", "nothing"]}) + result = processor.process(df) + + assert len(result) == 2 + assert list(result["text"]) == ["hello world", "say hello"] + + +def test_regex_filter_invert_keeps_non_matching(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="text", pattern=r"hello", invert=True) + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye", "say hello", "nothing"]}) + result = processor.process(df) + + assert len(result) == 2 + assert list(result["text"]) == ["goodbye", "nothing"] + + +def test_regex_filter_missing_column_returns_unchanged(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="missing", pattern=r"hello") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello world", "goodbye"]}) + result = processor.process(df) + + assert len(result) == 2 + + +def test_regex_filter_complex_pattern(stub_resource_provider): + config = RegexFilterProcessorConfig(name="test", column="email", pattern=r"^\w+@\w+\.\w+$") + processor = RegexFilterProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"email": ["user@example.com", "invalid", "other@test.org", "bad@"]}) + result = processor.process(df) + + assert len(result) == 2 + assert list(result["email"]) == ["user@example.com", "other@test.org"] diff --git a/demo/data_designer_demo_processors/tests/test_semantic_dedup.py b/demo/data_designer_demo_processors/tests/test_semantic_dedup.py new file mode 100644 index 00000000..e247eb2c --- /dev/null +++ b/demo/data_designer_demo_processors/tests/test_semantic_dedup.py @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 + +from unittest.mock import Mock + +import pandas as pd +import pytest +from data_designer_demo_processors.semantic_dedup import SemanticDedupProcessor, SemanticDedupProcessorConfig + + +@pytest.fixture +def stub_resource_provider(): + return Mock() + + +@pytest.mark.slow +def test_semantic_dedup_removes_similar_rows(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame( + { + "text": [ + "The cat sat on the mat", + "A cat was sitting on the mat", # Very similar to first + "Dogs like to play fetch", + "The dog enjoys playing fetch", # Very similar to third + "Quantum physics is fascinating", + ] + } + ) + result = processor.process(df) + + # Should keep only dissimilar rows + assert len(result) < len(df) + assert len(result) >= 3 # At least 3 distinct topics + + +@pytest.mark.slow +def test_semantic_dedup_missing_column_returns_unchanged(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="missing", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["hello", "world"]}) + result = processor.process(df) + + assert len(result) == 2 + + +@pytest.mark.slow +def test_semantic_dedup_empty_dataframe(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.9) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": []}) + result = processor.process(df) + + assert len(result) == 0 + + +@pytest.mark.slow +def test_semantic_dedup_low_threshold_removes_more(stub_resource_provider): + config = SemanticDedupProcessorConfig(name="test", column="text", similarity_threshold=0.5) + processor = SemanticDedupProcessor(config=config, resource_provider=stub_resource_provider) + + df = pd.DataFrame({"text": ["apple", "orange", "banana", "grape", "lemon"]}) + result = processor.process(df) + + # With low threshold, fruit words might be considered similar + assert len(result) <= len(df) diff --git a/docs/plugins/overview.md b/docs/plugins/overview.md index 0016120f..b38ad6a2 100644 --- a/docs/plugins/overview.md +++ b/docs/plugins/overview.md @@ -7,9 +7,10 @@ Plugins are Python packages that extend Data Designer's capabilities without modifying the core library. Similar to [VS Code extensions](https://marketplace.visualstudio.com/vscode) and [Pytest plugins](https://docs.pytest.org/en/stable/reference/plugin_list.html), the plugin system empowers you to build specialized extensions for your specific use cases and share them with the community. -**Current capabilities**: Data Designer currently supports plugins for column generators (the column types you pass to the config builder's [add_column](../code_reference/config_builder.md#data_designer.config.config_builder.DataDesignerConfigBuilder.add_column) method). +**Current capabilities**: Data Designer supports plugins for: -**Coming soon**: Plugin support for processors, validators, and more! 
+- **Column generators**: Custom column types you pass to the config builder's [add_column](../code_reference/config_builder.md#data_designer.config.config_builder.DataDesignerConfigBuilder.add_column) method +- **Processors**: Custom transformations that run at `PRE_GENERATION`, `POST_BATCH`, or `POST_GENERATION` stages ## How do you use plugins? @@ -27,9 +28,17 @@ Creating a plugin involves three main steps: ### 1. Implement the Plugin Components +For **column generator plugins**: + - Create a task class inheriting from `ColumnGenerator` - Create a config class inheriting from `SingleColumnConfig` -- Instantiate a `Plugin` object connecting them +- Instantiate a `Plugin` object with `plugin_type=PluginType.COLUMN_GENERATOR` + +For **processor plugins**: + +- Create a task class inheriting from `Processor` +- Create a config class inheriting from `ProcessorConfig` +- Instantiate a `Plugin` object with `plugin_type=PluginType.PROCESSOR` ### 2. Package Your Plugin @@ -37,6 +46,13 @@ Creating a plugin involves three main steps: - Register your plugin using entry points - Define dependencies (including `data-designer`) +**Example entry point configuration in `pyproject.toml`:** + +```toml +[project.entry-points."data_designer.plugins"] +my-processor = "my_package.processor.plugin:my_processor_plugin" +``` + ### 3. Share Your Plugin - Publish to PyPI or another package index diff --git a/packages/data-designer-config/src/data_designer/config/config_builder.py b/packages/data-designer-config/src/data_designer/config/config_builder.py index 7bf791eb..a893e327 100644 --- a/packages/data-designer-config/src/data_designer/config/config_builder.py +++ b/packages/data-designer-config/src/data_designer/config/config_builder.py @@ -27,7 +27,8 @@ from data_designer.config.exportable_config import ExportableConfigBase from data_designer.config.mcp import ToolConfig from data_designer.config.models import ModelConfig, load_model_configs -from data_designer.config.processors import ProcessorConfigT, ProcessorType, get_processor_config_from_kwargs +from data_designer.config.processor_types import ProcessorConfigT +from data_designer.config.processors import ProcessorType, get_processor_config_from_kwargs from data_designer.config.sampler_constraints import ( ColumnConstraintT, ColumnInequalityConstraint, diff --git a/packages/data-designer-config/src/data_designer/config/data_designer_config.py b/packages/data-designer-config/src/data_designer/config/data_designer_config.py index 5a0a9613..e65a67de 100644 --- a/packages/data-designer-config/src/data_designer/config/data_designer_config.py +++ b/packages/data-designer-config/src/data_designer/config/data_designer_config.py @@ -12,7 +12,7 @@ from data_designer.config.exportable_config import ExportableConfigBase from data_designer.config.mcp import ToolConfig from data_designer.config.models import ModelConfig -from data_designer.config.processors import ProcessorConfigT +from data_designer.config.processor_types import ProcessorConfigT from data_designer.config.sampler_constraints import ColumnConstraintT from data_designer.config.seed import SeedConfig diff --git a/packages/data-designer-config/src/data_designer/config/processor_types.py b/packages/data-designer-config/src/data_designer/config/processor_types.py new file mode 100644 index 00000000..a90f582d --- /dev/null +++ b/packages/data-designer-config/src/data_designer/config/processor_types.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from typing_extensions import TypeAlias + +from data_designer.config.processors import ( + DropColumnsProcessorConfig, + SchemaTransformProcessorConfig, +) +from data_designer.plugin_manager import PluginManager + +plugin_manager = PluginManager() + +ProcessorConfigT: TypeAlias = DropColumnsProcessorConfig | SchemaTransformProcessorConfig +ProcessorConfigT = plugin_manager.inject_into_processor_config_type_union(ProcessorConfigT) diff --git a/packages/data-designer-config/src/data_designer/config/processors.py b/packages/data-designer-config/src/data_designer/config/processors.py index 21d94b78..3dba7b29 100644 --- a/packages/data-designer-config/src/data_designer/config/processors.py +++ b/packages/data-designer-config/src/data_designer/config/processors.py @@ -9,7 +9,6 @@ from typing import Any, Literal from pydantic import Field, field_validator -from typing_extensions import TypeAlias from data_designer.config.base import ConfigBase from data_designer.config.errors import InvalidConfigError @@ -129,6 +128,3 @@ def validate_template(cls, v: dict[str, Any]) -> dict[str, Any]: if "not JSON serializable" in str(e): raise InvalidConfigError("Template must be JSON serializable") return v - - -ProcessorConfigT: TypeAlias = DropColumnsProcessorConfig | SchemaTransformProcessorConfig diff --git a/packages/data-designer-config/src/data_designer/plugin_manager.py b/packages/data-designer-config/src/data_designer/plugin_manager.py index 768e5314..7d204c02 100644 --- a/packages/data-designer-config/src/data_designer/plugin_manager.py +++ b/packages/data-designer-config/src/data_designer/plugin_manager.py @@ -76,3 +76,40 @@ def inject_into_seed_source_type_union(self, seed_source_type: type[TypeAlias]) """ seed_source_type = self._plugin_registry.add_plugin_types_to_union(seed_source_type, PluginType.SEED_READER) return seed_source_type + + def get_processor_plugins(self) -> list[Plugin]: + """Get all processor plugins. + + Returns: + A list of all processor plugins. + """ + return self._plugin_registry.get_plugins(PluginType.PROCESSOR) + + def get_processor_plugin_if_exists(self, plugin_name: str) -> Plugin | None: + """Get a processor plugin by name if it exists. + + Args: + plugin_name: The name of the plugin to retrieve. + + Returns: + The plugin if found, otherwise None. + """ + if self._plugin_registry.plugin_exists(plugin_name): + plugin = self._plugin_registry.get_plugin(plugin_name) + if plugin.plugin_type == PluginType.PROCESSOR: + return plugin + return None + + def inject_into_processor_config_type_union(self, processor_config_type: type[TypeAlias]) -> type[TypeAlias]: + """Inject plugins into the processor config type. + + Args: + processor_config_type: The processor config type to inject plugins into. + + Returns: + The processor config type with plugins injected. 
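+
+        Example (as wired up in processor_types.py in this patch):
+            ProcessorConfigT = plugin_manager.inject_into_processor_config_type_union(ProcessorConfigT)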
+ """ + processor_config_type = self._plugin_registry.add_plugin_types_to_union( + processor_config_type, PluginType.PROCESSOR + ) + return processor_config_type diff --git a/packages/data-designer-config/src/data_designer/plugins/plugin.py b/packages/data-designer-config/src/data_designer/plugins/plugin.py index d8c47792..a961062a 100644 --- a/packages/data-designer-config/src/data_designer/plugins/plugin.py +++ b/packages/data-designer-config/src/data_designer/plugins/plugin.py @@ -20,6 +20,7 @@ class PluginType(str, Enum): COLUMN_GENERATOR = "column-generator" SEED_READER = "seed-reader" + PROCESSOR = "processor" @property def discriminator_field(self) -> str: @@ -27,6 +28,8 @@ def discriminator_field(self) -> str: return "column_type" elif self == PluginType.SEED_READER: return "seed_type" + elif self == PluginType.PROCESSOR: + return "processor_type" else: raise ValueError(f"Invalid plugin type: {self.value}") diff --git a/packages/data-designer-config/src/data_designer/plugins/registry.py b/packages/data-designer-config/src/data_designer/plugins/registry.py index b544e146..7ed777f3 100644 --- a/packages/data-designer-config/src/data_designer/plugins/registry.py +++ b/packages/data-designer-config/src/data_designer/plugins/registry.py @@ -23,7 +23,7 @@ class PluginRegistry: _instance = None _plugins_discovered = False - _lock = threading.Lock() + _lock = threading.RLock() _plugins: dict[str, Plugin] = {} diff --git a/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py b/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py index 9a9b463e..2c583304 100644 --- a/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py +++ b/packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py @@ -13,13 +13,22 @@ from data_designer.engine.processing.processors.drop_columns import DropColumnsProcessor from data_designer.engine.processing.processors.schema_transform import SchemaTransformProcessor from data_designer.engine.registry.base import TaskRegistry +from data_designer.plugins.plugin import PluginType +from data_designer.plugins.registry import PluginRegistry class ProcessorRegistry(TaskRegistry[str, Processor, ConfigBase]): ... 
-def create_default_processor_registry() -> ProcessorRegistry: +def create_default_processor_registry(with_plugins: bool = True) -> ProcessorRegistry: registry = ProcessorRegistry() registry.register(ProcessorType.SCHEMA_TRANSFORM, SchemaTransformProcessor, SchemaTransformProcessorConfig, False) registry.register(ProcessorType.DROP_COLUMNS, DropColumnsProcessor, DropColumnsProcessorConfig, False) + if with_plugins: + for plugin in PluginRegistry().get_plugins(PluginType.PROCESSOR): + registry.register( + plugin.name, + plugin.impl_cls, + plugin.config_cls, + ) return registry diff --git a/packages/data-designer-engine/src/data_designer/engine/validation.py b/packages/data-designer-engine/src/data_designer/engine/validation.py index 0483d3a0..127ccf50 100644 --- a/packages/data-designer-engine/src/data_designer/engine/validation.py +++ b/packages/data-designer-engine/src/data_designer/engine/validation.py @@ -15,7 +15,8 @@ from rich.panel import Panel from data_designer.config.column_types import ColumnConfigT, DataDesignerColumnType -from data_designer.config.processors import ProcessorConfigT, ProcessorType +from data_designer.config.processor_types import ProcessorConfigT +from data_designer.config.processors import ProcessorType from data_designer.config.utils.constants import RICH_CONSOLE_THEME from data_designer.config.utils.misc import ( can_run_data_designer_locally, diff --git a/plans/processor-plugins.md b/plans/processor-plugins.md new file mode 100644 index 00000000..583d3b55 --- /dev/null +++ b/plans/processor-plugins.md @@ -0,0 +1,246 @@ +# Plan: Processor Plugins with Global Stages + +Created: 2026-02-03 +Status: Iterating and Refining + +## Goal + +Extend the processor system to support global preprocessing (before generation) and postprocessing (after generation) stages, and enable third-party processor plugins via the existing plugin discovery mechanism. + +## Success Criteria + +- [x] `PRE_GENERATION` stage runs once on full seed data before batching/generation +- [x] `POST_GENERATION` stage runs once on final dataset after all batches complete +- [x] `PluginType.PROCESSOR` enables external processor plugins +- [x] ProcessorRegistry loads plugins from entry points +- [x] Demo plugin package demonstrates both preprocessing and postprocessing +- [x] Existing `POST_BATCH` behavior unchanged + +## Implementation Steps + +### Step 1: Extend BuildStage Support + +Update processor configuration to accept new stages. + +- [x] Add `PRE_GENERATION` and `POST_GENERATION` to `SUPPORTED_STAGES` in processors.py +- [x] Add unit tests verifying `ProcessorConfig` accepts the new stage values + +**Suggestion**: Check if `BuildStage` enum already has these values defined elsewhere before adding. + +### Step 2: Update Dataset Builder for Global Stages + +Implement the actual execution of processors at the new stages. 
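+
+The checklist below tracks the actual work. As a rough sketch of where the new stages sit relative to the existing per-batch flow (all names in this snippet are illustrative placeholders, not the real builder API):
+
+```python
+# Illustrative only: the global stages wrap the existing batch loop.
+import pandas as pd
+
+
+def run_build(seed: pd.DataFrame, pre, post_batch, post, batch_size: int = 100) -> pd.DataFrame:
+    """pre / post_batch / post are lists of DataFrame -> DataFrame callables."""
+    for p in pre:  # PRE_GENERATION: once, on the full seed data before batching
+        seed = p(seed)
+
+    processed_batches = []
+    for start in range(0, len(seed), batch_size):
+        batch = seed.iloc[start : start + batch_size]
+        # ... column generation for this batch happens here ...
+        for p in post_batch:  # POST_BATCH: existing per-batch behavior, unchanged
+            batch = p(batch)
+        processed_batches.append(batch)
+
+    final = pd.concat(processed_batches, ignore_index=True)
+    for p in post:  # POST_GENERATION: once, on the combined dataset
+        final = p(final)
+    return final
+```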
+ +- [x] Add `_run_pre_generation_processors()` method + - Load full seed dataset before batch loop + - Apply PRE_GENERATION processors sequentially + - Replace the seed reader with an in-memory version containing processed data + - **Suggestion**: Look for existing in-memory seed reader implementations (e.g., `DataFrameSeedReader`) + +- [x] Add `_run_post_generation_processors()` method + - Load the final combined dataset after all batches complete + - Apply POST_GENERATION processors sequentially + - Rewrite the final dataset with processed results + - **Suggestion**: Check how existing artifact storage handles dataset loading/writing + +- [x] Integrate calls into the `build()` method at appropriate points +- [x] Add integration tests for both flows + +### Step 3: Add Processor Plugin Support + +Enable third-party processor plugins through the existing plugin system. + +- [x] Add `PluginType.PROCESSOR` to the plugin types enum +- [x] Update `discriminator_field` property to return `"processor_type"` for processors +- [x] Update `ProcessorRegistry` to discover and load processor plugins + - **Suggestion**: Follow the pattern used for column generator plugins + - Use string keys for plugin processors (not enum values) + +- [x] Inject plugin processor configs into the `ProcessorConfigT` type union + - Follow the existing `_types` pattern used for columns and seed sources + +**Follow the `_types` Module Pattern**: + +The codebase separates base classes from type unions with plugin injection: +- `column_configs.py` (base) โ†’ `column_types.py` (union + injection) +- `seed_source.py` (base) โ†’ `seed_source_types.py` (union + injection) + +Do the same for processors: +- [x] Keep `processors.py` with base classes and concrete configs +- [x] Create `processor_types.py` for `ProcessorConfigT` with plugin injection +- [x] Plugin configs import from `processors.py` (no circular dependency) + +**Threading Note**: + +If you encounter deadlocks during plugin discovery with nested imports, the `PluginRegistry` may need a reentrant lock (`RLock`) instead of `Lock`. + +### Step 4: Create Demo Plugin Package + +Create a separate package demonstrating both processor types. + +- [x] Create package structure under `demo/data_designer_demo_processors/` +- [x] Implement `RegexFilterProcessor` (PRE_GENERATION) + - Config: column, pattern, invert flag + - Filters rows based on regex matching +- [x] Implement `SemanticDedupProcessor` (POST_GENERATION) + - Config: column, similarity_threshold, model_name + - Uses embeddings to find and remove similar rows + - **Suggestion**: Use sentence-transformers with a small model like `all-MiniLM-L6-v2` + +- [x] Configure entry points in `pyproject.toml` under `data_designer.plugins` +- [x] Add unit tests for each processor +- [x] Add README with installation and usage examples + +**Logging Suppression** (for sentence-transformers): + +Sentence-transformers emits progress bars and warnings when loading models. Suppress them: + +- Use `transformers.utils.logging.set_verbosity_error()` to suppress info/warning messages +- Use `transformers.utils.logging.disable_progress_bar()` to suppress progress bars +- Pass `show_progress_bar=False` to `model.encode()` for batch encoding + +### Step 5: Demo Notebook + +Create a simple, short demo that tests all features end-to-end. 
+ +- [x] Use `#%%` cell markers for IDE compatibility +- [x] Keep the demo minimal - just enough to verify the feature works +- [x] Include sample seed data with rows to filter (PRE_GENERATION test) +- [x] Add an LLM column to generate content, use the `openai-text` model +- [x] Configure both PRE_GENERATION and POST_GENERATION processors +- [x] **Run the demo and fix any issues** - don't just write it, execute it +- [x] Verify the output shows filtering and deduplication working + +**Important**: The demo must actually run successfully. Test it before considering this step complete. + +**API Notes**: Check the docs for correct Data Designer API usage. + +### Step 6: Documentation + +Update existing documentation to cover new capabilities. + +- [x] Update processor concepts doc with new stages table +- [x] Update plugins overview to mention processor plugins +- [x] Include example entry point configuration + +## Testing Strategy + +- Write tests alongside implementation, not as a separate step +- Use mocks for external dependencies (seed readers, artifact storage) +- For plugin registry tests, create actual mock classes (not Mock objects) to satisfy type validation + +## Risks & Considerations + +- **Memory usage**: POST_GENERATION holds full dataset in memory +- **Seed data mutation**: PRE_GENERATION modifies seed data before batching +- **Model download**: Embedding models download on first use; perform pre-download on uv install + +## Files Modified + +Core: +- `packages/data-designer-config/src/data_designer/config/processor_types.py` (new) +- `packages/data-designer-config/src/data_designer/config/processors.py` +- `packages/data-designer-config/src/data_designer/config/data_designer_config.py` +- `packages/data-designer-config/src/data_designer/config/config_builder.py` +- `packages/data-designer-config/src/data_designer/plugin_manager.py` +- `packages/data-designer-config/src/data_designer/plugins/plugin.py` +- `packages/data-designer-config/src/data_designer/plugins/registry.py` +- `packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py` +- `packages/data-designer-engine/src/data_designer/engine/processing/processors/registry.py` +- `packages/data-designer-engine/src/data_designer/engine/validation.py` + +Demo: +- `demo/data_designer_demo_processors/` (new package) + +Docs: +- `docs/concepts/processors.md` +- `docs/plugins/overview.md` + +--- + +## Iterating and Refining + +### Issue 1: Preview does not apply PRE_GENERATION / POST_GENERATION processors + +**Problem**: `build_preview()` and `process_preview()` do not call the new global-stage processors. This means users previewing their data pipeline won't see the effects of filtering or deduplication until they run a full build. + +**Investigation**: +- `build()` calls `_run_pre_generation_processors()` before generation and `_run_post_generation_processors()` after +- `build_preview()` skips both +- `process_preview()` only applies `POST_BATCH` processors + +**Fix**: +- [x] Add `_run_pre_generation_processors()` call to `build_preview()` before `_initialize_generators()` +- [x] Update `process_preview()` to also run `POST_GENERATION` processors after `POST_BATCH` +- [x] Add tests for preview with global-stage processors + +### Issue 2: Is RLock necessary in PluginRegistry? + +**Question**: The plan suggested changing `Lock` to `RLock` in `PluginRegistry`. Is this actually needed? 
+ +**Investigation**: + +The `PluginRegistry` singleton uses a lock in three places: +- `__new__`: double-checked locking for singleton creation +- `__init__`: protects `_discover()` call +- `reset()`: resets singleton state + +The potential deadlock scenario: +1. `PluginRegistry.__init__` acquires lock +2. `_discover()` calls `ep.load()` to load a plugin module +3. The plugin imports `data_designer.config.config_builder` (or any module using the config API) +4. That imports `column_types.py`, `processor_types.py`, `seed_source_types.py` +5. Each of those calls `PluginManager()` โ†’ `PluginRegistry()` at module level +6. `PluginRegistry().__init__` tries to acquire lock again (same thread) +7. With `Lock`: deadlock (same thread blocked). With `RLock`: succeeds. + +Import chain that triggers this: +``` +plugin.py โ†’ data_designer.config.config_builder + โ†’ data_designer.config.column_types (calls PluginManager()) + โ†’ data_designer.config.processor_types (calls PluginManager()) + โ†’ data_designer.config.seed_source_types (calls PluginManager()) +``` + +**Conclusion**: YES, `RLock` is necessary. Any third-party plugin that imports from the `data_designer.config` public API (e.g., `DataDesignerConfigBuilder`, `DataDesignerConfig`) would trigger this re-entry. Using a regular `Lock` would cause a deadlock. + +### Iteration 3: Callback-based Processor Design + +**Date**: 2026-02-04 + +**Issue**: The `build_stage` config field determines when a processor runs, but processors are inherently tied to specific stages. A `SemanticDedupProcessor` running at PRE_GENERATION doesn't make sense - it's meant to deduplicate final outputs. This creates a semantic mismatch where config controls something that's really a property of the processor class. + +**Root Cause**: Original design treated stage as configuration rather than as part of the processor's interface. This: +- Polluted config with fields that aren't user-configurable in practice +- Made single-stage limitation implicit (processors can only run at one stage) +- Created inconsistent method signatures (`current_batch_number` only meaningful for POST_BATCH) + +**Resolution**: Changed from stage-based config to callback-based design: + +1. **Removed `build_stage` from `ProcessorConfig`** - Processors no longer declare when they run via config + +2. **Added callback methods to `Processor` base class**: + - `preprocess(data)` - Called at PRE_GENERATION on seed data + - `process_after_batch(data, *, batch_number)` - Called at POST_BATCH for each batch + - `postprocess(data)` - Called at POST_GENERATION on final dataset + +3. **Changed `_processors` from `dict[BuildStage, list]` to `list[Processor]`** - All processors in flat list, callbacks called on all + +4. **Updated built-in processors**: + - `DropColumnsProcessor` โ†’ overrides `process_after_batch()` + - `SchemaTransformProcessor` โ†’ overrides `process_after_batch()` + +5. **Updated demo processors**: + - `RegexFilterProcessor` โ†’ overrides `preprocess()` + - `SemanticDedupProcessor` โ†’ overrides `postprocess()` + +**Benefits**: +- Self-documenting: Looking at a processor class shows what stages it handles +- Multi-stage capable: One processor can implement multiple callbacks +- Clean signatures: Each callback has only relevant parameters +- Simpler config: No `build_stage` field to confuse users + +**Verification**: +- All 1360 tests pass +- Config tests updated to remove `build_stage` parameter +- Builder tests updated for new callback pattern
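+
+**Callback sketch** (illustrative only: the callback names and the `batch_number` keyword follow the design above, but the pass-through defaults and the example subclass are assumptions, not the actual `base.py` implementation):
+
+```python
+import pandas as pd
+
+
+class Processor:
+    """Sketch of the callback-based base class."""
+
+    def preprocess(self, data: pd.DataFrame) -> pd.DataFrame:
+        # PRE_GENERATION: called once on the seed data; pass-through by default
+        return data
+
+    def process_after_batch(self, data: pd.DataFrame, *, batch_number: int) -> pd.DataFrame:
+        # POST_BATCH: called after each generated batch; pass-through by default
+        return data
+
+    def postprocess(self, data: pd.DataFrame) -> pd.DataFrame:
+        # POST_GENERATION: called once on the final combined dataset; pass-through by default
+        return data
+
+
+class DropEmptyRowsProcessor(Processor):
+    """Hypothetical example: only the per-batch callback is overridden."""
+
+    def process_after_batch(self, data: pd.DataFrame, *, batch_number: int) -> pd.DataFrame:
+        return data.dropna(how="all").reset_index(drop=True)
+```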