@platinumhamburg
Contributor

Core changes:

  • Add ProducerSnapshotManager for lifecycle management with atomic registration
  • Add ProducerSnapshotStore for ZK + remote storage operations
  • Add tryRegisterProducerSnapshot in ZooKeeperClient for atomic check-and-create
  • Add Admin API: registerProducerOffsets, getProducerOffsets, deleteProducerOffsets
  • Add configurable TTL and cleanup interval for producer snapshots

Design highlights:

  • Atomic registration via ZK's NodeExistsException handling
  • Eventually consistent: ZK as commit point, orphan files cleaned periodically
  • UUID-based file naming prevents concurrent upload conflicts
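
The design highlights above can be sketched with in-memory stand-ins. This is an illustration under stated assumptions, not the code in this PR: the class `ProducerOffsetsRegistrySketch` and its methods are hypothetical, a `ConcurrentHashMap` stands in for ZooKeeper (its `putIfAbsent` plays the role of a create that would fail with `NodeExistsException`), and a set of paths stands in for remote storage.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of the registration flow; not the PR's implementation. */
class ProducerOffsetsRegistrySketch {
    enum RegisterResult { CREATED, ALREADY_EXISTS }

    // Stand-in for ZooKeeper: producerId -> path of the committed file.
    private final Map<String, String> committed = new ConcurrentHashMap<>();
    // Stand-in for remote storage: the set of uploaded file paths.
    private final Set<String> remoteFiles = ConcurrentHashMap.newKeySet();

    RegisterResult register(String producerId) {
        // 1. Upload under a UUID-based name so concurrent callers never overwrite each other.
        String path = producerId + "/" + UUID.randomUUID() + ".offsets";
        remoteFiles.add(path);
        // 2. Commit point: only one caller can create the entry
        //    (ZooKeeper would raise NodeExistsException for the others).
        String existing = committed.putIfAbsent(producerId, path);
        return existing == null ? RegisterResult.CREATED : RegisterResult.ALREADY_EXISTS;
    }

    /** Periodic cleanup: delete uploaded files that no committed entry points to. */
    int cleanOrphans() {
        int removed = 0;
        for (String path : remoteFiles) {
            if (!committed.containsValue(path) && remoteFiles.remove(path)) {
                removed++;
            }
        }
        return removed;
    }

    int remoteFileCount() {
        return remoteFiles.size();
    }
}
```

In this model the losing caller's file becomes an orphan that the next cleanup pass removes. A real cleanup would presumably also need to skip files from registrations still in flight, which the sketch does not model.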

Tests:

  • ProducerSnapshotManagerTest: lifecycle, expiration, concurrent atomicity
  • ProducerSnapshotJsonSerdeTest: JSON format compatibility

Purpose

Linked issue: close #2433

This commit introduces the Producer Offset Snapshot feature to support
exactly-once semantics in Fluss. The feature allows producers (e.g., Flink
jobs) to register their offset snapshots for recovery purposes.

Main changes:
- Add ProducerSnapshotManager for lifecycle management of producer snapshots
- Add ProducerSnapshotStore for low-level storage operations (ZK + remote FS)
- Add Admin APIs: registerProducerOffsets, getProducerOffsets, deleteProducerOffsets
- Add RetryUtils for IO operations with exponential backoff
- Add configuration options for snapshot TTL and cleanup interval

Code quality improvements:
- Remove ProducerSnapshotResultCodes class, use RegisterResult enum directly
- Fix RetryUtils interrupt handling to preserve thread interrupt status
- Add comprehensive tests for ProducerSnapshotManager including concurrency tests
- Add interrupt handling tests for RetryUtils
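
The interrupt-handling fix mentioned above can be illustrated with a minimal sketch. The class and method names here (`RetrySketch`, `retryWithBackoff`) are hypothetical and the PR's `RetryUtils` may differ in signature and policy. The sketch only shows the general pattern: retry on `IOException` with a doubling delay, and restore the thread's interrupt flag if the wait is interrupted.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Hypothetical retry helper; the PR's RetryUtils may differ in names and signatures. */
class RetrySketch {
    static <T> T retryWithBackoff(Callable<T> op, int maxAttempts, long initialBackoffMs)
            throws Exception {
        long backoffMs = initialBackoffMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (IOException e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                // Restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting to retry", ie);
            }
            backoffMs *= 2; // exponential backoff
        }
    }
}
```

Swallowing `InterruptedException` without re-setting the flag would hide a shutdown request from the calling code, which is the kind of problem the fix above addresses.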
Member

@wuchong left a comment

Thanks @platinumhamburg, I left some comments.

- Rename 'producer-snapshot' config options to 'producer-offsets' for clarity
- Refactor RegisterProducerOffsetsRequest to use PbProducerTableOffsets
  instead of flat PbTableBucketOffset for better organization by table
- Move RegisterResult enum from fluss-rpc to fluss-client as public API
- Add producer ID validation using TablePath.detectInvalidName()
- Improve authorization: check per-table permissions instead of cluster-level
- Add version-based delete in cleanup to avoid race conditions
- Use FlussPaths for consistent remote storage path generation
- Add FileNotFoundException handling for non-retryable file errors
- Extract RPC message conversion utilities to ClientRpcMessageUtils
  and ServerRpcMessageUtils
- Add comprehensive unit tests for producer ID validation
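
One way to read the "version-based delete" item above is as a compare-and-delete: the cleanup task removes an entry only if it still has the version observed when it was judged expired. The sketch below models that idea in memory. It is an assumption about the intent, not the PR's code: `VersionedDeleteSketch` and its methods are hypothetical, and the integer version merely imitates the node version that ZooKeeper tracks.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of version-checked deletion; not the PR's code. */
class VersionedDeleteSketch {
    /** An entry plus its version, similar in spirit to a ZooKeeper node version. */
    static final class Versioned {
        final long expiresAtMs;
        final int version;

        Versioned(long expiresAtMs, int version) {
            this.expiresAtMs = expiresAtMs;
            this.version = version;
        }
    }

    private final Map<String, Versioned> store = new ConcurrentHashMap<>();

    void put(String producerId, long expiresAtMs) {
        // Each overwrite bumps the version; the first write starts at 0.
        store.merge(producerId, new Versioned(expiresAtMs, 0),
                (old, fresh) -> new Versioned(fresh.expiresAtMs, old.version + 1));
    }

    Versioned read(String producerId) {
        return store.get(producerId);
    }

    /** Deletes only if the entry still has the version the caller observed. */
    boolean deleteIfVersionMatches(String producerId, int expectedVersion) {
        boolean[] deleted = {false};
        store.computeIfPresent(producerId, (id, current) -> {
            if (current.version == expectedVersion) {
                deleted[0] = true;
                return null; // returning null removes the mapping
            }
            return current; // changed since it was read: leave it alone
        });
        return deleted[0];
    }
}
```

If a producer re-registers between the cleanup task's read and its delete, the version no longer matches and the fresh entry survives.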

Rename classes and corresponding variable declarations:
- ProducerSnapshot -> ProducerOffsets
- ProducerSnapshotJsonSerde -> ProducerOffsetsJsonSerde
- ProducerSnapshotStore -> ProducerOffsetsStore
- ProducerSnapshotManager -> ProducerOffsetsManager

Update variable names in:
- CoordinatorService: producerSnapshotManager -> producerOffsetsManager
- ZooKeeperClient: class type references updated
- ZkData: class type references updated

Test classes renamed accordingly.
@platinumhamburg
Contributor Author

Thanks @wuchong for the detailed review. I’ve addressed all the comments above, please take another look when you have time.

Member

@wuchong left a comment

@platinumhamburg the updated changes look good to me in general. I left some final comments. Besides, I appended a commit to improve the renaming and comments a bit.

- Remove duplicated RegisterResult class from fluss-common, use magic values
  (0=CREATED, 1=ALREADY_EXISTS) with comments aligning to client RegisterResult enum
- Introduce InvalidProducerIdException (ApiException) for producer ID validation
- Register InvalidProducerIdException in Errors enum (code 63)
- Re-throw ApiExceptions as-is in producer offsets APIs to preserve exception types
- Wrap non-ApiExceptions with UnknownServerException instead of RuntimeException
- Add authorization tests for producer offsets operations in FlussAuthorizationITCase
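
The exception-handling rule in the list above (re-throw `ApiException`s unchanged, wrap everything else in `UnknownServerException`) can be sketched as follows. The exception classes declared here are simplified stand-ins with the same names, not the Fluss classes, and `callApi` is a hypothetical helper introduced only for illustration.

```java
import java.util.concurrent.Callable;

/** Hypothetical stand-ins for the exception types named above; not the Fluss classes. */
class ExceptionMappingSketch {
    static class ApiException extends RuntimeException {
        ApiException(String message) {
            super(message);
        }

        ApiException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static class InvalidProducerIdException extends ApiException {
        InvalidProducerIdException(String message) {
            super(message);
        }
    }

    static class UnknownServerException extends ApiException {
        UnknownServerException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    static <T> T callApi(Callable<T> body) {
        try {
            return body.call();
        } catch (ApiException e) {
            throw e; // keep the specific type so the caller sees the real error
        } catch (Exception e) {
            // Anything else is unexpected: wrap it, keeping the cause for debugging.
            throw new UnknownServerException("Unexpected server error", e);
        }
    }
}
```

Wrapping in a plain `RuntimeException` would lose the distinction between a client mistake, such as an invalid producer ID, and a genuine server fault.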
@platinumhamburg
Contributor Author

@wuchong Thanks for your detailed review and suggestions. I’ve addressed all the comments above. Please take a look when you have time.

The deleteProducerOffsets and getProducerOffsets methods were calling
authorizeTable() inside CompletableFuture.supplyAsync(), which runs on
a different thread (ioExecutor). Since currentSession() relies on
thread-local storage, it fails with 'No session set' error when called
from the async thread.

Fix:
- Capture session before entering async block
- Add authorizeTableWithSession() method that accepts explicit session
- Refactor authorizeTable(OperationType, long) to delegate to the new
  method, eliminating code duplication
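
The bug and fix described above can be reproduced in a small, self-contained sketch. Everything here is hypothetical: the `String` session, the `SessionCaptureSketch` class, and the `buggy`/`fixed` methods only model the pattern, while the real code uses Fluss's session type and `authorizeTableWithSession()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Hypothetical model of the bug and the fix; the session holder is not the Fluss class. */
class SessionCaptureSketch {
    private static final ThreadLocal<String> SESSION = new ThreadLocal<>();

    static void setSession(String session) {
        SESSION.set(session);
    }

    static String currentSession() {
        String session = SESSION.get();
        if (session == null) {
            throw new IllegalStateException("No session set");
        }
        return session;
    }

    /** Buggy: reads the thread-local on the executor thread, where it was never set. */
    static CompletableFuture<String> buggy(ExecutorService ioExecutor) {
        return CompletableFuture.supplyAsync(() -> authorize(currentSession()), ioExecutor);
    }

    /** Fixed: capture the session on the calling thread and pass it explicitly. */
    static CompletableFuture<String> fixed(ExecutorService ioExecutor) {
        String session = currentSession();
        return CompletableFuture.supplyAsync(() -> authorize(session), ioExecutor);
    }

    private static String authorize(String session) {
        return "authorized:" + session;
    }
}
```

Calling `buggy(...)` from a thread that has a session still completes exceptionally, because the lambda runs on a pool thread whose thread-local is empty. `fixed(...)` succeeds because the lambda closes over the already-captured value.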
Development

Successfully merging this pull request may close these issues.

[Server] Add Producer Offset Snapshot Registry as Infrastructure for Exactly-Once Semantics
