diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 5edd5b3e8c92..ea1ff83d96ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -283,4 +283,34 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * Buffered replication endpoints can use this to indicate how many bytes of WAL data may be
+   * staged before the staged data must be flushed and the replication offsets persisted.
+   * <p>
+   * The default value {@code -1L} means no buffering: offsets may be persisted immediately after
+   * a {@link WALEntryBatch} is replicated.
+   * @return the maximum staged size in bytes, or {@code -1L} if offsets can be persisted per batch
+   */
+  default long getMaxBufferSize() {
+    return -1L;
+  }
+
+  /**
+   * Buffered replication endpoints can use this to indicate the maximum time (in milliseconds)
+   * that WAL data may remain staged before the offsets are persisted.
+   * <p>
+   * The default value {@link Long#MAX_VALUE} means there is no time-based flush constraint.
+   * @return the maximum staging interval in milliseconds
+   */
+  default long maxFlushInterval() {
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Hook invoked before persisting replication offsets, e.g. buffered endpoints can flush and
+   * close staged WALs here. The default implementation does nothing.
+   */
+  default void beforePersistingReplicationOffset() {
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 4709e607fc70..57e34dfd8bd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -21,6 +21,8 @@
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -74,6 +75,15 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  // Heap size, in bytes, of WAL entries already shipped to the endpoint but whose offsets have
+  // not yet been persisted
+  private long stagedWalSize = 0L;
+  // Time (ms) at which staged data was last flushed and the offsets persisted
+  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+  // The most recently shipped batch, whose position is used when persisting the offset
+  private WALEntryBatch lastShippedBatch;
+  // Entries whose hfile references still need to be cleaned up when offsets are next persisted
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();
 
 public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
     ReplicationSourceWALReader walReader) {
@@ -98,6 +103,16 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // Flush staged WAL data and persist the replication offsets when the size/time
+      // thresholds advertised by the replication endpoint have been crossed
+      if (shouldPersistLogPosition()) {
+        try {
+          persistLogPosition();
+        } catch (IOException e) {
+          // Best-effort: staged state is kept on failure, so this is retried on the next pass
+          LOG.warn("Exception while persisting replication state", e);
+        }
+      }
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +168,14 @@ private void shipEdits(WALEntryBatch entryBatch) {
     List entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      // An empty batch may still carry an updated WAL position, so flush any staged data and
+      // persist the offset right away instead of waiting for the size/time thresholds
+      lastShippedBatch = entryBatch;
+      try {
+        persistLogPosition();
+      } catch (IOException e) {
+        LOG.warn("Exception while persisting replication state", e);
+      }
       return;
     }
int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +208,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
- // Clean up hfile references
- for (Entry entry : entries) {
- cleanUpHFileRefs(entry.getEdit());
- LOG.trace("shipped entry {}: ", entry);
+
+ stagedWalSize += currentSize;
+ entriesForCleanUpHFileRefs.addAll(entries);
+ lastShippedBatch = entryBatch;
+ if (shouldPersistLogPosition()) {
+ persistLogPosition();
}
- // Log and clean up WAL logs
- updateLogPosition(entryBatch);
// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +247,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  /**
+   * Returns true if there is staged WAL data whose offsets should be persisted now, based on
+   * the size/time thresholds advertised by the replication endpoint.
+   */
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      // Nothing has been staged since the last flush, so there is nothing to persist
+      return false;
+    }
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    long maxBufferSize = endpoint.getMaxBufferSize();
+    if (maxBufferSize == -1) {
+      // The endpoint does not buffer, so persist after every replicated batch
+      return true;
+    }
+    return stagedWalSize >= maxBufferSize
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+  }
+
+  /**
+   * Flushes the endpoint's staged data, cleans up the staged hfile references and persists the
+   * position of the last shipped batch.
+   */
+  private void persistLogPosition() throws IOException {
+    if (lastShippedBatch == null) {
+      return;
+    }
+
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    endpoint.beforePersistingReplicationOffset();
+
+    // Clean up hfile references, removing each entry as it is processed so that a failure
+    // part-way through does not re-process already-cleaned entries when the caller retries
+    for (Iterator<Entry> it = entriesForCleanUpHFileRefs.iterator(); it.hasNext();) {
+      Entry entry = it.next();
+      cleanUpHFileRefs(entry.getEdit());
+      LOG.trace("shipped entry {}: ", entry);
+      it.remove();
+    }
+
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+  }
+
private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {