@@ -283,4 +283,35 @@ public int getTimeout() {
* @throws IllegalStateException if this service's state isn't FAILED.
*/
Throwable failureCause();

/**
* Buffered replication endpoints can use this to indicate the maximum amount of staged WAL data,
* in bytes, that may accumulate before offsets are persisted.
* <p>
* The default value {@code -1L} indicates that offsets may be persisted immediately after a
* {@link WALEntryBatch} is replicated.
* </p>
*/
default long getMaxBufferSize() {
return -1L;
}

/**
* Buffered replication endpoints can use this to indicate the maximum time (in milliseconds) that
* WAL data may remain staged before offsets are persisted.
* <p>
* The default value {@link Long#MAX_VALUE} indicates that offsets may be persisted without any
* time-based flush constraint.
* </p>
*/
default long maxFlushInterval() {
return Long.MAX_VALUE;
}

/**
* Hook invoked before persisting replication offsets. For example, buffered endpoints can
* flush/close staged WALs here.
*/
default void beforePersistingReplicationOffset() {
Contributor:
From the endpoint's view, this method is just a flush/close of the given WAL, so let's name it accordingly.

Suggested change:
- default void beforePersistingReplicationOffset() {
+ default void flushAndCloseWAL() {

Contributor:
Sorry, this is exactly what we want to avoid here. This method just tells the endpoint what the shipper is going to do, and the endpoint can do anything it wants. For our normal replication framework, there are no flush and close operations, as we always send everything out and do not return until we get acks, so basically we do not need to implement this method. And for an S3-based replication endpoint, we need to close the file to persist it on S3. Maybe in the future we will have other types of replication endpoints which do other work, so we do not want to name it flushAndCloseWAL.

Contributor:
OK, so maybe call it beforeUpdatingLogPosition? And let's be more detailed in the javadoc comments, e.g. saying this is called by the shipper when it is about to move the offset forward in the WAL reader and potentially make WAL files that were fully read eligible for deletion.
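Following that suggestion, a hedged sketch of what such a more detailed javadoc might look like (wording assumed, not part of this change):

/**
 * Called by the shipper right before it updates the replication log position, i.e. when it is
 * about to move the offset forward in the WAL reader and potentially make WAL files that were
 * fully read eligible for deletion. Buffered endpoints can make their staged data durable here.
 */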

}
}
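For illustration, here is a hedged sketch of how a hypothetical buffered endpoint (for example, one that stages WAL data for S3 before making it durable) might override the three new hooks. The class name, thresholds, and flushStagedData() helper are assumptions for this sketch, not part of the change, and the other required ReplicationEndpoint methods are omitted:

// Sketch of a hypothetical buffered endpoint; names and thresholds are illustrative only.
public class StagedWALReplicationEndpoint extends BaseReplicationEndpoint {

  @Override
  public long getMaxBufferSize() {
    // Ask the shipper to persist offsets once roughly 64 MB of WAL data has been staged.
    return 64L * 1024 * 1024;
  }

  @Override
  public long maxFlushInterval() {
    // Also persist offsets at least once a minute, even if the size threshold is never reached.
    return 60_000L;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // Make the staged data durable (e.g. flush/close the staged file) before the shipper
    // records the new log position, so the persisted offset never points past durable data.
    flushStagedData();
  }

  private void flushStagedData() {
    // Endpoint-specific durability work would go here.
  }

  // Other required ReplicationEndpoint methods (replicate, start, stop, ...) omitted for brevity.
}

With the defaults (-1L and Long.MAX_VALUE), non-buffering endpoints keep today's behavior: offsets are persisted right after each WALEntryBatch is shipped, and beforePersistingReplicationOffset() is a no-op.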
@@ -21,6 +21,7 @@
import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -74,6 +75,10 @@ public enum WorkerState {
private final int DEFAULT_TIMEOUT = 20000;
private final int getEntriesTimeout;
private final int shipEditsTimeout;
private long stagedWalSize = 0L;
private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
private WALEntryBatch lastShippedBatch;
private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
ReplicationSourceWALReader walReader) {
@@ -98,6 +103,14 @@ public final void run() {
LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
// Loop until we close down
while (isActive()) {
// Persist replication offsets if the size/time thresholds have been reached
if (shouldPersistLogPosition()) {
try {
persistLogPosition();
} catch (IOException e) {
LOG.warn("Exception while persisting replication state", e);
Contributor:
This is not enough for handling the exception? Typically we should restart from the last persisted offset and replicate again.

Author:
persistLogPosition() throws an exception only from cleanUpHFileRefs(), where getBulkLoadDescriptor() can throw, hence I thought a retry is not necessary. I am thinking to keep this as it is, but move cleanUpHFileRefs() towards the end of the persistLogPosition() definition (after we update the offset). Please let me know your thoughts.

}
}
// Sleep until replication is enabled again
if (!source.isPeerEnabled()) {
// The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +168,12 @@ private void shipEdits(WALEntryBatch entryBatch) {
List<Entry> entries = entryBatch.getWalEntries();
int sleepMultiplier = 0;
if (entries.isEmpty()) {
updateLogPosition(entryBatch);
lastShippedBatch = entryBatch;
try {
persistLogPosition();
} catch (IOException e) {
LOG.warn("Exception while persisting replication state", e);
}
return;
}
int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +208,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
} else {
sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
}
// Clean up hfile references
for (Entry entry : entries) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);

stagedWalSize += currentSize;
entriesForCleanUpHFileRefs.addAll(entries);
lastShippedBatch = entryBatch;
if (shouldPersistLogPosition()) {
Contributor:
Rather than having these stagedWalSize and lastShippedBatch as global variables, we should just pass the entryBatch along to shouldPersistLogPosition() (which, by the way, should be defined/implemented in the endpoints; see my other related comment) and persistLogPosition().

Contributor:
We want to determine whether we need to persist the log position in the shipper, based on configuration, not triggered by the replication endpoint. Users can choose different configuration values for different replication endpoint implementations.

Contributor:
IMHO, it doesn't look very cohesive. The shipper seems to be making decisions based on specific endpoint implementations. If new endpoint implementations with different logic for updating the log position are introduced in the future, we would need to revisit the shipper again.

Contributor:
I think time-based and size-based persistence is enough for most cases? If in the future we have some special endpoint which needs a new kind of decision logic, we can add a new mechanism, no problem.

The reason we do not want to trigger persistence only from the endpoint is that we have other considerations about when to persist the log position, like the trade-off between failover and pressure on the replication storage. So I suggest we introduce general mechanisms to control how the log position is persisted, which users can tune for different approaches.

Contributor:
Oh, so your idea is to allow the shipper to decide when to persist the log position based on time and/or storage used by WALs, regardless of the endpoint implementation? That would be fine for me, but then we would need to adjust the shouldPersistLogPosition method accordingly.

Contributor:
> That would be fine for me, but then we would need to adjust the shouldPersistLogPosition method accordingly.

Are you referring to your original comment about passing the entire entryBatch object?

Contributor:
> Are you referring to your original comment about passing the entire entryBatch object?

No, I meant this check that was being done inside "shouldPersistLogPosition", which would cause the buffer size to be considered only for specific endpoint types.

@ankitsol has already addressed it in a recent commit.

persistLogPosition();
}
// Log and clean up WAL logs
updateLogPosition(entryBatch);

// offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
// this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +247,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
}

private boolean shouldPersistLogPosition() {
ReplicationEndpoint endpoint = source.getReplicationEndpoint();
Contributor:
Here we should use configuration values instead of getting them from the ReplicationEndpoint. We can have default configuration values to keep the old behavior for normal replication.

Author (@ankitsol), Jan 28, 2026:
Ok, so we would need to have these configurations (the time- and size-based ones) at both ends (ReplicationSourceShipper and ReplicationEndpoint), so we can achieve the tuning you mention in your previous comment: #7617 (comment)

Contributor:
For me, I do not think we need to expose this information to the shipper?

The design here is that, when using a different ReplicationEndpoint, you need to tune the shipper configuration on your own, as the parameters are not only affected by the ReplicationEndpoint; they also depend on the shipper side.

For example, when you want to reduce the pressure of recording the offset, you should increase the record interval, i.e. increase the batch size, increase the number of ship rounds between offset recordings, etc. And if you want to reduce the pressure on memory and on the target receiver, you should decrease the batch size. For an S3-based replication endpoint there is also a trade-off: if you increase the flush interval, you get better performance and fewer files on S3, but failover will be more complicated as you need to restart from much earlier.

So this should be in the documentation; just exposing some configuration from the ReplicationEndpoint cannot handle all of the above situations.

Author:
@Apache9 Please let me know if my understanding is correct here.

long maxBufferSize = endpoint.getMaxBufferSize();
if (stagedWalSize == 0 || lastShippedBatch == null) {
return false;
}
if (maxBufferSize == -1) {
return true;
}
return stagedWalSize >= maxBufferSize
|| (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
}

private void persistLogPosition() throws IOException {
if (lastShippedBatch == null) {
Contributor:
Since we could accumulate different batches in the above loop, a null batch does not mean we haven't shipped anything out? Why do we just return here if lastShippedBatch is null?

Author:
As far as I understand, a null lastShippedBatch means no batch has been replicated yet, so we don't need to update the offset. Please correct me if I am wrong here.

lastShippedBatch is null by default during ReplicationSourceShipper initialisation, and it is updated as soon as a batch is replicated.

return;
}

ReplicationEndpoint endpoint = source.getReplicationEndpoint();
endpoint.beforePersistingReplicationOffset();

// Clean up hfile references
for (Entry entry : entriesForCleanUpHFileRefs) {
cleanUpHFileRefs(entry.getEdit());
LOG.trace("shipped entry {}: ", entry);
Contributor:
Please provide a more accurate message. This is not what really happened here.

}
entriesForCleanUpHFileRefs.clear();

stagedWalSize = 0;
lastStagedFlushTs = EnvironmentEdgeManager.currentTime();

// Log and clean up WAL logs
updateLogPosition(lastShippedBatch);
}

private void cleanUpHFileRefs(WALEdit edit) throws IOException {
String peerId = source.getPeerId();
if (peerId.contains("-")) {
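Following the review discussion above, the size/time thresholds are likely to end up being read from shipper-side configuration (with defaults that preserve the current persist-after-every-batch behavior) rather than from the endpoint. A minimal sketch of that direction, using hypothetical configuration keys that are not actual HBase property names:

// Hypothetical configuration keys -- illustrative only, not actual HBase properties.
private static final String OFFSET_PERSIST_MAX_SIZE_KEY =
  "replication.source.offset.persist.max.size";
private static final String OFFSET_PERSIST_MAX_INTERVAL_KEY =
  "replication.source.offset.persist.max.interval";

private boolean shouldPersistLogPosition(Configuration conf) {
  if (stagedWalSize == 0 || lastShippedBatch == null) {
    return false;
  }
  // A default of -1 keeps the old behavior: persist right after every shipped batch.
  long maxSize = conf.getLong(OFFSET_PERSIST_MAX_SIZE_KEY, -1L);
  if (maxSize == -1L) {
    return true;
  }
  long maxInterval = conf.getLong(OFFSET_PERSIST_MAX_INTERVAL_KEY, Long.MAX_VALUE);
  return stagedWalSize >= maxSize
    || EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= maxInterval;
}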