From ba61a0c522f3a4c9f52580263c3b9c113ae6e3fc Mon Sep 17 00:00:00 2001 From: Kodey Converse Date: Fri, 25 Jul 2025 09:51:43 -0400 Subject: [PATCH] HBASE-29850 Add an ability to generate request attributes per request --- .../hbase/client/AsyncTableBuilder.java | 9 ++ .../hbase/client/AsyncTableBuilderBase.java | 10 ++ .../hbase/client/RawAsyncTableImpl.java | 16 +- .../client/RequestAttributesFactory.java | 47 ++++++ .../hbase/client/TestRequestAttributes.java | 140 +++++++++++++++++- 5 files changed, 218 insertions(+), 4 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java index 007f7ad48685..be55177e3aac 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java @@ -142,6 +142,15 @@ default AsyncTableBuilder setMaxRetries(int maxRetries) { */ AsyncTableBuilder setRequestAttribute(String key, byte[] value); + /** + * Set a factory for creating request attributes per request. This will be called with any + * attributes set by {@link #setRequestAttribute(String, byte[])}. + */ + default AsyncTableBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + throw new UnsupportedOperationException("Not implemented"); + } + /** * Create the {@link AsyncTable} instance. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index 428e7358195e..372d7b9c1d0f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -55,6 +55,9 @@ abstract class AsyncTableBuilderBase protected Map requestAttributes = Collections.emptyMap(); + protected RequestAttributesFactory requestAttributesFactory = + RequestAttributesFactory.PASSTHROUGH; + AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) { this.tableName = tableName; this.operationTimeoutNs = tableName.isSystemTable() @@ -135,4 +138,11 @@ public AsyncTableBuilder setRequestAttribute(String key, byte[] value) { requestAttributes.put(key, value); return this; } + + @Override + public AsyncTableBuilder + setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) { + this.requestAttributesFactory = requestAttributesFactory; + return this; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 553b4afa55ea..d3a61d2f1ff1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -123,6 +123,8 @@ class RawAsyncTableImpl implements AsyncTable { private final Map requestAttributes; + private final RequestAttributesFactory requestAttributesFactory; + RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase builder) { this.conn = conn; this.retryTimer = retryTimer; @@ -150,6 +152,14 @@ class RawAsyncTableImpl implements AsyncTable { : conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); this.requestAttributes = builder.requestAttributes; + this.requestAttributesFactory = builder.requestAttributesFactory; + } + + private Map createRequestAttributes() { + Map attributes = requestAttributesFactory.create(requestAttributes); + Preconditions.checkState(attributes != null, + "RequestAttributesFactory.create() must not return null"); + return attributes; } @Override @@ -216,7 +226,7 @@ private SingleRequestCallerBuilder newCaller(byte[] row, int priority, lo .pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .setRequestAttributes(requestAttributes); + .setRequestAttributes(createRequestAttributes()); } private SingleRequestCallerBuilder @@ -616,7 +626,7 @@ private Scan setDefaultScanConfig(Scan scan) { public void scan(Scan scan, AdvancedScanResultConsumer consumer) { new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer, pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, - startLogErrorsCnt, requestAttributes).start(); + startLogErrorsCnt, createRequestAttributes()).start(); } private long resultSize2CacheSize(long maxResultSize) { @@ -713,7 +723,7 @@ private List> batch(List actions, long r .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS) .pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS) .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt) - .setRequestAttributes(requestAttributes).call(); + .setRequestAttributes(createRequestAttributes()).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java new file mode 100644 index 000000000000..246e8588d535 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestAttributesFactory.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.Map; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A factory for creating request attributes. This is called each time a new call is started, + * allowing for dynamic attributes based on the current context or existing attributes. + *

+ * The {@link #create} method is guaranteed to be called on the same thread that initiates the + * client call (e.g., {@link AsyncTable#get}, {@link AsyncTable#put}, {@link AsyncTable#scan}, + * etc.). + */ +@InterfaceAudience.Public +public interface RequestAttributesFactory { + + /** + * A factory that returns the input attributes unchanged. + */ + RequestAttributesFactory PASSTHROUGH = (requestAttributes) -> requestAttributes; + + /** + * Creates a new map of request attributes based on the existing attributes for the table. + *

+ * This method is guaranteed to be called on the same thread that initiates the client call. + * @param requestAttributes The existing attributes configured on the table + * @return The new map of request attributes. Must not be null. + */ + Map create(Map requestAttributes); +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java index 9d6dc33a46a3..ccaffaad6108 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRequestAttributes.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.client; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.util.Arrays; import java.util.HashMap; @@ -24,8 +27,11 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AuthUtil; import org.apache.hadoop.hbase.Cell; @@ -72,8 +78,18 @@ public class TestRequestAttributes { private static final byte[] ROW_KEY6 = Bytes.toBytes("6"); private static final byte[] ROW_KEY7 = Bytes.toBytes("7"); private static final byte[] ROW_KEY8 = Bytes.toBytes("8"); + private static final byte[] ROW_KEY_FACTORY_GET = Bytes.toBytes("F1"); + private static final byte[] ROW_KEY_FACTORY_SCAN = Bytes.toBytes("F2"); + private static final byte[] ROW_KEY_FACTORY_PUT = Bytes.toBytes("F3"); + private static final byte[] ROW_KEY_FACTORY_AUGMENT = Bytes.toBytes("F4"); + private static final byte[] ROW_KEY_FACTORY_PER_REQUEST = Bytes.toBytes("F5"); + private static final String FACTORY_KEY = "factoryKey"; + private static final byte[] FACTORY_VALUE = Bytes.toBytes("factoryValue"); + private static final String STATIC_KEY = "staticKey"; + private static final byte[] STATIC_VALUE = Bytes.toBytes("staticValue"); private static final Map CONNECTION_ATTRIBUTES = new HashMap<>(); private static final Map REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes(); + private static final Map REQUEST_ATTRIBUTES_FACTORY_SCAN = new HashMap<>(); private static final Map> ROW_KEY_TO_REQUEST_ATTRIBUTES = new HashMap<>(); static { @@ -88,6 +104,25 @@ public class TestRequestAttributes { ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes()); ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes()); ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap()); + + Map factoryGetAttrs = new HashMap<>(); + factoryGetAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_GET, factoryGetAttrs); + + REQUEST_ATTRIBUTES_FACTORY_SCAN.put(FACTORY_KEY, FACTORY_VALUE); + + Map factoryPutAttrs = new HashMap<>(); + factoryPutAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PUT, factoryPutAttrs); + + Map factoryAugmentAttrs = new HashMap<>(); + factoryAugmentAttrs.put(STATIC_KEY, STATIC_VALUE); + factoryAugmentAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_AUGMENT, factoryAugmentAttrs); + + Map factoryPerRequestAttrs = new HashMap<>(); + factoryPerRequestAttrs.put(FACTORY_KEY, FACTORY_VALUE); + ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PER_REQUEST, factoryPerRequestAttrs); } private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100); private static final byte[] FAMILY = Bytes.toBytes("0"); @@ -237,6 +272,106 @@ public void testNoRequestAttributes() throws IOException { } } + @Test + public void testAsyncRequestAttributesFactoryGet() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> { + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_GET)).get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryScan() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> { + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + List results = table + .scanAll( + new Scan().withStartRow(ROW_KEY_FACTORY_SCAN).withStopRow(ROW_KEY_FACTORY_SCAN, true)) + .get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryPut() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> { + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + Put put = new Put(ROW_KEY_FACTORY_PUT); + put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v")); + table.put(put).get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryAugmentsStaticAttributes() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME) + .setRequestAttribute(STATIC_KEY, STATIC_VALUE).setRequestAttributesFactory(attrs -> { + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_AUGMENT)).get(); + } + } + + @Test + public void testAsyncRequestAttributesFactoryCalledPerRequest() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + AtomicInteger callCount = new AtomicInteger(0); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> { + callCount.incrementAndGet(); + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get(); + } + assertTrue("Factory should be called at least 3 times", callCount.get() >= 3); + } + + @Test + public void testAsyncRequestAttributesFactoryCalledOnInitiatingThread() + throws IOException, ExecutionException, InterruptedException { + Configuration conf = TEST_UTIL.getConfiguration(); + Thread testThread = Thread.currentThread(); + AtomicReference factoryThread = new AtomicReference<>(); + try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) { + AsyncTable table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> { + factoryThread.set(Thread.currentThread()); + Map newAttrs = new HashMap<>(attrs); + newAttrs.put(FACTORY_KEY, FACTORY_VALUE); + return newAttrs; + }).build(); + table.get(new Get(ROW_KEY_FACTORY_GET)).get(); + } + assertEquals("Factory should be called on the initiating thread", testThread, + factoryThread.get()); + } + private static Map addRandomRequestAttributes() { Map requestAttributes = new HashMap<>(); int j = Math.max(2, (int) (10 * Math.random())); @@ -324,7 +459,10 @@ public void preGetOp(ObserverContext c, @Override public boolean preScannerNext(ObserverContext c, InternalScanner s, List result, int limit, boolean hasNext) throws IOException { - if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) { + if ( + !isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN) + && !isValidRequestAttributes(REQUEST_ATTRIBUTES_FACTORY_SCAN) + ) { throw new IOException("Incorrect request attributes"); } return hasNext;