diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index c3606e3f..ffc612fb 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -2,41 +2,11 @@ import static com.netflix.evcache.util.Sneaky.sneakyThrow; -import java.lang.management.ManagementFactory; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Collectors; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import com.netflix.evcache.dto.KeyMapDto; -import com.netflix.evcache.util.EVCacheBulkDataDto; -import com.netflix.evcache.util.KeyHasher; -import com.netflix.evcache.util.RetryCount; -import com.netflix.evcache.util.Sneaky; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheInMemoryCache.DataNotFoundException; import com.netflix.evcache.EVCacheLatch.Policy; +import com.netflix.evcache.dto.KeyMapDto; import com.netflix.evcache.event.EVCacheEvent; import com.netflix.evcache.event.EVCacheEventListener; import com.netflix.evcache.metrics.EVCacheMetricsFactory; @@ -52,14 +22,40 @@ import com.netflix.evcache.pool.EVCacheClientUtil; import com.netflix.evcache.pool.EVCacheValue; import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.util.EVCacheBulkDataDto; +import com.netflix.evcache.util.KeyHasher; +import com.netflix.evcache.util.RetryCount; +import com.netflix.evcache.util.Sneaky; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.Timer; - +import java.lang.management.ManagementFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.ObjectName; import net.spy.memcached.CachedData; import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.Scheduler; import rx.Single; @@ -1900,16 +1896,30 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient List evcacheKeys, Transcoder tc) { KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); - final Map keyMap = keyMapDto.getKeyMap(); - boolean hasHashedKey = keyMapDto.isKeyHashed(); - if (hasHashedKey) { + Map unHashedKeyMap = keyMapDto.getNonHashedKeyMap(); + Map hashedKeyMap = keyMapDto.getHashedKeyMap(); + + if (!unHashedKeyMap.isEmpty() && hashedKeyMap.isEmpty()) { // all keys are non-hashed + final Transcoder tcCopy; + if (tc == null && _transcoder != null) { + tcCopy = (Transcoder) _transcoder; + } else { + tcCopy = tc; + } if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with hashedKey {} ",evcacheKeys); + log.debug("fetching bulk data with un-hashed keys {}", unHashedKeyMap.keySet()); } - return client.getAsyncBulk(keyMap.keySet(), evcacheValueTranscoder) - .thenApply(data -> buildHashedKeyValueResult(data, tc, client, keyMap)) + return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy ) + .thenApply(data -> buildNonHashedKeyValueResult(data, unHashedKeyMap)) .exceptionally(t -> handleBulkException(t, evcacheKeys)); - } else { + } else if (!hashedKeyMap.isEmpty() && unHashedKeyMap.isEmpty()) { // all keys are hashed + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data with hashed keys {} : {}", hashedKeyMap.keySet()); + } + return client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> buildHashedKeyValueResult(data, tc, client, hashedKeyMap)) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); + } else { // a mix of hashed and un-hashed keys final Transcoder tcCopy; if (tc == null && _transcoder != null) { tcCopy = (Transcoder) _transcoder; @@ -1917,10 +1927,15 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient tcCopy = tc; } if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with non hashedKey {} ",keyMap.keySet()); - } - return client.getAsyncBulk(keyMap.keySet(), tcCopy ) - .thenApply(data -> buildNonHashedKeyValueResult(data, keyMap)) + log.debug("fetching bulk data with hashed keys: {} AND un-hashed keys {}", hashedKeyMap.keySet(), unHashedKeyMap.keySet()); + } + return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy, hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> { + // asyncGetBulk now returns Map for mixed hashed/non-hashed keys + @SuppressWarnings("unchecked") + Map objectMap = (Map) data; + return buildMixedKeyValueResult(objectMap, tc, client, unHashedKeyMap, hashedKeyMap); + }) .exceptionally(t -> handleBulkException(t, evcacheKeys)); } } @@ -1932,20 +1947,21 @@ private Map handleBulkException(Throwable t, Collection evcacheKeys) { - boolean hasHashedKey = false; - final Map keyMap = new HashMap(evcacheKeys.size() * 2); + final Map hashedKeyMap = new HashMap<>(); + final Map nonHashedKeyMap = new HashMap<>(); + for (EVCacheKey evcKey : evcacheKeys) { String key = evcKey.getCanonicalKey(client.isDuetClient()); String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); if (hashKey != null) { if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - key = hashKey; - hasHashedKey = true; + hashedKeyMap.put(hashKey, evcKey); + } else { + nonHashedKeyMap.put(key, evcKey); } - keyMap.put(key, evcKey); } - return new KeyMapDto(keyMap, hasHashedKey); + return new KeyMapDto(hashedKeyMap, nonHashedKeyMap); } private Map buildNonHashedKeyValueResult(Map objMap, @@ -1998,6 +2014,47 @@ private Map buildHashedKeyValueResult(Map o return retMap; } + private Map buildMixedKeyValueResult(Map objMap, + Transcoder tc, + EVCacheClient client, + Map unHashedKeyMap, + Map hashedKeyMap) { + final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); + for (Map.Entry i : objMap.entrySet()) { + final EVCacheKey evcKey = hashedKeyMap.getOrDefault(i.getKey(), unHashedKeyMap.get(i.getKey())); + if (evcKey == null) { + throw new IllegalStateException("Key not found anywhere"); + } + final Object obj = i.getValue(); + if (obj instanceof EVCacheValue) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value"); + final EVCacheValue val = (EVCacheValue) obj; + final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); + final T tVal; + if (tc == null) { + tVal = (T) client.getTranscoder().decode(cd); + } else { + tVal = tc.decode(cd); + } + if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + retMap.put(evcKey, tVal); + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); + } + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + retMap.put(evcKey, (T) obj); + } + } + return retMap; + } + private Map getBulkData(EVCacheClient client, Collection evcacheKeys, Transcoder tc, boolean throwException, boolean hasZF) throws Exception { try { boolean hasHashedKey = false; diff --git a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java index 64098e27..cb1d4a5d 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java +++ b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java @@ -1,23 +1,36 @@ package com.netflix.evcache.dto; import com.netflix.evcache.EVCacheKey; - +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; public class KeyMapDto { - Map keyMap; - boolean isKeyHashed; + Map hashedKeyMap; + Map nonHashedKeyMap; + + public KeyMapDto(@Nonnull Map hashedKeyMap, @Nonnull Map nonHashedKeyMap) { + this.hashedKeyMap = hashedKeyMap; + this.nonHashedKeyMap = nonHashedKeyMap; + } + + public Map getHashedKeyMap() { + return hashedKeyMap; + } - public KeyMapDto(Map keyMap, boolean isKeyHashed) { - this.keyMap = keyMap; - this.isKeyHashed = isKeyHashed; + public Map getNonHashedKeyMap() { + return nonHashedKeyMap; } - public Map getKeyMap() { - return keyMap; + public Set getAllKeys() { + Set allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size()); + allKeys.addAll(hashedKeyMap.keySet()); + allKeys.addAll(nonHashedKeyMap.keySet()); + return allKeys; } - public boolean isKeyHashed() { - return isKeyHashed; + public boolean isKeyHashed(String key) { + return hashedKeyMap.containsKey(key); } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..67533680 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -1,5 +1,12 @@ package com.netflix.evcache.operation; +import com.netflix.evcache.metrics.EVCacheMetricsFactory; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.util.EVCacheConfig; +import com.netflix.spectator.api.BasicTag; +import com.netflix.spectator.api.Tag; +import com.sun.management.GcInfo; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -8,33 +15,25 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceArray; - -import com.netflix.evcache.EVCacheGetOperationListener; -import com.netflix.evcache.util.Pair; -import net.spy.memcached.internal.BulkGetCompletionListener; -import net.spy.memcached.internal.CheckedOperationTimeoutException; -import net.spy.memcached.ops.GetOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.evcache.metrics.EVCacheMetricsFactory; -import com.netflix.evcache.pool.EVCacheClient; -import com.netflix.evcache.pool.ServerGroup; -import com.netflix.evcache.util.EVCacheConfig; -import com.netflix.spectator.api.BasicTag; -import com.netflix.spectator.api.Tag; -import com.sun.management.GcInfo; - import net.spy.memcached.MemcachedConnection; import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.CheckedOperationTimeoutException; +import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.Single; @@ -211,6 +210,7 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni doAsyncGetSome(future); return future.handle((data, ex) -> { if (ex != null) { + log.error("SNAP: ", ex); handleBulkException(); } return data; @@ -219,8 +219,12 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); + if (state == null) { + throw new IllegalStateException("This is a temporary msg, Shih-hao has a PR that addresses this case"); //SNAP: TODO: remove + } if (!state.completed) { if (state.cancelled) { throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); @@ -248,8 +252,9 @@ public void doAsyncGetSome(CompletableFuture> promise) { m.put(me.getKey(), (T)me.getValue()); } promise.complete(m); - } catch (Exception t) { - promise.completeExceptionally(t); + } catch (Exception e) { + log.error("SNAP: ", e); // SNAP: TODO: cleanup + promise.completeExceptionally(e); } }); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java index 2b9941cd..c701ba32 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java @@ -293,7 +293,7 @@ private static CompletableFuture getNext(CompletableFuture future, timeout, unit, timeoutSlots); - future.completeExceptionally(new TimeoutException("Timeout after " + timeout)); + future.completeExceptionally(new TimeoutException("Timeout after " + timeout + "ms")); }, splitTimeout, unit); diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index bef04d88..5ca75826 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -1,29 +1,5 @@ package com.netflix.evcache.pool; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.URLDecoder; -import java.nio.charset.StandardCharsets; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.function.BiPredicate; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.evcache.EVCache; import com.netflix.evcache.EVCache.Call; @@ -32,6 +8,7 @@ import com.netflix.evcache.EVCacheLatch; import com.netflix.evcache.EVCacheReadQueueException; import com.netflix.evcache.EVCacheSerializingTranscoder; +import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheFutures; import com.netflix.evcache.operation.EVCacheItem; @@ -44,19 +21,44 @@ import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Tag; - +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiPredicate; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.EVCacheMemcachedClient; import net.spy.memcached.EVCacheNode; -import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; import net.spy.memcached.NodeLocator; import net.spy.memcached.internal.ListenableFuture; import net.spy.memcached.internal.OperationCompletionListener; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.Single; @@ -988,7 +990,19 @@ public CompletableFuture> getAsyncBulk(Collection can if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(canonicalKeys, tc, null, validator) - .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS); + .getAsyncSome(bulkReadTimeout.get() * 10000, TimeUnit.MILLISECONDS);// SNAP: TODO: + + } + + /** + * One bulk call for some hashed some unhashed keys (that require different transcoders) + */ + public CompletableFuture> getAsyncBulk(Collection unHashedKeys, Transcoder tc, Collection hashedKeys, EVCacheTranscoder evCacheTranscoder) { + final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); + if (tc == null) tc = (Transcoder) getTranscoder(); + return evcacheMemcachedClient + .asyncGetBulk(unHashedKeys, tc, hashedKeys, evCacheTranscoder, null, validator) + .getAsyncSome(bulkReadTimeout.get() * 10000, TimeUnit.MILLISECONDS);// SNAP: TODO: } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 9fc6f875..e5379088 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -1,35 +1,11 @@ package net.spy.memcached; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiPredicate; -import java.util.function.Consumer; - -import com.netflix.archaius.api.PropertyRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.archaius.api.Property.Subscription; +import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheGetOperationListener; import com.netflix.evcache.EVCacheLatch; +import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheAsciiOperationFactory; import com.netflix.evcache.operation.EVCacheBulkGetFuture; @@ -44,8 +20,24 @@ import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.Timer; -import com.netflix.spectator.ipc.IpcStatus; - +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; +import java.util.function.Consumer; import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.ConcatenationType; @@ -56,16 +48,16 @@ import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreOperation; import net.spy.memcached.ops.StoreType; -import net.spy.memcached.protocol.binary.BinaryOperationFactory; -import net.spy.memcached.transcoders.Transcoder; -import net.spy.memcached.util.StringUtils; import net.spy.memcached.protocol.ascii.ExecCmdOperation; import net.spy.memcached.protocol.ascii.MetaDebugOperation; import net.spy.memcached.protocol.ascii.MetaGetOperation; +import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ "PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "SIC_INNER_SHOULD_BE_STATIC_ANON" }) @@ -301,33 +293,54 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener, BiPredicate nodeValidator) { - final Map> m = new ConcurrentHashMap>(); - - // Break the gets down into groups by key - final Map> chunks = new HashMap>(); - final NodeLocator locator = mconn.getLocator(); + return asyncGetBulk(keys, tc, new ArrayList<>(), null, listener, nodeValidator); + } - //Populate Node and key Map + private void populateChunks(Collection keys, + Map> chunks, + NodeLocator locator, + BiPredicate nodeValidator) { for (String key : keys) { EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - Collection ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); - ks.add(key); + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); } } + } + + public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, + final Transcoder tc, + Collection hashedKeys, + final EVCacheTranscoder evCacheTranscoder, + EVCacheGetOperationListener listener, + BiPredicate nodeValidator) { + // Use Map> to accept both T (from tc) and Object (from evCacheTranscoder) + final Map> m = new ConcurrentHashMap<>(); + + // Break the gets down into groups by key + final Map> chunks = new HashMap>(); + final NodeLocator locator = mconn.getLocator(); + + //Populate Node and key Map - using separate loops for efficiency (avoids Stream overhead in hot path) + populateChunks(unHashedKeys, chunks, locator, nodeValidator); + populateChunks(hashedKeys, chunks, locator, nodeValidator); final AtomicInteger pendingChunks = new AtomicInteger(chunks.size()); int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(m, ops, latch, executorService, client); + // Cast the map to Map> for EVCacheBulkGetFuture + // The map contains Object values which will be post-processed by the caller + @SuppressWarnings("unchecked") + final Map> castedMap = (Map>) (Map) m; + final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(castedMap, ops, latch, executorService, client); rv.setExpectedCount(chunks.size()); final DistributionSummary dataSizeDS = getDataSizeDistributionSummary( - EVCacheMetricsFactory.BULK_OPERATION, - EVCacheMetricsFactory.READ, - EVCacheMetricsFactory.IPC_SIZE_INBOUND); + EVCacheMetricsFactory.BULK_OPERATION, + EVCacheMetricsFactory.READ, + EVCacheMetricsFactory.IPC_SIZE_INBOUND); class EVCacheBulkGetSingleFutureCallback implements GetOperation.Callback { final int thisOpId; @@ -345,16 +358,25 @@ void bindOp(GetOperation op) { @Override public void receivedStatus(OperationStatus status) { - if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); + if (log.isDebugEnabled()) log.debug("GetBulk Keys (unhashed): " + unHashedKeys + ", (hashed): " + hashedKeys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); rv.setStatus(status); } @Override + @SuppressWarnings("unchecked") public void gotData(String k, int flags, byte[] data) { if (data != null) { dataSizeDS.record(data.length); } - m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + if (unHashedKeys.contains(k)) { + // Cast Future to Future for the map + m.put(k, (Future) (Future) tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + } else if (hashedKeys.contains(k)) { + // evCacheTranscoder already returns Future, but cast for consistency + m.put(k, tcService.decode(evCacheTranscoder, new CachedData(flags, data, evCacheTranscoder.getMaxSize()))); + } else { + throw new IllegalStateException("Key was in neither unHashedKeys nor hashedKeys map"); + } } @Override @@ -364,7 +386,7 @@ public void complete() { rv.signalSingleOpComplete(thisOpId, op); if (pendingChunks.decrementAndGet() <= 0) { latch.countDown(); - getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); + getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == (unHashedKeys.size() + hashedKeys.size()) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); rv.signalComplete(); } } @@ -372,7 +394,7 @@ public void complete() { // Now that we know how many servers it breaks down into, and the latch // is all set up, convert all of these strings collections to operations - final Map mops = new HashMap(); + final Map mops = new HashMap<>(); int thisOpId = 0; for (Map.Entry> me : chunks.entrySet()) { EVCacheBulkGetSingleFutureCallback cb = new EVCacheBulkGetSingleFutureCallback(thisOpId);