Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import static org.apache.fluss.testutils.DataTestUtils.row;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.catchThrowable;

Expand Down Expand Up @@ -1022,6 +1023,43 @@ void testCancelRebalance() throws Exception {
guestAdmin.cancelRebalance(null).get();
}

@Test
void testRebalanceDuringConcurrentTableCreation() throws Exception {
// Setup WRITE permission on the cluster for the guest user to allow rebalance operations.
rootAdmin
.createAcls(
Collections.singletonList(
new AclBinding(
Resource.cluster(),
new AccessControlEntry(
guestPrincipal,
WILD_CARD_HOST,
OperationType.WRITE,
PermissionType.ALLOW))))
.all()
.get();

// Run multiple iterations to catch potential race conditions between
// table creation events and rebalance plan generation.
// Locally verified with 50+ iterations without failures.
for (int i = 0; i < 5; i++) {
TablePath transientTable = TablePath.of("test_db_1", "transient_rebalance_table_" + i);

// Trigger table creation. We do not wait for the table to be "ready"
// to maximize the chance of the rebalancer encountering transient metadata.
rootAdmin.createTable(transientTable, DATA1_TABLE_DESCRIPTOR_PK, false);

// Attempt to rebalance the cluster.
// This verifies that the rebalance operation is robust against transient table states
// (e.g., leader elected but not yet present in the assignment list) and does not fail.
assertThatCode(() -> guestAdmin.rebalance(Collections.emptyList()).get())
.doesNotThrowAnyException();

// Cleanup the table for the next iteration.
rootAdmin.dropTable(transientTable, true).get();
}
}

// ------------------------------------------------------------------------
// Producer Offsets Authorization Tests
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,11 @@ private ClusterModel buildClusterModel(CoordinatorContext coordinatorContext) {
checkArgument(bucketLeaderAndIsrOpt.isPresent(), "Bucket leader and isr is empty.");
LeaderAndIsr isr = bucketLeaderAndIsrOpt.get();
int leader = isr.leader();
// Skip the bucket if it is in a transient state (e.g., during table creation)
// where the leader is elected but not yet present in the assignment list.
if (leader == -1 || !assignment.contains(leader)) {
continue;
}
for (int i = 0; i < assignment.size(); i++) {
int replica = assignment.get(i);
clusterModel.createReplica(replica, tableBucket, i, leader == replica);
Expand Down