diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 2538a472a8..efb3c67963 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -440,6 +440,16 @@ static void setReadLockEnabled(RaftProperties properties, boolean readLockEnable setBoolean(properties::setBoolean, READ_LOCK_ENABLED_KEY, readLockEnabled); } + String APPEND_ENTRIES_COMPOSE_ENABLED_KEY = PREFIX + ".append-entries.compose.enabled"; + boolean APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT = true; + static boolean appendEntriesComposeEnabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, + APPEND_ENTRIES_COMPOSE_ENABLED_KEY, APPEND_ENTRIES_COMPOSE_ENABLED_DEFAULT, getDefaultLog()); + } + static void setAppendEntriesComposeEnabled(RaftProperties properties, boolean enabled) { + setBoolean(properties::setBoolean, APPEND_ENTRIES_COMPOSE_ENABLED_KEY, enabled); + } + /** * Besides the open segment, the max number of segments caching log entries. */ diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 60f72e001e..846b87702f 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -81,7 +81,6 @@ import org.apache.ratis.server.RaftServerRpc; import org.apache.ratis.server.impl.LeaderElection.Phase; import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry; -import org.apache.ratis.server.impl.ServerImplUtils.ConsecutiveIndices; import org.apache.ratis.server.impl.ServerImplUtils.NavigableIndices; import org.apache.ratis.server.leader.LeaderState.StepDownReason; import org.apache.ratis.server.metrics.LeaderElectionMetrics; @@ -133,7 +132,6 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -260,8 +258,7 @@ public long[] getFollowerMatchIndices() { private final AtomicBoolean firstElectionSinceStartup = new AtomicBoolean(true); private final ThreadGroup threadGroup; - private final AtomicReference> appendLogFuture; - private final NavigableIndices appendLogTermIndices = new NavigableIndices(); + private final NavigableIndices appendLogTermIndices; RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy, RaftStorage.StartupOption option) throws IOException { @@ -296,7 +293,8 @@ public long[] getFollowerMatchIndices() { this.transferLeadership = new TransferLeadership(this, properties); this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this); this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties); - this.appendLogFuture = new AtomicReference<>(CompletableFuture.completedFuture(null)); + this.appendLogTermIndices = RaftServerConfigKeys.Log.appendEntriesComposeEnabled(properties) ? + new NavigableIndices() : null; this.serverExecutor = ConcurrentUtils.newThreadPoolWithMax( RaftServerConfigKeys.ThreadPool.serverCached(properties), @@ -1620,7 +1618,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde state.updateConfiguration(entries); } future.join(); - final CompletableFuture appendLog = entries.isEmpty()? CompletableFuture.completedFuture(null) + final CompletableFuture appendFuture = entries.isEmpty()? CompletableFuture.completedFuture(null) + : appendLogTermIndices != null ? appendLogTermIndices.append(entries, this::appendLog) : appendLog(entries); proto.getCommitInfosList().forEach(commitInfoCache::update); @@ -1636,7 +1635,7 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size()); final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex(); - return appendLog.whenCompleteAsync((r, t) -> { + return appendFuture.whenCompleteAsync((r, t) -> { followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE)); timer.stop(); }, getServerExecutor()).thenApply(v -> { @@ -1654,16 +1653,8 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde }); } private CompletableFuture appendLog(List entries) { - final List entriesTermIndices = ConsecutiveIndices.convert(entries); - if (!appendLogTermIndices.append(entriesTermIndices)) { - // index already exists, return the last future - return appendLogFuture.get(); - } - - - return appendLogFuture.updateAndGet(f -> f.thenComposeAsync( - ignored -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor)) - .whenComplete((v, e) -> appendLogTermIndices.removeExisting(entriesTermIndices)); + return CompletableFuture.completedFuture(null) + .thenComposeAsync(dummy -> JavaUtils.allOf(state.getLog().append(entries)), serverExecutor); } private long checkInconsistentAppendEntries(TermIndex previous, List entries) { @@ -1690,7 +1681,9 @@ private long checkInconsistentAppendEntries(TermIndex previous, List map = new TreeMap<>(); + private final AtomicReference> future + = new AtomicReference<>(CompletableFuture.completedFuture(null)); boolean contains(TermIndex ti) { final Long term = getTerm(ti.getIndex()); @@ -137,7 +142,15 @@ synchronized Long getTerm(long index) { return floorEntry.getValue().getTerm(index); } - synchronized boolean append(List entriesTermIndices) { + CompletableFuture append(List entries, + Function, CompletableFuture> appendLog) { + final List entriesTermIndices = ConsecutiveIndices.convert(entries); + return alreadyExists(entriesTermIndices) ? future.get() + : future.updateAndGet(f -> f.thenComposeAsync(ignored -> appendLog.apply(entries))) + .whenComplete((v, e) -> removeExisting(entriesTermIndices)); + } + + private synchronized boolean alreadyExists(List entriesTermIndices) { for(int i = 0; i < entriesTermIndices.size(); i++) { final ConsecutiveIndices indices = entriesTermIndices.get(i); final ConsecutiveIndices previous = map.put(indices.startIndex, indices); @@ -147,10 +160,10 @@ synchronized boolean append(List entriesTermIndices) { for(int j = 0; j < i; j++) { map.remove(entriesTermIndices.get(j).startIndex); } - return false; + return true; } } - return true; + return false; } synchronized void removeExisting(List entriesTermIndices) {