From ffa18151f40495b576ae26e98319f1ee5596b43a Mon Sep 17 00:00:00 2001 From: Sergey Soldatov Date: Fri, 23 Jan 2026 09:37:36 -0800 Subject: [PATCH 1/2] RATIS-2387. Performance degradation after RATIS-2235 --- .../ratis/server/impl/RaftServerImpl.java | 45 ++++++++++++++++--- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 60f72e001e..94bd8ff107 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -119,6 +119,7 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -260,8 +261,10 @@ public long[] getFollowerMatchIndices() { private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; + // Conditional fields for appendLog synchronization (RATIS-2235) private final AtomicReference> appendLogFuture; - private final NavigableIndices appendLogTermIndices = new NavigableIndices(); + private final NavigableIndices appendLogTermIndices; + private final boolean appendEntriesSynchronized; RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -296,7 +299,21 @@ public long[] getFollowerMatchIndices() { this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); - this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); + + // Initialize appendLog synchronization components conditionally (RATIS-2235) + // Use system property for single-file configuration (avoids updating ratis-server-api jar) + this.appendEntriesSynchronized = Boolean.parseBoolean( + System.getProperty("raft.server.log.append.entries.synchronized", "true")); + LOG.info("{}: appendLog synchronization mode: {}", getMemberId(), + appendEntriesSynchronized ? "synchronized" : "parallel"); + + if (appendEntriesSynchronized) { + this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); + this.appendLogTermIndices = new NavigableIndices(); + } else { + this.appendLogFuture = null; + this.appendLogTermIndices = null; + } this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.serverCached(properties), @@ -1620,8 +1637,16 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde state.updateConfiguration(entries); } future.join(); - final CompletableFuture appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null) - : appendLog(entries); + + // Conditional appendLog based on configuration (RATIS-2235) + final CompletableFuture appendOperation; + if (appendEntriesSynchronized && !entries.isEmpty()) { + appendOperation = appendLogSynchronized(entries); + } else { + final List> futures = entries.isEmpty() ? Collections.emptyList() + : state.getLog().append(entries); + appendOperation = JavaUtils.allOf(futures); + } proto.getCommitInfosList().forEach(commitInfoCache::update); @@ -1636,7 +1661,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); - return appendLog.whenCompleteAsync((r, t) -> { + return appendOperation.whenCompleteAsync((r, t) -> { followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); timer.stop(); }, getServerExecutor()).thenApply(v -> { @@ -1653,7 +1678,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } - private CompletableFuture appendLog(List entries) { + + /** + * Synchronized appendLog operation to ensure only one thread performs appendLog at a time. + * This is the RATIS-2235 implementation that can be enabled via configuration. + */ + private CompletableFuture appendLogSynchronized(List entries) { final List entriesTermIndices = ConsecutiveIndices.convert(entries); if (!appendLogTermIndices.append(entriesTermIndices)) { // index already exists, return the last future @@ -1690,7 +1720,8 @@ private long checkInconsistentAppendEntries(TermIndex previous, List Date: Wed, 28 Jan 2026 20:04:55 -0800 Subject: [PATCH 2/2] Suggested changes by Nicholas. --- .../ratis/server/RaftServerConfigKeys.java | 10 ++++ .../ratis/server/impl/RaftServerImpl.java | 60 ++++--------------- .../ratis/server/impl/ServerImplUtils.java | 19 +++++- 3 files changed, 37 insertions(+), 52 deletions(-) diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 2538a472a8..efb3c67963 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -440,6 +440,16 @@ static void setReadLockEnabled(RaftProperties properties, boolean readLockEnable setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled); } + String APPEND_ENTRIES_COMPOSE_ENABLED_KEY = PREFIX + ".append-entries.compose.enabled"; + boolean APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT = true; + static boolean appendEntriesComposeEnabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, + APPEND_ENTRIES_COMPOSE_ENABLED_KEY, APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT, getDefaultLog()); + } + static void setAppendEntriesComposeEnabled(RaftProperties properties, boolean enabled) { + setBoolean(properties::setBoolean, APPEND_ENTRIES_COMPOSE_ENABLED_KEY, enabled); + } + /** * Besides the open segment, the max number of segments caching log entries. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 94bd8ff107..846b87702f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -81,7 +81,6 @@ import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.LeaderElection.Phase; import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; -import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices; import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices; import org.apache.ratis.server.leader.LeaderState.StepDownReason; import org.apache.ratis.server.metrics.LeaderElectionMetrics; @@ -119,7 +118,6 @@ import java.nio.file.NoSuchFileException; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -134,7 +132,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -261,10 +258,7 @@ public long[] getFollowerMatchIndices() { private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; - // Conditional fields for appendLog synchronization (RATIS-2235) - private final AtomicReference> appendLogFuture; private final NavigableIndices appendLogTermIndices; - private final boolean appendEntriesSynchronized; RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -299,21 +293,8 @@ public long[] getFollowerMatchIndices() { this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); - - // Initialize appendLog synchronization components conditionally (RATIS-2235) - // Use system property for single-file configuration (avoids updating ratis-server-api jar) - this.appendEntriesSynchronized = Boolean.parseBoolean( - System.getProperty("raft.server.log.append.entries.synchronized", "true")); - LOG.info("{}: appendLog synchronization mode: {}", getMemberId(), - appendEntriesSynchronized ? "synchronized" : "parallel"); - - if (appendEntriesSynchronized) { - this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); - this.appendLogTermIndices = new NavigableIndices(); - } else { - this.appendLogFuture = null; - this.appendLogTermIndices = null; - } + this.appendLogTermIndices = RaftServerConfigKeys.Log.appendEntriesComposeEnabled(properties) ? + new NavigableIndices() : null; this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.serverCached(properties), @@ -1637,16 +1618,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde state.updateConfiguration(entries); } future.join(); - - // Conditional appendLog based on configuration (RATIS-2235) - final CompletableFuture appendOperation; - if (appendEntriesSynchronized && !entries.isEmpty()) { - appendOperation = appendLogSynchronized(entries); - } else { - final List> futures = entries.isEmpty() ? Collections.emptyList() - : state.getLog().append(entries); - appendOperation = JavaUtils.allOf(futures); - } + final CompletableFuture appendFuture = entries.isEmpty()? CompletableFuture.completedFuture(null) + : appendLogTermIndices != null ? appendLogTermIndices.append(entries, this::appendLog) + : appendLog(entries); proto.getCommitInfosList().forEach(commitInfoCache::update); @@ -1661,7 +1635,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); - return appendOperation.whenCompleteAsync((r, t) -> { + return appendFuture.whenCompleteAsync((r, t) -> { followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); timer.stop(); }, getServerExecutor()).thenApply(v -> { @@ -1678,22 +1652,9 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde return reply; }); } - - /** - * Synchronized appendLog operation to ensure only one thread performs appendLog at a time. - * This is the RATIS-2235 implementation that can be enabled via configuration. - */ - private CompletableFuture appendLogSynchronized(List entries) { - final List entriesTermIndices = ConsecutiveIndices.convert(entries); - if (!appendLogTermIndices.append(entriesTermIndices)) { - // index already exists, return the last future - return appendLogFuture.get(); - } - - - return appendLogFuture.updateAndGet(f -> f.thenComposeAsync( - ignored -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor)) - .whenComplete((v, e) -> appendLogTermIndices.removeExisting(entriesTermIndices)); + private CompletableFuture appendLog(List entries) { + return CompletableFuture.completedFuture(null) + .thenComposeAsync(dummy -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor); } private long checkInconsistentAppendEntries(TermIndex previous, List entries) { @@ -1720,7 +1681,8 @@ private long checkInconsistentAppendEntries(TermIndex previous, List map = new TreeMap<>(); + private final AtomicReference> future + = new AtomicReference<>(CompletableFuture.completedFuture(null)); boolean contains(TermIndex ti) { final Long term = getTerm(ti.getIndex()); @@ -137,7 +142,15 @@ synchronized Long getTerm(long index) { return floorEntry.getValue().getTerm(index); } - synchronized boolean append(List entriesTermIndices) { + CompletableFuture append(List entries, + Function, CompletableFuture> appendLog) { + final List entriesTermIndices = ConsecutiveIndices.convert(entries); + return alreadyExists(entriesTermIndices) ? future.get() + : future.updateAndGet(f -> f.thenComposeAsync(ignored -> appendLog.apply(entries))) + .whenComplete((v, e) -> removeExisting(entriesTermIndices)); + } + + private synchronized boolean alreadyExists(List entriesTermIndices) { for(int i = 0; i < entriesTermIndices.size(); i++) { final ConsecutiveIndices indices = entriesTermIndices.get(i); final ConsecutiveIndices previous = map.put(indices.startIndex, indices); @@ -147,10 +160,10 @@ synchronized boolean append(List entriesTermIndices) { for(int j = 0; j < i; j++) { map.remove(entriesTermIndices.get(j).startIndex); } - return false; + return true; } } - return true; + return false; } synchronized void removeExisting(List entriesTermIndices) {