refactor: callback-based processor design #294
Conversation
```python
name: str = Field(
    description="The name of the processor, used to identify the processor in the results and to write the artifacts to disk.",
)
build_stage: BuildStage = Field(
```
No need for build_stage anymore
Sweet, this is a neat approach!
```python
from data_designer.engine.configurable_task import ConfigurableTask, DataT, TaskConfigT


class Processor(ConfigurableTask[TaskConfigT], ABC):
```
This is the main change - three different methods
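The three callbacks plus the override check could be sketched roughly like this (a hypothetical reconstruction from the PR summary; the real base class extends `ConfigurableTask` and carries config plumbing not shown here, and each `df` is a pandas DataFrame):

```python
from abc import ABC


class Processor(ABC):
    """Sketch of a callback-based processor: subclasses override only the
    stages they need; unimplemented stages stay as no-ops."""

    def process_before_batch(self, df):
        # Runs after seed columns are generated, before dependent columns (no-op).
        return df

    def process_after_batch(self, df, batch_number):
        # Runs after each batch completes (no-op).
        return df

    def process_after_generation(self, df):
        # Runs once on the final combined dataset (no-op).
        return df

    def implements(self, callback_name: str) -> bool:
        # Method-resolution check: True only when a subclass overrides the
        # named callback, so the runner can skip pure no-ops.
        return getattr(type(self), callback_name) is not getattr(Processor, callback_name)
```

With this shape a runner can ask `processor.implements("process_after_batch")` and skip processors that left the default in place.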
Greptile Summary

Refactors the processor system from config-based stages to a callback-based design. Processors now define behavior by implementing callback methods.
| Filename | Overview |
|---|---|
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py | New file implementing callback-based processor execution. Clean separation of concerns with stage enum. Row-count change logging handles all stages automatically. Empty DataFrame edge case writes one empty file intentionally for consistency. |
| packages/data-designer-engine/src/data_designer/engine/processing/processors/base.py | Clean callback-based design with three no-op default methods. implements() uses method resolution to detect overrides correctly. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py | Refactored to use ProcessorRunner. Pre-batch runs after seed generation, post-batch runs in _run_batch(), after-generation runs in build(). Removed stage-based dict, now uses simple processor list. |
| packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/dataset_batch_manager.py | Added replace_buffer() method to support PRE_BATCH processors that change row count. Updates both buffer and batch size tracking. |
Sequence Diagram

```mermaid
sequenceDiagram
    participant Builder as ColumnWiseBuilder
    participant Runner as ProcessorRunner
    participant BatchMgr as DatasetBatchManager
    participant Processor as Processor(s)
    participant Storage as ArtifactStorage
    Note over Builder: build(num_records)
    Builder->>Builder: Initialize generators
    Builder->>BatchMgr: start(num_records, buffer_size)
    loop For each batch
        Builder->>Builder: _run_batch()
        Note over Builder: Generate seed columns
        Builder->>Runner: run_pre_batch(batch_manager)
        Runner->>BatchMgr: get_current_batch(as_dataframe=True)
        BatchMgr-->>Runner: DataFrame
        Runner->>Processor: process_before_batch(df)
        Processor-->>Runner: Modified DataFrame
        Runner->>BatchMgr: replace_buffer(records)
        Note over Builder: Generate dependent columns
        Builder->>Runner: run_post_batch(df, batch_number)
        Runner->>Processor: process_after_batch(df, batch_number)
        Processor-->>Runner: Modified DataFrame
        Runner-->>Builder: Processed DataFrame
        Builder->>BatchMgr: update_records(processed_df)
        Builder->>BatchMgr: write() + finish_batch()
    end
    Builder->>BatchMgr: finish()
    Builder->>Runner: run_after_generation(batch_size)
    Runner->>Storage: load_dataset()
    Storage-->>Runner: Complete DataFrame
    Runner->>Processor: process_after_generation(df)
    Processor-->>Runner: Final DataFrame
    Runner->>Storage: rmtree(final_dataset_path)
    loop Re-chunk by batch_size
        Runner->>Storage: write_batch_to_parquet_file()
    end
```
6 files reviewed, 3 comments
7 files reviewed, 4 comments
Additional Comments (3)
**Leaking DuckDB connection** (packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py, lines 438-444)

`_run_pre_generation_processors()` creates a DuckDB connection via `seed_reader.create_duckdb_connection()` and never closes it. If a build runs many times in-process (service mode/tests), this will leak connections/file handles. Close the connection (e.g., `try/finally: conn.close()`), or use a context manager if the seed reader supports it.
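One hedged sketch of the suggested fix, using `contextlib.closing` so the connection is released on success or on exception (the `seed_reader` interface and the `process` hook are assumptions taken from the comment above):

```python
from contextlib import closing


def run_pre_generation_processors(seed_reader, process):
    # closing() guarantees conn.close() runs even if processing raises,
    # so repeated in-process builds stop leaking connections/file handles.
    with closing(seed_reader.create_duckdb_connection()) as conn:
        return process(conn)  # ... run preprocessing queries against conn ...
```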
**seed_row_indices not updated** (packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py, lines 448-455)

`_run_pre_generation_processors()` only sets `resource_provider.seed_row_indices` when `len(df) != original_len`. If preprocess reorders rows or filters+adds back to the same length, `seed_row_indices` stays unset and the generator will read unfiltered rows even though preprocess changed the seed selection. Track indices whenever preprocess runs (or whenever `_dd_rowid` changes), not only on length change.
**Postprocess changes may be lost** (packages/data-designer-engine/src/data_designer/engine/dataset_builders/column_wise_builder.py, lines 486-496)

`_run_post_generation_processors()` only rewrites the final dataset when row count or column *names/order* change. If `postprocess()` modifies values in-place (same rows/columns), those changes won't be persisted and will be silently dropped. This is a functional correctness issue for value-only postprocessors; consider always rewriting when processors exist, or require postprocessors to return a new df and detect `df is not original_df`/hashing.
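A minimal sketch of the "detect value-only changes" idea (the helper name is hypothetical): `pandas.DataFrame.equals` compares shape, axis labels, and element values, so an in-place value edit is caught, not just resizes or renames.

```python
import pandas as pd


def needs_rewrite(original: pd.DataFrame, processed: pd.DataFrame) -> bool:
    # DataFrame.equals checks shape, row/column labels, and element values,
    # so value-only postprocess edits trigger a rewrite too.
    return not processed.equals(original)
```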
6 files reviewed, 3 comments
6 files reviewed, 1 comment
| Stage | When it runs | Callback | Example uses |
|-------|--------------|----------|--------------|
| Pre-generation | Once, on full seed data before batching | `preprocess()` | Filter seed data, validate inputs, normalize data |
| Pre-batch | After seed columns, before dependent columns | `process_before_batch()` | Transform seed data before other columns are generated |
| Post-batch | After each batch completes | `process_after_batch()` | Drop columns, transform schema per batch |
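A post-batch processor like the last row could be sketched as (hypothetical class, shown standalone without the real base class or config wiring):

```python
import pandas as pd


class DropScratchColumns:
    """Hypothetical post-batch processor: removes helper columns from each
    batch before it is written, so they are never persisted to disk."""

    def __init__(self, columns):
        self._columns = list(columns)

    def process_after_batch(self, df: pd.DataFrame, batch_number: int) -> pd.DataFrame:
        # errors="ignore" tolerates batches that never had the column.
        return df.drop(columns=self._columns, errors="ignore")
```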
Somehow in my head dropping columns needs to happen post-generation, BUT the key point is that each batch contains the full schema of the dataset. Might be good to have a tip (or something) box that highlights this point.
added an info box to the docs. dropping columns post-batch is probably better than post-generation since we don't need to write dropped columns to disk.
- Move imports to top of file
- Add seed_data_setup and builder_with_seed fixtures
- Add create_mock_processor helper function
- Add edge case tests for exceptions, no-op processors, ordering
- Move all processor stage logic to new ProcessorRunner in utils/
- ProcessorRunner takes dependencies and provides complete stage methods
- Builder now calls runner methods directly instead of wrapper methods
- Remove unused imports from column_wise_builder
- Add ProcessorStage enum to replace raw strings in processor_runner
- Rename _processors property to public processors + set_processor_runner
- Fix docstring to include process_before_batch callback
- Add info box to processors docs about full schema in batches
- Add clarifying comment about pre-batch processing location

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Add replace_records() to DatasetBatchManager that replaces the buffer without requiring matching length. Use it in run_pre_batch() so processors that filter or expand rows don't crash.
- Restore schema transform preview write so processor artifacts are available in preview mode (read back by data_designer.py)
- Return tuple from processors property to prevent mutation
…eration

- Remove preprocess() callback, run_preprocess(), cleanup_preprocessed_seed(), preprocessed_seed_uri, and all related plumbing
- Rename postprocess() to process_after_generation() across base class, ProcessorRunner, builder, config enums, docs, and tests
- Drop resource_provider parameter from ProcessorRunner (no longer needed)
- Remove dead BuildStage enum and fix wrong return type on get_processor_configs()
- Log row count delta instead of absolute values in PRE_BATCH
- Preserve batch partitioning in run_after_generation()
- Consolidate and simplify processor tests
@johnnygreco @nabinchha removed preprocessor following our discussion offline
17 files reviewed, 1 comment
Avoids empty parquet files when processors reduce row count below the original file count. Re-chunks by buffer_size instead of matching the number of input files.
17 files reviewed, 1 comment
- Rename replace_records to replace_buffer
- Move row-count-change logging into _run_stage for all stages
- Expose processors as public property on ProcessorRunner
Both pre-batch and post-batch now run inside _run_batch, keeping the full batch lifecycle (generate, process, write, finish) in one place. Preview mode skips post-batch via current_batch_number=None guard.
…ocessor-plugins

# Conflicts:
#	packages/data-designer-engine/tests/engine/dataset_builders/test_column_wise_builder.py
17 files reviewed, 1 comment
17 files reviewed, 1 comment
```python
df = self._run_stage(df, ProcessorStage.AFTER_GENERATION)

shutil.rmtree(self._artifact_storage.final_dataset_path)
for i in range(0, max(len(df), 1), batch_size):
```
when AFTER_GENERATION processor filters dataset to 0 rows, max(len(df), 1) forces one iteration → writes one empty parquet file. consider early return if len(df) == 0 to avoid empty output files.
(Path: packages/data-designer-engine/src/data_designer/engine/dataset_builders/utils/processor_runner.py, line 98)
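Combining the re-chunking change with the reviewer's early-return suggestion could look like this sketch (illustrative only; the real code writes parquet via `ArtifactStorage` rather than returning chunks):

```python
def rechunk(records, batch_size):
    """Split the final dataset into batch_size chunks. Returns an empty list
    (so no files are written) when processors filtered everything out,
    instead of forcing one empty chunk."""
    if len(records) == 0:
        return []  # early return: no empty parquet file
    return [records[i : i + batch_size] for i in range(0, len(records), batch_size)]
```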
Summary

Refactors the processor system from stage-based configuration to a callback-based design. Processors define behavior by implementing callback methods instead of setting a `build_stage` field in config. The preprocessor stage has been removed in favor of the simpler pre-batch approach.

Changes

Added

- `Processor` base class with default no-op implementations:
  - `process_before_batch()` - after seed columns, before dependent columns
  - `process_after_batch()` - after each batch completes
  - `process_after_generation()` - on final combined dataset after all batches
- `implements()` method to check if a processor overrides a callback
- `ProcessorRunner` class encapsulating all processor execution logic with public `processors` property
- `replace_buffer()` on `DatasetBatchManager` for PRE_BATCH processors that change row count
- Row-count-change logging in `_run_stage` fires for all stages automatically

Changed

- Moved processor orchestration out of `ColumnWiseBuilder` into dedicated `ProcessorRunner`
- Pre-batch and post-batch processing both run inside `_run_batch`, keeping the full batch lifecycle in one place
- `run_after_generation(batch_size)` re-chunks output by `batch_size` to avoid empty parquet files when processors reduce row count
- `ProcessorRunner` only depends on `ArtifactStorage` (not `ResourceProvider`)
- Updated `docs/concepts/processors.md` with 3-stage callback design

Removed

- `preprocess()` stage and all associated plumbing (`preprocessed_seed_uri`, `cleanup_preprocessed_seed`, disk I/O for seed preprocessing)
- Dead `BuildStage` enum; fixed wrong return type on `get_processor_configs()`
- `build_stage` field from `ProcessorConfig`

Attention Areas

- `processor_runner.py` - Core processor execution logic with batch-size-based chunking in `run_after_generation`
- `column_wise_builder.py#_run_batch` - Full batch lifecycle: generate → post-batch process → write → finish
- `dataset_batch_manager.py` - `replace_buffer()` for PRE_BATCH row-count changes (note: overlaps with `update_records(allow_resize=True)` in feat: add allow_resize for 1:N and N:1 generation patterns #286 - should unify when both land)