From 8ea7478d006f7656f0cc7edd156a46c7be09647d Mon Sep 17 00:00:00 2001 From: Zubo Date: Sun, 25 Jan 2026 23:02:44 +0100 Subject: [PATCH] [FLUSS-2473][server][test] Fix flaky test FlussAuthorizationITCase.testRebalance --- .../acl/FlussAuthorizationITCase.java | 38 +++++++++++++++++++ .../rebalance/RebalanceManager.java | 5 +++ 2 files changed, 43 insertions(+) diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 89c64e5caa..fdb5e3015a 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -102,6 +102,7 @@ import static org.apache.fluss.testutils.DataTestUtils.row; import static org.apache.fluss.testutils.common.CommonTestUtils.retry; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.catchThrowable; @@ -1022,6 +1023,43 @@ void testCancelRebalance() throws Exception { guestAdmin.cancelRebalance(null).get(); } + @Test + void testRebalanceDuringConcurrentTableCreation() throws Exception { + // Set up WRITE permission on the cluster for the guest user to allow rebalance operations. + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + WILD_CARD_HOST, + OperationType.WRITE, + PermissionType.ALLOW)))) + .all() + .get(); + + // Run 5 iterations to raise the chance of hitting a race condition between + // table creation events and rebalance plan generation. + // Locally verified with 50+ iterations without failures. 
+ for (int i = 0; i < 5; i++) { + TablePath transientTable = TablePath.of("test_db_1", "transient_rebalance_table_" + i); + + // Trigger table creation without calling get() on it or waiting for the table to be + // "ready", to maximize the chance of the rebalancer encountering transient metadata. + rootAdmin.createTable(transientTable, DATA1_TABLE_DESCRIPTOR_PK, false); + + // Attempt to rebalance the cluster. + // This verifies that the rebalance operation is robust against transient table states + // (e.g., leader elected but not yet present in the assignment list) and does not fail. + assertThatCode(() -> guestAdmin.rebalance(Collections.emptyList()).get()) + .doesNotThrowAnyException(); + + // Clean up: drop the table before the next iteration. + rootAdmin.dropTable(transientTable, true).get(); + } + } + // ------------------------------------------------------------------------ // Producer Offsets Authorization Tests // ------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java index 9d45dc2346..30cf40bd39 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManager.java @@ -365,6 +365,11 @@ private ClusterModel buildClusterModel(CoordinatorContext coordinatorContext) { checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader and isr is empty."); LeaderAndIsr isr = bucketLeaderAndIsrOpt.get(); int leader = isr.leader(); + // Skip the bucket while it is in a transient state (e.g., during table creation): either + // the leader is -1 (presumably no leader yet) or it is not in the assignment list. 
+ if (leader == -1 || !assignment.contains(leader)) { + continue; + } for (int i = 0; i < assignment.size(); i++) { int replica = assignment.get(i); clusterModel.createReplica(replica, tableBucket, i, leader == replica);