Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,15 @@ default AsyncTableBuilder<C> setMaxRetries(int maxRetries) {
*/
AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value);

/**
* Set a factory for creating request attributes per request. This will be called with any
* attributes set by {@link #setRequestAttribute(String, byte[])}.
*/
default AsyncTableBuilder<C>
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Create the {@link AsyncTable} instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>

protected Map<String, byte[]> requestAttributes = Collections.emptyMap();

protected RequestAttributesFactory requestAttributesFactory =
RequestAttributesFactory.PASSTHROUGH;

AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
this.tableName = tableName;
this.operationTimeoutNs = tableName.isSystemTable()
Expand Down Expand Up @@ -135,4 +138,11 @@ public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
requestAttributes.put(key, value);
return this;
}

@Override
public AsyncTableBuilder<C>
setRequestAttributesFactory(RequestAttributesFactory requestAttributesFactory) {
this.requestAttributesFactory = requestAttributesFactory;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {

private final Map<String, byte[]> requestAttributes;

private final RequestAttributesFactory requestAttributesFactory;

RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
this.conn = conn;
this.retryTimer = retryTimer;
Expand Down Expand Up @@ -150,6 +152,14 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
: conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
this.requestAttributes = builder.requestAttributes;
this.requestAttributesFactory = builder.requestAttributesFactory;
}

private Map<String, byte[]> createRequestAttributes() {
Map<String, byte[]> attributes = requestAttributesFactory.create(requestAttributes);
Preconditions.checkState(attributes != null,
"RequestAttributesFactory.create() must not return null");
return attributes;
}

@Override
Expand Down Expand Up @@ -216,7 +226,7 @@ private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, lo
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes);
.setRequestAttributes(createRequestAttributes());
}

private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
Expand Down Expand Up @@ -616,7 +626,7 @@ private Scan setDefaultScanConfig(Scan scan) {
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
pauseNs, pauseNsForServerOverloaded, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
startLogErrorsCnt, requestAttributes).start();
startLogErrorsCnt, createRequestAttributes()).start();
}

private long resultSize2CacheSize(long maxResultSize) {
Expand Down Expand Up @@ -713,7 +723,7 @@ private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long r
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
.setRequestAttributes(requestAttributes).call();
.setRequestAttributes(createRequestAttributes()).call();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import org.apache.yetus.audience.InterfaceAudience;

/**
* A factory for creating request attributes. This is called each time a new call is started,
* allowing for dynamic attributes based on the current context or existing attributes.
* <p>
* The {@link #create} method is guaranteed to be called on the same thread that initiates the
* client call (e.g., {@link AsyncTable#get}, {@link AsyncTable#put}, {@link AsyncTable#scan},
* etc.).
*/
@InterfaceAudience.Public
public interface RequestAttributesFactory {

/**
* A factory that returns the input attributes unchanged.
*/
RequestAttributesFactory PASSTHROUGH = (requestAttributes) -> requestAttributes;

/**
* Creates a new map of request attributes based on the existing attributes for the table.
* <p>
* This method is guaranteed to be called on the same thread that initiates the client call.
* @param requestAttributes The existing attributes configured on the table
* @return The new map of request attributes. Must not be null.
*/
Map<String, byte[]> create(Map<String, byte[]> requestAttributes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,21 @@
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.Cell;
Expand Down Expand Up @@ -72,8 +78,18 @@ public class TestRequestAttributes {
private static final byte[] ROW_KEY6 = Bytes.toBytes("6");
private static final byte[] ROW_KEY7 = Bytes.toBytes("7");
private static final byte[] ROW_KEY8 = Bytes.toBytes("8");
private static final byte[] ROW_KEY_FACTORY_GET = Bytes.toBytes("F1");
private static final byte[] ROW_KEY_FACTORY_SCAN = Bytes.toBytes("F2");
private static final byte[] ROW_KEY_FACTORY_PUT = Bytes.toBytes("F3");
private static final byte[] ROW_KEY_FACTORY_AUGMENT = Bytes.toBytes("F4");
private static final byte[] ROW_KEY_FACTORY_PER_REQUEST = Bytes.toBytes("F5");
private static final String FACTORY_KEY = "factoryKey";
private static final byte[] FACTORY_VALUE = Bytes.toBytes("factoryValue");
private static final String STATIC_KEY = "staticKey";
private static final byte[] STATIC_VALUE = Bytes.toBytes("staticValue");
private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
private static final Map<String, byte[]> REQUEST_ATTRIBUTES_SCAN = addRandomRequestAttributes();
private static final Map<String, byte[]> REQUEST_ATTRIBUTES_FACTORY_SCAN = new HashMap<>();
private static final Map<byte[], Map<String, byte[]>> ROW_KEY_TO_REQUEST_ATTRIBUTES =
new HashMap<>();
static {
Expand All @@ -88,6 +104,25 @@ public class TestRequestAttributes {
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY6, addRandomRequestAttributes());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY7, addRandomRequestAttributes());
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY8, new HashMap<String, byte[]>());

Map<String, byte[]> factoryGetAttrs = new HashMap<>();
factoryGetAttrs.put(FACTORY_KEY, FACTORY_VALUE);
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_GET, factoryGetAttrs);

REQUEST_ATTRIBUTES_FACTORY_SCAN.put(FACTORY_KEY, FACTORY_VALUE);

Map<String, byte[]> factoryPutAttrs = new HashMap<>();
factoryPutAttrs.put(FACTORY_KEY, FACTORY_VALUE);
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PUT, factoryPutAttrs);

Map<String, byte[]> factoryAugmentAttrs = new HashMap<>();
factoryAugmentAttrs.put(STATIC_KEY, STATIC_VALUE);
factoryAugmentAttrs.put(FACTORY_KEY, FACTORY_VALUE);
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_AUGMENT, factoryAugmentAttrs);

Map<String, byte[]> factoryPerRequestAttrs = new HashMap<>();
factoryPerRequestAttrs.put(FACTORY_KEY, FACTORY_VALUE);
ROW_KEY_TO_REQUEST_ATTRIBUTES.put(ROW_KEY_FACTORY_PER_REQUEST, factoryPerRequestAttrs);
}
private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(100);
private static final byte[] FAMILY = Bytes.toBytes("0");
Expand Down Expand Up @@ -237,6 +272,106 @@ public void testNoRequestAttributes() throws IOException {
}
}

@Test
public void testAsyncRequestAttributesFactoryGet()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> {
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
table.get(new Get(ROW_KEY_FACTORY_GET)).get();
}
}

@Test
public void testAsyncRequestAttributesFactoryScan()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> {
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
List<Result> results = table
.scanAll(
new Scan().withStartRow(ROW_KEY_FACTORY_SCAN).withStopRow(ROW_KEY_FACTORY_SCAN, true))
.get();
}
}

@Test
public void testAsyncRequestAttributesFactoryPut()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> {
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
Put put = new Put(ROW_KEY_FACTORY_PUT);
put.addColumn(FAMILY, Bytes.toBytes("c"), Bytes.toBytes("v"));
table.put(put).get();
}
}

@Test
public void testAsyncRequestAttributesFactoryAugmentsStaticAttributes()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME)
.setRequestAttribute(STATIC_KEY, STATIC_VALUE).setRequestAttributesFactory(attrs -> {
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
table.get(new Get(ROW_KEY_FACTORY_AUGMENT)).get();
}
}

@Test
public void testAsyncRequestAttributesFactoryCalledPerRequest()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
AtomicInteger callCount = new AtomicInteger(0);
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> {
callCount.incrementAndGet();
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get();
table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get();
table.get(new Get(ROW_KEY_FACTORY_PER_REQUEST)).get();
}
assertTrue("Factory should be called at least 3 times", callCount.get() >= 3);
}

@Test
public void testAsyncRequestAttributesFactoryCalledOnInitiatingThread()
throws IOException, ExecutionException, InterruptedException {
Configuration conf = TEST_UTIL.getConfiguration();
Thread testThread = Thread.currentThread();
AtomicReference<Thread> factoryThread = new AtomicReference<>();
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setRequestAttributesFactory(attrs -> {
factoryThread.set(Thread.currentThread());
Map<String, byte[]> newAttrs = new HashMap<>(attrs);
newAttrs.put(FACTORY_KEY, FACTORY_VALUE);
return newAttrs;
}).build();
table.get(new Get(ROW_KEY_FACTORY_GET)).get();
}
assertEquals("Factory should be called on the initiating thread", testThread,
factoryThread.get());
}

private static Map<String, byte[]> addRandomRequestAttributes() {
Map<String, byte[]> requestAttributes = new HashMap<>();
int j = Math.max(2, (int) (10 * Math.random()));
Expand Down Expand Up @@ -324,7 +459,10 @@ public void preGetOp(ObserverContext<? extends RegionCoprocessorEnvironment> c,
@Override
public boolean preScannerNext(ObserverContext<? extends RegionCoprocessorEnvironment> c,
InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
if (!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)) {
if (
!isValidRequestAttributes(REQUEST_ATTRIBUTES_SCAN)
&& !isValidRequestAttributes(REQUEST_ATTRIBUTES_FACTORY_SCAN)
) {
throw new IOException("Incorrect request attributes");
}
return hasNext;
Expand Down