From e18247613d6b3ad6329d461862d7ab1833f83be5 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 11:18:16 -0800 Subject: [PATCH 1/7] take 1 --- .../java/com/netflix/evcache/EVCacheImpl.java | 134 +++++++++++------- .../operation/EVCacheBulkGetFuture.java | 47 +++--- .../operation/EVCacheOperationFuture.java | 2 +- .../netflix/evcache/pool/EVCacheClient.java | 2 +- .../spy/memcached/EVCacheMemcachedClient.java | 59 ++++---- 5 files changed, 139 insertions(+), 105 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index c3606e3f..5770af0b 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -2,41 +2,11 @@ import static com.netflix.evcache.util.Sneaky.sneakyThrow; -import java.lang.management.ManagementFactory; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Function; -import java.util.stream.Collectors; - -import javax.management.MBeanServer; -import javax.management.ObjectName; - -import com.netflix.evcache.dto.KeyMapDto; -import com.netflix.evcache.util.EVCacheBulkDataDto; -import com.netflix.evcache.util.KeyHasher; -import com.netflix.evcache.util.RetryCount; -import com.netflix.evcache.util.Sneaky; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import 
com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheInMemoryCache.DataNotFoundException; import com.netflix.evcache.EVCacheLatch.Policy; +import com.netflix.evcache.dto.KeyMapDto; import com.netflix.evcache.event.EVCacheEvent; import com.netflix.evcache.event.EVCacheEventListener; import com.netflix.evcache.metrics.EVCacheMetricsFactory; @@ -52,14 +22,40 @@ import com.netflix.evcache.pool.EVCacheClientUtil; import com.netflix.evcache.pool.EVCacheValue; import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.util.EVCacheBulkDataDto; +import com.netflix.evcache.util.KeyHasher; +import com.netflix.evcache.util.RetryCount; +import com.netflix.evcache.util.Sneaky; import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.Timer; - +import java.lang.management.ManagementFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import javax.management.MBeanServer; +import javax.management.ObjectName; import net.spy.memcached.CachedData; import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Observable; import rx.Scheduler; import rx.Single; @@ -1899,33 +1895,71 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient private CompletableFuture> 
getAsyncBulkData(EVCacheClient client, List evcacheKeys, Transcoder tc) { - KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); - final Map keyMap = keyMapDto.getKeyMap(); - boolean hasHashedKey = keyMapDto.isKeyHashed(); - if (hasHashedKey) { + // Split keys into hashed and non-hashed to use appropriate transcoder for each + final Map hashedKeyMap = new HashMap<>(); + final Map nonHashedKeyMap = new HashMap<>(); + + for (EVCacheKey evcKey : evcacheKeys) { + String key = evcKey.getCanonicalKey(client.isDuetClient()); + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), + client.shouldEncodeHashKey(), client.getMaxDigestBytes(), + client.getMaxHashLength(), client.getBaseEncoder()); + if (hashKey != null) { + hashedKeyMap.put(hashKey, evcKey); + } else { + nonHashedKeyMap.put(key, evcKey); + } + } + + final Transcoder tcCopy = (tc == null && _transcoder != null) ? (Transcoder) _transcoder : tc; + + // Create futures for hashed and non-hashed keys + + CompletableFuture> nonHashedFuture; + if (!nonHashedKeyMap.isEmpty()) { if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with hashedKey {} ",evcacheKeys); + log.debug("fetching bulk data with non hashedKey {} ", nonHashedKeyMap.keySet()); } - return client.getAsyncBulk(keyMap.keySet(), evcacheValueTranscoder) - .thenApply(data -> buildHashedKeyValueResult(data, tc, client, keyMap)) - .exceptionally(t -> handleBulkException(t, evcacheKeys)); + nonHashedFuture = client.getAsyncBulk(nonHashedKeyMap.keySet(), tcCopy) + .thenApply(data -> buildNonHashedKeyValueResult(data, nonHashedKeyMap)); } else { - final Transcoder tcCopy; - if (tc == null && _transcoder != null) { - tcCopy = (Transcoder) _transcoder; - } else { - tcCopy = tc; - } + nonHashedFuture = CompletableFuture.completedFuture(new HashMap<>()); + } + + CompletableFuture> hashedFuture; + if (!hashedKeyMap.isEmpty()) { if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with 
non hashedKey {} ",keyMap.keySet()); + log.debug("fetching bulk data with hashedKey {} ", hashedKeyMap.keySet()); } - return client.getAsyncBulk(keyMap.keySet(), tcCopy ) - .thenApply(data -> buildNonHashedKeyValueResult(data, keyMap)) - .exceptionally(t -> handleBulkException(t, evcacheKeys)); + hashedFuture = client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> buildHashedKeyValueResult(data, tcCopy, client, hashedKeyMap)); + } else { + hashedFuture = CompletableFuture.completedFuture(new HashMap<>()); } + + // Combine results from both hashed and non-hashed keys + return hashedFuture.thenCombine(nonHashedFuture, (hashedResults, nonHashedResults) -> { + try { + Map result = new HashMap<>(); + if (hashedResults != null) { + result.putAll(hashedResults); + } + if (nonHashedResults != null) { + result.putAll(nonHashedResults); + } + return result; + } catch (Exception e) { + log.error("SNAP: {}", e); + throw new RuntimeException(e); + } + + }) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); } private Map handleBulkException(Throwable t, Collection evCacheKeys) { + System.out.println("SNAP: " + t); + log.error("SNAP: {}", t); if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while getBulk data for APP " + _appName + ", key : " + evCacheKeys, t); throw Sneaky.sneakyThrow(t); diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 07d60bd8..107f485d 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -1,5 +1,12 @@ package com.netflix.evcache.operation; +import com.netflix.evcache.metrics.EVCacheMetricsFactory; +import com.netflix.evcache.pool.EVCacheClient; +import com.netflix.evcache.pool.ServerGroup; +import com.netflix.evcache.util.EVCacheConfig; 
+import com.netflix.spectator.api.BasicTag; +import com.netflix.spectator.api.Tag; +import com.sun.management.GcInfo; import java.lang.management.GarbageCollectorMXBean; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -8,33 +15,25 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceArray; - -import com.netflix.evcache.EVCacheGetOperationListener; -import com.netflix.evcache.util.Pair; -import net.spy.memcached.internal.BulkGetCompletionListener; -import net.spy.memcached.internal.CheckedOperationTimeoutException; -import net.spy.memcached.ops.GetOperation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.netflix.evcache.metrics.EVCacheMetricsFactory; -import com.netflix.evcache.pool.EVCacheClient; -import com.netflix.evcache.pool.ServerGroup; -import com.netflix.evcache.util.EVCacheConfig; -import com.netflix.spectator.api.BasicTag; -import com.netflix.spectator.api.Tag; -import com.sun.management.GcInfo; - import net.spy.memcached.MemcachedConnection; import net.spy.memcached.internal.BulkGetFuture; +import net.spy.memcached.internal.CheckedOperationTimeoutException; +import net.spy.memcached.ops.GetOperation; import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationState; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.Single; @@ -211,6 +210,7 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni 
doAsyncGetSome(future); return future.handle((data, ex) -> { if (ex != null) { + log.error("SNAP: ", ex); handleBulkException(); } return data; @@ -219,8 +219,12 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); + if (state == null) { + throw new RuntimeException("An operation in bulk get terminated without a state- either timed out, or cancelled, or some other error"); + } if (!state.completed) { if (state.cancelled) { throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled"))); @@ -248,8 +252,9 @@ public void doAsyncGetSome(CompletableFuture> promise) { m.put(me.getKey(), (T)me.getValue()); } promise.complete(m); - } catch (Exception t) { - promise.completeExceptionally(t); + } catch (Exception e) { + log.error("SNAP: ", e); + promise.completeExceptionally(e); } }); } diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java index 2b9941cd..c701ba32 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheOperationFuture.java @@ -293,7 +293,7 @@ private static CompletableFuture getNext(CompletableFuture future, timeout, unit, timeoutSlots); - future.completeExceptionally(new TimeoutException("Timeout after " + timeout)); + future.completeExceptionally(new TimeoutException("Timeout after " + timeout + "ms")); }, splitTimeout, unit); diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index bef04d88..9e9b3875 100644 --- 
a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -988,7 +988,7 @@ public CompletableFuture> getAsyncBulk(Collection can if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(canonicalKeys, tc, null, validator) - .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS); + .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 9fc6f875..76b79458 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -1,33 +1,8 @@ package net.spy.memcached; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiPredicate; -import java.util.function.Consumer; - -import com.netflix.archaius.api.PropertyRepository; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.archaius.api.Property.Subscription; +import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheGetOperationListener; import 
com.netflix.evcache.EVCacheLatch; import com.netflix.evcache.metrics.EVCacheMetricsFactory; @@ -44,8 +19,24 @@ import com.netflix.spectator.api.DistributionSummary; import com.netflix.spectator.api.Tag; import com.netflix.spectator.api.Timer; -import com.netflix.spectator.ipc.IpcStatus; - +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; +import java.util.function.Consumer; import net.spy.memcached.internal.GetFuture; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.ops.ConcatenationType; @@ -56,16 +47,16 @@ import net.spy.memcached.ops.Operation; import net.spy.memcached.ops.OperationCallback; import net.spy.memcached.ops.OperationStatus; -import net.spy.memcached.ops.StatsOperation; import net.spy.memcached.ops.StatusCode; import net.spy.memcached.ops.StoreOperation; import net.spy.memcached.ops.StoreType; -import net.spy.memcached.protocol.binary.BinaryOperationFactory; -import net.spy.memcached.transcoders.Transcoder; -import net.spy.memcached.util.StringUtils; import net.spy.memcached.protocol.ascii.ExecCmdOperation; import net.spy.memcached.protocol.ascii.MetaDebugOperation; import net.spy.memcached.protocol.ascii.MetaGetOperation; +import net.spy.memcached.protocol.binary.BinaryOperationFactory; +import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @edu.umd.cs.findbugs.annotations.SuppressFBWarnings({ 
"PRMC_POSSIBLY_REDUNDANT_METHOD_CALLS", "SIC_INNER_SHOULD_BE_STATIC_ANON" }) @@ -317,6 +308,9 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, } } + // @SuppressWarnings("unchecked") + // final Transcoder myTranscoder = (tc == null) ? (Transcoder) getTranscoder() : tc; + final AtomicInteger pendingChunks = new AtomicInteger(chunks.size()); int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); @@ -354,6 +348,7 @@ public void gotData(String k, int flags, byte[] data) { if (data != null) { dataSizeDS.record(data.length); } + // m.put(k, tcService.decode(myTranscoder, new CachedData(flags, data, myTranscoder.getMaxSize()))); m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } From 662ae3682d890671bb8df86ed320c0011d98436b Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 14:09:08 -0800 Subject: [PATCH 2/7] try- smart transcoder- fail --- .../java/com/netflix/evcache/EVCacheImpl.java | 150 +++++++++++------- .../com/netflix/evcache/dto/KeyMapDto.java | 27 ++-- .../operation/EVCacheBulkGetFuture.java | 1 - 3 files changed, 112 insertions(+), 66 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 5770af0b..dee9ee40 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1,5 +1,6 @@ package com.netflix.evcache; +import static com.netflix.evcache.EVCacheSerializingTranscoder.SERIALIZED; import static com.netflix.evcache.util.Sneaky.sneakyThrow; import com.netflix.archaius.api.Property; @@ -1895,65 +1896,72 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient private CompletableFuture> getAsyncBulkData(EVCacheClient client, List evcacheKeys, Transcoder tc) { - // Split keys into hashed and non-hashed to use appropriate transcoder for each - final Map 
hashedKeyMap = new HashMap<>(); - final Map nonHashedKeyMap = new HashMap<>(); - - for (EVCacheKey evcKey : evcacheKeys) { - String key = evcKey.getCanonicalKey(client.isDuetClient()); - String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), - client.shouldEncodeHashKey(), client.getMaxDigestBytes(), - client.getMaxHashLength(), client.getBaseEncoder()); - if (hashKey != null) { - hashedKeyMap.put(hashKey, evcKey); - } else { - nonHashedKeyMap.put(key, evcKey); - } - } + KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); final Transcoder tcCopy = (tc == null && _transcoder != null) ? (Transcoder) _transcoder : tc; - // Create futures for hashed and non-hashed keys + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data for keys {} ", keyMapDto.getAllKeys()); + } + + // Smart transcoder that checks flags to determine how to decode + // Hashed keys (with SERIALIZED flag) -> use evcacheValueTranscoder to unwrap EVCacheValue + // Non-hashed keys -> use user's transcoder directly + Transcoder smartTranscoder = new Transcoder() { + @Override + public boolean asyncDecode(CachedData d) { + // SNAP: TODO: could handle do this more smartly + // Check both transcoders - if either wants async decode, do it async + boolean evcacheNeedsAsync = evcacheValueTranscoder.asyncDecode(d); + boolean userNeedsAsync = tcCopy != null ? 
tcCopy.asyncDecode(d) : false; + return evcacheNeedsAsync || userNeedsAsync; + } + + @Override + public T decode(CachedData d) { + // Check if this looks like an EVCacheValue (SERIALIZED flag = 1) + if ((d.getFlags() & 1) != 0) { + // SERIALIZED flag is set - decode with evcacheValueTranscoder + Object obj = evcacheValueTranscoder.decode(d); + if (obj instanceof EVCacheValue) { + // Hashed key - unwrap the EVCacheValue and decode inner data + final EVCacheValue val = (EVCacheValue) obj; + final CachedData innerData = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); + if (tcCopy == null) { + return (T) client.getTranscoder().decode(innerData); + } else { + return tcCopy.decode(innerData); + } + } else { + // SERIALIZED flag set but not an EVCacheValue + // This is a non-hashed key with a serialized user object + // evcacheValueTranscoder already decoded it correctly + return (T) obj; + } + } - CompletableFuture> nonHashedFuture; - if (!nonHashedKeyMap.isEmpty()) { - if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with non hashedKey {} ", nonHashedKeyMap.keySet()); + // Non-hashed key with flags=0 - decode with user's transcoder + if (tcCopy == null) { + return (T) client.getTranscoder().decode(d); + } else { + return tcCopy.decode(d); + } } - nonHashedFuture = client.getAsyncBulk(nonHashedKeyMap.keySet(), tcCopy) - .thenApply(data -> buildNonHashedKeyValueResult(data, nonHashedKeyMap)); - } else { - nonHashedFuture = CompletableFuture.completedFuture(new HashMap<>()); - } - CompletableFuture> hashedFuture; - if (!hashedKeyMap.isEmpty()) { - if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with hashedKey {} ", hashedKeyMap.keySet()); + @Override + public CachedData encode(Object o) { + // Encoding not used in bulk get, but provide implementation + return evcacheValueTranscoder.encode(o); } - hashedFuture = client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) - .thenApply(data -> 
buildHashedKeyValueResult(data, tcCopy, client, hashedKeyMap)); - } else { - hashedFuture = CompletableFuture.completedFuture(new HashMap<>()); - } - // Combine results from both hashed and non-hashed keys - return hashedFuture.thenCombine(nonHashedFuture, (hashedResults, nonHashedResults) -> { - try { - Map result = new HashMap<>(); - if (hashedResults != null) { - result.putAll(hashedResults); - } - if (nonHashedResults != null) { - result.putAll(nonHashedResults); - } - return result; - } catch (Exception e) { - log.error("SNAP: {}", e); - throw new RuntimeException(e); - } + @Override + public int getMaxSize() { + return evcacheValueTranscoder.getMaxSize(); + } + }; - }) + return client.getAsyncBulk(keyMapDto.getAllKeys(), smartTranscoder) + .thenApply(data -> buildBulkKeyValueResult(data, keyMapDto)) .exceptionally(t -> handleBulkException(t, evcacheKeys)); } @@ -1966,20 +1974,50 @@ private Map handleBulkException(Throwable t, Collection evcacheKeys) { - boolean hasHashedKey = false; - final Map keyMap = new HashMap(evcacheKeys.size() * 2); + final Map hashedKeyMap = new HashMap<>(); + final Map nonHashedKeyMap = new HashMap<>(); + for (EVCacheKey evcKey : evcacheKeys) { String key = evcKey.getCanonicalKey(client.isDuetClient()); String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); if (hashKey != null) { if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - key = hashKey; - hasHashedKey = true; + hashedKeyMap.put(hashKey, evcKey); + } else { + nonHashedKeyMap.put(key, evcKey); } - keyMap.put(key, evcKey); } - return new KeyMapDto(keyMap, hasHashedKey); + return new KeyMapDto(hashedKeyMap, nonHashedKeyMap); + } + + private Map buildBulkKeyValueResult(Map objMap, + KeyMapDto keyMapDto) { + final Map hashedKeyMap = keyMapDto.getHashedKeyMap(); 
+ final Map nonHashedKeyMap = keyMapDto.getNonHashedKeyMap(); + final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); + + for (Map.Entry entry : objMap.entrySet()) { + final String key = entry.getKey(); + final T value = entry.getValue(); + + // Map string key to EVCacheKey - check both maps + EVCacheKey evcKey = hashedKeyMap.get(key); + if (evcKey == null) { + evcKey = nonHashedKeyMap.get(key); + } + + if (evcKey != null) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); + retMap.put(evcKey, value); + } else { + // Key not in either map - should not happen + if (log.isWarnEnabled() && shouldLog()) + log.warn("APP " + _appName + ", key [" + key + "] not found in hashedKeyMap or nonHashedKeyMap"); + } + } + return retMap; } private Map buildNonHashedKeyValueResult(Map objMap, diff --git a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java index 64098e27..e1f254c6 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java +++ b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java @@ -3,21 +3,30 @@ import com.netflix.evcache.EVCacheKey; import java.util.Map; +import java.util.Set; +import java.util.HashSet; public class KeyMapDto { - Map keyMap; - boolean isKeyHashed; + Map hashedKeyMap; + Map nonHashedKeyMap; - public KeyMapDto(Map keyMap, boolean isKeyHashed) { - this.keyMap = keyMap; - this.isKeyHashed = isKeyHashed; + public KeyMapDto(Map hashedKeyMap, Map nonHashedKeyMap) { + this.hashedKeyMap = hashedKeyMap; + this.nonHashedKeyMap = nonHashedKeyMap; } - public Map getKeyMap() { - return keyMap; + public Set getAllKeys() { + Set allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size()); + allKeys.addAll(hashedKeyMap.keySet()); + allKeys.addAll(nonHashedKeyMap.keySet()); + return allKeys; } - public boolean isKeyHashed() { - return isKeyHashed; + public 
Map getHashedKeyMap() { + return hashedKeyMap; + } + + public Map getNonHashedKeyMap() { + return nonHashedKeyMap; } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 107f485d..b038cbc0 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -219,7 +219,6 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; - Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); if (state == null) { From 75d3eee319889b58b016f4ce11a3ca3248e4d2bf Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 14:09:20 -0800 Subject: [PATCH 3/7] Revert "try- smart transcoder- fail" This reverts commit 662ae3682d890671bb8df86ed320c0011d98436b. 
--- .../java/com/netflix/evcache/EVCacheImpl.java | 150 +++++++----------- .../com/netflix/evcache/dto/KeyMapDto.java | 27 ++-- .../operation/EVCacheBulkGetFuture.java | 1 + 3 files changed, 66 insertions(+), 112 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index dee9ee40..5770af0b 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1,6 +1,5 @@ package com.netflix.evcache; -import static com.netflix.evcache.EVCacheSerializingTranscoder.SERIALIZED; import static com.netflix.evcache.util.Sneaky.sneakyThrow; import com.netflix.archaius.api.Property; @@ -1896,72 +1895,65 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient private CompletableFuture> getAsyncBulkData(EVCacheClient client, List evcacheKeys, Transcoder tc) { - KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); + // Split keys into hashed and non-hashed to use appropriate transcoder for each + final Map hashedKeyMap = new HashMap<>(); + final Map nonHashedKeyMap = new HashMap<>(); + + for (EVCacheKey evcKey : evcacheKeys) { + String key = evcKey.getCanonicalKey(client.isDuetClient()); + String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), + client.shouldEncodeHashKey(), client.getMaxDigestBytes(), + client.getMaxHashLength(), client.getBaseEncoder()); + if (hashKey != null) { + hashedKeyMap.put(hashKey, evcKey); + } else { + nonHashedKeyMap.put(key, evcKey); + } + } final Transcoder tcCopy = (tc == null && _transcoder != null) ? 
(Transcoder) _transcoder : tc; - if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data for keys {} ", keyMapDto.getAllKeys()); - } - - // Smart transcoder that checks flags to determine how to decode - // Hashed keys (with SERIALIZED flag) -> use evcacheValueTranscoder to unwrap EVCacheValue - // Non-hashed keys -> use user's transcoder directly - Transcoder smartTranscoder = new Transcoder() { - @Override - public boolean asyncDecode(CachedData d) { - // SNAP: TODO: could handle do this more smartly - // Check both transcoders - if either wants async decode, do it async - boolean evcacheNeedsAsync = evcacheValueTranscoder.asyncDecode(d); - boolean userNeedsAsync = tcCopy != null ? tcCopy.asyncDecode(d) : false; - return evcacheNeedsAsync || userNeedsAsync; - } - - @Override - public T decode(CachedData d) { - // Check if this looks like an EVCacheValue (SERIALIZED flag = 1) - if ((d.getFlags() & 1) != 0) { - // SERIALIZED flag is set - decode with evcacheValueTranscoder - Object obj = evcacheValueTranscoder.decode(d); - if (obj instanceof EVCacheValue) { - // Hashed key - unwrap the EVCacheValue and decode inner data - final EVCacheValue val = (EVCacheValue) obj; - final CachedData innerData = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - if (tcCopy == null) { - return (T) client.getTranscoder().decode(innerData); - } else { - return tcCopy.decode(innerData); - } - } else { - // SERIALIZED flag set but not an EVCacheValue - // This is a non-hashed key with a serialized user object - // evcacheValueTranscoder already decoded it correctly - return (T) obj; - } - } + // Create futures for hashed and non-hashed keys - // Non-hashed key with flags=0 - decode with user's transcoder - if (tcCopy == null) { - return (T) client.getTranscoder().decode(d); - } else { - return tcCopy.decode(d); - } + CompletableFuture> nonHashedFuture; + if (!nonHashedKeyMap.isEmpty()) { + if (log.isDebugEnabled() && shouldLog()) { + 
log.debug("fetching bulk data with non hashedKey {} ", nonHashedKeyMap.keySet()); } + nonHashedFuture = client.getAsyncBulk(nonHashedKeyMap.keySet(), tcCopy) + .thenApply(data -> buildNonHashedKeyValueResult(data, nonHashedKeyMap)); + } else { + nonHashedFuture = CompletableFuture.completedFuture(new HashMap<>()); + } - @Override - public CachedData encode(Object o) { - // Encoding not used in bulk get, but provide implementation - return evcacheValueTranscoder.encode(o); + CompletableFuture> hashedFuture; + if (!hashedKeyMap.isEmpty()) { + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data with hashedKey {} ", hashedKeyMap.keySet()); } + hashedFuture = client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> buildHashedKeyValueResult(data, tcCopy, client, hashedKeyMap)); + } else { + hashedFuture = CompletableFuture.completedFuture(new HashMap<>()); + } - @Override - public int getMaxSize() { - return evcacheValueTranscoder.getMaxSize(); - } - }; + // Combine results from both hashed and non-hashed keys + return hashedFuture.thenCombine(nonHashedFuture, (hashedResults, nonHashedResults) -> { + try { + Map result = new HashMap<>(); + if (hashedResults != null) { + result.putAll(hashedResults); + } + if (nonHashedResults != null) { + result.putAll(nonHashedResults); + } + return result; + } catch (Exception e) { + log.error("SNAP: {}", e); + throw new RuntimeException(e); + } - return client.getAsyncBulk(keyMapDto.getAllKeys(), smartTranscoder) - .thenApply(data -> buildBulkKeyValueResult(data, keyMapDto)) + }) .exceptionally(t -> handleBulkException(t, evcacheKeys)); } @@ -1974,50 +1966,20 @@ private Map handleBulkException(Throwable t, Collection evcacheKeys) { - final Map hashedKeyMap = new HashMap<>(); - final Map nonHashedKeyMap = new HashMap<>(); - + boolean hasHashedKey = false; + final Map keyMap = new HashMap(evcacheKeys.size() * 2); for (EVCacheKey evcKey : evcacheKeys) { String key = 
evcKey.getCanonicalKey(client.isDuetClient()); String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); if (hashKey != null) { if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - hashedKeyMap.put(hashKey, evcKey); - } else { - nonHashedKeyMap.put(key, evcKey); + key = hashKey; + hasHashedKey = true; } + keyMap.put(key, evcKey); } - return new KeyMapDto(hashedKeyMap, nonHashedKeyMap); - } - - private Map buildBulkKeyValueResult(Map objMap, - KeyMapDto keyMapDto) { - final Map hashedKeyMap = keyMapDto.getHashedKeyMap(); - final Map nonHashedKeyMap = keyMapDto.getNonHashedKeyMap(); - final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); - - for (Map.Entry entry : objMap.entrySet()) { - final String key = entry.getKey(); - final T value = entry.getValue(); - - // Map string key to EVCacheKey - check both maps - EVCacheKey evcKey = hashedKeyMap.get(key); - if (evcKey == null) { - evcKey = nonHashedKeyMap.get(key); - } - - if (evcKey != null) { - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); - retMap.put(evcKey, value); - } else { - // Key not in either map - should not happen - if (log.isWarnEnabled() && shouldLog()) - log.warn("APP " + _appName + ", key [" + key + "] not found in hashedKeyMap or nonHashedKeyMap"); - } - } - return retMap; + return new KeyMapDto(keyMap, hasHashedKey); } private Map buildNonHashedKeyValueResult(Map objMap, diff --git a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java index e1f254c6..64098e27 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java +++ b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java @@ -3,30 
+3,21 @@ import com.netflix.evcache.EVCacheKey; import java.util.Map; -import java.util.Set; -import java.util.HashSet; public class KeyMapDto { - Map hashedKeyMap; - Map nonHashedKeyMap; + Map keyMap; + boolean isKeyHashed; - public KeyMapDto(Map hashedKeyMap, Map nonHashedKeyMap) { - this.hashedKeyMap = hashedKeyMap; - this.nonHashedKeyMap = nonHashedKeyMap; + public KeyMapDto(Map keyMap, boolean isKeyHashed) { + this.keyMap = keyMap; + this.isKeyHashed = isKeyHashed; } - public Set getAllKeys() { - Set allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size()); - allKeys.addAll(hashedKeyMap.keySet()); - allKeys.addAll(nonHashedKeyMap.keySet()); - return allKeys; + public Map getKeyMap() { + return keyMap; } - public Map getHashedKeyMap() { - return hashedKeyMap; - } - - public Map getNonHashedKeyMap() { - return nonHashedKeyMap; + public boolean isKeyHashed() { + return isKeyHashed; } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index b038cbc0..107f485d 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -219,6 +219,7 @@ public CompletableFuture> getAsyncSome(long timeout, TimeUnit uni public void handleBulkException() { ExecutionException t = null; + Operation[] opsArray = ops.toArray(new Operation[0]); for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); if (state == null) { From 3cc511e26c50cf5feec537aa74c1ec0f9d98bb52 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 14:47:21 -0800 Subject: [PATCH 4/7] wip --- .../java/com/netflix/evcache/EVCacheImpl.java | 100 +++++++----------- .../com/netflix/evcache/dto/KeyMapDto.java | 33 ++++-- .../netflix/evcache/pool/EVCacheClient.java | 63 ++++++----- 
.../spy/memcached/EVCacheMemcachedClient.java | 46 +++++--- 4 files changed, 131 insertions(+), 111 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index 5770af0b..b4f4a653 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1895,91 +1895,69 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient private CompletableFuture> getAsyncBulkData(EVCacheClient client, List evcacheKeys, Transcoder tc) { - // Split keys into hashed and non-hashed to use appropriate transcoder for each - final Map hashedKeyMap = new HashMap<>(); - final Map nonHashedKeyMap = new HashMap<>(); - - for (EVCacheKey evcKey : evcacheKeys) { - String key = evcKey.getCanonicalKey(client.isDuetClient()); - String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), - client.shouldEncodeHashKey(), client.getMaxDigestBytes(), - client.getMaxHashLength(), client.getBaseEncoder()); - if (hashKey != null) { - hashedKeyMap.put(hashKey, evcKey); + KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys); + Map hashedKeyMap = keyMapDto.getHashedKeyMap(); + Map unHashedKeyMap = keyMapDto.getNonHashedKeyMap(); + + if (!unHashedKeyMap.isEmpty() && hashedKeyMap.isEmpty()) { // all keys are non-hashed + final Transcoder tcCopy; + if (tc == null && _transcoder != null) { + tcCopy = (Transcoder) _transcoder; } else { - nonHashedKeyMap.put(key, evcKey); + tcCopy = tc; } - } - - final Transcoder tcCopy = (tc == null && _transcoder != null) ? 
(Transcoder) _transcoder : tc; - - // Create futures for hashed and non-hashed keys - - CompletableFuture> nonHashedFuture; - if (!nonHashedKeyMap.isEmpty()) { if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with non hashedKey {} ", nonHashedKeyMap.keySet()); + log.debug("fetching bulk data with un-hashed keys {}", unHashedKeyMap.keySet()); } - nonHashedFuture = client.getAsyncBulk(nonHashedKeyMap.keySet(), tcCopy) - .thenApply(data -> buildNonHashedKeyValueResult(data, nonHashedKeyMap)); - } else { - nonHashedFuture = CompletableFuture.completedFuture(new HashMap<>()); - } - - CompletableFuture> hashedFuture; - if (!hashedKeyMap.isEmpty()) { + return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy ) + .thenApply(data -> buildNonHashedKeyValueResult(data, unHashedKeyMap)) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); + } else if (!hashedKeyMap.isEmpty() && unHashedKeyMap.isEmpty()) { // all keys are hashed if (log.isDebugEnabled() && shouldLog()) { - log.debug("fetching bulk data with hashedKey {} ", hashedKeyMap.keySet()); + log.debug("fetching bulk data with hashed keys {} : {}", hashedKeyMap.keySet()); + } + return client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> buildHashedKeyValueResult(data, tc, client, hashedKeyMap)) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); + } else { // a mix of hashed and un-hashed keys + final Transcoder tcCopy; + if (tc == null && _transcoder != null) { + tcCopy = (Transcoder) _transcoder; + } else { + tcCopy = tc; } - hashedFuture = client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder) - .thenApply(data -> buildHashedKeyValueResult(data, tcCopy, client, hashedKeyMap)); - } else { - hashedFuture = CompletableFuture.completedFuture(new HashMap<>()); + if (log.isDebugEnabled() && shouldLog()) { + log.debug("fetching bulk data with hashed keys: {} AND un-hashed keys {}", hashedKeyMap.keySet(), unHashedKeyMap.keySet()); + } + 
return client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder, + unHashedKeyMap.keySet(), tcCopy) + .thenApply(data -> buildHashedKeyValueResult(data, tc, client, hashedKeyMap)) + .exceptionally(t -> handleBulkException(t, evcacheKeys)); } - - // Combine results from both hashed and non-hashed keys - return hashedFuture.thenCombine(nonHashedFuture, (hashedResults, nonHashedResults) -> { - try { - Map result = new HashMap<>(); - if (hashedResults != null) { - result.putAll(hashedResults); - } - if (nonHashedResults != null) { - result.putAll(nonHashedResults); - } - return result; - } catch (Exception e) { - log.error("SNAP: {}", e); - throw new RuntimeException(e); - } - - }) - .exceptionally(t -> handleBulkException(t, evcacheKeys)); } private Map handleBulkException(Throwable t, Collection evCacheKeys) { - System.out.println("SNAP: " + t); - log.error("SNAP: {}", t); if (log.isDebugEnabled() && shouldLog()) log.debug("Exception while getBulk data for APP " + _appName + ", key : " + evCacheKeys, t); throw Sneaky.sneakyThrow(t); } private KeyMapDto buildKeyMap(EVCacheClient client, Collection evcacheKeys) { - boolean hasHashedKey = false; - final Map keyMap = new HashMap(evcacheKeys.size() * 2); + final Map hashedKeyMap = new HashMap<>(); + final Map nonHashedKeyMap = new HashMap<>(); + for (EVCacheKey evcKey : evcacheKeys) { String key = evcKey.getCanonicalKey(client.isDuetClient()); String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder()); if (hashKey != null) { if (log.isDebugEnabled() && shouldLog()) log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]"); - key = hashKey; - hasHashedKey = true; + hashedKeyMap.put(hashKey, evcKey); + } else { + nonHashedKeyMap.put(key, evcKey); } - keyMap.put(key, evcKey); } - return new KeyMapDto(keyMap, hasHashedKey); + return new 
KeyMapDto(hashedKeyMap, nonHashedKeyMap); } private Map buildNonHashedKeyValueResult(Map objMap, diff --git a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java index 64098e27..cb1d4a5d 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java +++ b/evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java @@ -1,23 +1,36 @@ package com.netflix.evcache.dto; import com.netflix.evcache.EVCacheKey; - +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import javax.annotation.Nonnull; public class KeyMapDto { - Map keyMap; - boolean isKeyHashed; + Map hashedKeyMap; + Map nonHashedKeyMap; + + public KeyMapDto(@Nonnull Map hashedKeyMap, @Nonnull Map nonHashedKeyMap) { + this.hashedKeyMap = hashedKeyMap; + this.nonHashedKeyMap = nonHashedKeyMap; + } + + public Map getHashedKeyMap() { + return hashedKeyMap; + } - public KeyMapDto(Map keyMap, boolean isKeyHashed) { - this.keyMap = keyMap; - this.isKeyHashed = isKeyHashed; + public Map getNonHashedKeyMap() { + return nonHashedKeyMap; } - public Map getKeyMap() { - return keyMap; + public Set getAllKeys() { + Set allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size()); + allKeys.addAll(hashedKeyMap.keySet()); + allKeys.addAll(nonHashedKeyMap.keySet()); + return allKeys; } - public boolean isKeyHashed() { - return isKeyHashed; + public boolean isKeyHashed(String key) { + return hashedKeyMap.containsKey(key); } } diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 9e9b3875..76b95027 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -1,29 +1,5 @@ package com.netflix.evcache.pool; -import java.io.BufferedInputStream; -import java.io.IOException; -import 
java.io.PrintWriter; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.URLDecoder; -import java.nio.charset.StandardCharsets; -import java.util.AbstractMap.SimpleEntry; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.*; -import java.util.function.BiPredicate; -import java.util.zip.CRC32; -import java.util.zip.Checksum; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.netflix.archaius.api.Property; import com.netflix.evcache.EVCache; import com.netflix.evcache.EVCache.Call; @@ -32,6 +8,7 @@ import com.netflix.evcache.EVCacheLatch; import com.netflix.evcache.EVCacheReadQueueException; import com.netflix.evcache.EVCacheSerializingTranscoder; +import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheFutures; import com.netflix.evcache.operation.EVCacheItem; @@ -44,19 +21,44 @@ import com.netflix.spectator.api.BasicTag; import com.netflix.spectator.api.Counter; import com.netflix.spectator.api.Tag; - +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.URLDecoder; +import java.nio.charset.StandardCharsets; +import java.util.AbstractMap.SimpleEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; +import java.util.function.BiPredicate; +import java.util.zip.CRC32; +import java.util.zip.Checksum; import net.spy.memcached.CASValue; import net.spy.memcached.CachedData; import net.spy.memcached.ConnectionFactory; import net.spy.memcached.EVCacheMemcachedClient; import net.spy.memcached.EVCacheNode; -import net.spy.memcached.MemcachedClient; import net.spy.memcached.MemcachedNode; import net.spy.memcached.NodeLocator; import net.spy.memcached.internal.ListenableFuture; import net.spy.memcached.internal.OperationCompletionListener; import net.spy.memcached.internal.OperationFuture; import net.spy.memcached.transcoders.Transcoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import rx.Scheduler; import rx.Single; @@ -992,6 +994,15 @@ public CompletableFuture> getAsyncBulk(Collection can } + public CompletableFuture> getAsyncBulk(Collection hashedKeys, EVCacheTranscoder evCacheTranscoder, Collection unHashedKeys, Transcoder tc) { + final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); + if (tc == null) tc = (Transcoder) getTranscoder(); + return evcacheMemcachedClient + .asyncGetBulk(hashedKeys, evCacheTranscoder, unHashedKeys, tc, null, validator) + .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: + + } + public Single> getBulk(Collection canonicalKeys, final Transcoder transcoder, boolean _throwException, boolean hasZF, Scheduler scheduler) { try { diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 76b79458..12d26e7e 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -5,6 +5,7 @@ import com.netflix.archaius.api.PropertyRepository; import com.netflix.evcache.EVCacheGetOperationListener; import 
com.netflix.evcache.EVCacheLatch; +import com.netflix.evcache.EVCacheTranscoder; import com.netflix.evcache.metrics.EVCacheMetricsFactory; import com.netflix.evcache.operation.EVCacheAsciiOperationFactory; import com.netflix.evcache.operation.EVCacheBulkGetFuture; @@ -292,6 +293,15 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener, BiPredicate nodeValidator) { + return asyncGetBulk(keys, tc, new ArrayList<>(), null, listener, nodeValidator); + } + + public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, + final Transcoder tc, + Collection hashedKeys, + final EVCacheTranscoder evCacheTranscoder, + EVCacheGetOperationListener listener, + BiPredicate nodeValidator) { final Map> m = new ConcurrentHashMap>(); // Break the gets down into groups by key @@ -299,17 +309,20 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final NodeLocator locator = mconn.getLocator(); //Populate Node and key Map - for (String key : keys) { + for (String key : unHashedKeys) {// SNAP: TODO: is there a shorthand to iterating over both collections here? EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - Collection ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); - ks.add(key); + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); + } + } + for (String key : hashedKeys) { + EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); + final MemcachedNode primaryNode = locator.getPrimary(key); + if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); } } - - // @SuppressWarnings("unchecked") - // final Transcoder myTranscoder = (tc == null) ? 
(Transcoder) getTranscoder() : tc; final AtomicInteger pendingChunks = new AtomicInteger(chunks.size()); int initialLatchCount = chunks.isEmpty() ? 0 : 1; @@ -319,9 +332,9 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, rv.setExpectedCount(chunks.size()); final DistributionSummary dataSizeDS = getDataSizeDistributionSummary( - EVCacheMetricsFactory.BULK_OPERATION, - EVCacheMetricsFactory.READ, - EVCacheMetricsFactory.IPC_SIZE_INBOUND); + EVCacheMetricsFactory.BULK_OPERATION, + EVCacheMetricsFactory.READ, + EVCacheMetricsFactory.IPC_SIZE_INBOUND); class EVCacheBulkGetSingleFutureCallback implements GetOperation.Callback { final int thisOpId; @@ -339,7 +352,7 @@ void bindOp(GetOperation op) { @Override public void receivedStatus(OperationStatus status) { - if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); + if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + unHashedKeys.addAll(hashedKeys) + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); rv.setStatus(status); } @@ -348,8 +361,13 @@ public void gotData(String k, int flags, byte[] data) { if (data != null) { dataSizeDS.record(data.length); } - // m.put(k, tcService.decode(myTranscoder, new CachedData(flags, data, myTranscoder.getMaxSize()))); - m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + if (unHashedKeys.contains(k)) { + m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + } else if (hashedKeys.contains(k)) { + m.put(k, tcService.decode(evCacheTranscoder, new CachedData(flags, data, evCacheTranscoder.getMaxSize()))); + } else { + throw new IllegalStateException("// SNAP: TODO: key was in neither map"); + } } @Override @@ -359,7 +377,7 @@ public void complete() 
{ rv.signalSingleOpComplete(thisOpId, op); if (pendingChunks.decrementAndGet() <= 0) { latch.countDown(); - getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); + getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == (unHashedKeys.size() + hashedKeys.size()) ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS); rv.signalComplete(); } } @@ -367,7 +385,7 @@ public void complete() { // Now that we know how many servers it breaks down into, and the latch // is all set up, convert all of these strings collections to operations - final Map mops = new HashMap(); + final Map mops = new HashMap<>(); int thisOpId = 0; for (Map.Entry> me : chunks.entrySet()) { EVCacheBulkGetSingleFutureCallback cb = new EVCacheBulkGetSingleFutureCallback(thisOpId); From d11711c3e42a7a20a15fe2d0c3f48409d40b8b52 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 15:01:45 -0800 Subject: [PATCH 5/7] testing --- .../java/com/netflix/evcache/EVCacheImpl.java | 114 +++++++++++++++++- .../netflix/evcache/pool/EVCacheClient.java | 6 +- .../spy/memcached/EVCacheMemcachedClient.java | 22 ++-- 3 files changed, 128 insertions(+), 14 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index b4f4a653..bd5252e0 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1896,8 +1896,8 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient List evcacheKeys, Transcoder tc) { KeyMapDto keyMapDto = 
buildKeyMap(client, evcacheKeys); - Map hashedKeyMap = keyMapDto.getHashedKeyMap(); Map unHashedKeyMap = keyMapDto.getNonHashedKeyMap(); + Map hashedKeyMap = keyMapDto.getHashedKeyMap(); if (!unHashedKeyMap.isEmpty() && hashedKeyMap.isEmpty()) { // all keys are non-hashed final Transcoder tcCopy; @@ -1929,9 +1929,13 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient if (log.isDebugEnabled() && shouldLog()) { log.debug("fetching bulk data with hashed keys: {} AND un-hashed keys {}", hashedKeyMap.keySet(), unHashedKeyMap.keySet()); } - return client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder, - unHashedKeyMap.keySet(), tcCopy) - .thenApply(data -> buildHashedKeyValueResult(data, tc, client, hashedKeyMap)) + return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy, hashedKeyMap.keySet(), evcacheValueTranscoder) + .thenApply(data -> { + // asyncGetBulk now returns Map for mixed hashed/non-hashed keys + @SuppressWarnings("unchecked") + Map objectMap = (Map) (Map) data; + return myBuildMixedKeyValueResult(objectMap, tc, client, unHashedKeyMap, hashedKeyMap); + }) .exceptionally(t -> handleBulkException(t, evcacheKeys)); } } @@ -2010,6 +2014,108 @@ private Map buildHashedKeyValueResult(Map o return retMap; } + private Map myBuildMixedKeyValueResult(Map objMap, + Transcoder tc, + EVCacheClient client, + Map unHashedKeyMap, + Map hashedKeyMap) { + final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); + for (Map.Entry i : objMap.entrySet()) { + EVCacheKey evcKey = hashedKeyMap.get(i.getKey());// SNAP: TODO: make final + if (evcKey == null) { + evcKey = unHashedKeyMap.get(i.getKey()); + } + if (evcKey == null) { + throw new IllegalStateException("Key not found anywhere"); // SNAP: TODO: handle this + } + final Object obj = i.getValue(); + if (obj instanceof EVCacheValue) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value"); + final EVCacheValue val = 
(EVCacheValue) obj; + final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); + final T tVal; + if (tc == null) { + tVal = (T) client.getTranscoder().decode(cd); + } else { + tVal = tc.decode(cd); + } + if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + retMap.put(evcKey, tVal); + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); + } + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey); + retMap.put(evcKey, (T) obj); + } + } + return retMap; + } + + private Map buildMixedKeyValueResult(Map objMap, + Transcoder tc, + EVCacheClient client, + KeyMapDto keyMapDto) { + final Map hashedKeyMap = keyMapDto.getHashedKeyMap(); + final Map nonHashedKeyMap = keyMapDto.getNonHashedKeyMap(); + final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); + + for (Map.Entry i : objMap.entrySet()) { + final String key = i.getKey(); + final Object obj = i.getValue(); + + if (obj instanceof EVCacheValue) { + // Hashed key - lookup in hashedKeyMap only + final EVCacheValue val = (EVCacheValue) obj; + final EVCacheKey evcKey = hashedKeyMap.get(key); + if (evcKey == null) { + if (log.isWarnEnabled() && shouldLog()) + log.warn("APP " + _appName + ", hashed key [" + key + "] not found in hashedKeyMap"); + continue;// SNAP: TODO: may still want to return that key somehow + } + + // Decode inner data + final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); + final T tVal; + if (tc == null) { + tVal = (T) client.getTranscoder().decode(cd); + } else { + 
tVal = tc.decode(cd); + } + + // Verify canonical key matches + if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) { + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); + retMap.put(evcKey, tVal); + } else { + if (log.isDebugEnabled() && shouldLog()) + log.debug("CACHE COLLISION : APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); + incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); + } + } else { + // Non-hashed key - lookup in nonHashedKeyMap only + final EVCacheKey evcKey = nonHashedKeyMap.get(key); + if (evcKey == null) { + if (log.isWarnEnabled() && shouldLog()) + log.warn("APP " + _appName + ", non-hashed key [" + key + "] not found in nonHashedKeyMap"); + continue; + } + + if (log.isDebugEnabled() && shouldLog()) + log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); + retMap.put(evcKey, (T) obj); + } + } + return retMap; + } + private Map getBulkData(EVCacheClient client, Collection evcacheKeys, Transcoder tc, boolean throwException, boolean hasZF) throws Exception { try { boolean hasHashedKey = false; diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 76b95027..b95a02a6 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -994,12 +994,12 @@ public CompletableFuture> getAsyncBulk(Collection can } - public CompletableFuture> getAsyncBulk(Collection hashedKeys, EVCacheTranscoder evCacheTranscoder, Collection unHashedKeys, Transcoder tc) { + public CompletableFuture> getAsyncBulk(Collection unHashedKeys, Transcoder tc, Collection hashedKeys, EVCacheTranscoder evCacheTranscoder) { final BiPredicate validator = (node, key) -> 
validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient - .asyncGetBulk(hashedKeys, evCacheTranscoder, unHashedKeys, tc, null, validator) - .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: + .asyncGetBulk(unHashedKeys, tc, hashedKeys, evCacheTranscoder, null, validator) + .getAsyncSome(bulkReadTimeout.get() * 10000, TimeUnit.MILLISECONDS);// SNAP: TODO: } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 12d26e7e..9437df63 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -302,7 +302,8 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, final EVCacheTranscoder evCacheTranscoder, EVCacheGetOperationListener listener, BiPredicate nodeValidator) { - final Map> m = new ConcurrentHashMap>(); + // Use Map> to accept both T (from tc) and Object (from evCacheTranscoder) + final Map> m = new ConcurrentHashMap<>(); // Break the gets down into groups by key final Map> chunks = new HashMap>(); @@ -313,14 +314,14 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); } } for (String key : hashedKeys) { EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - chunks.computeIfAbsent(primaryNode, k -> new 
ArrayList<>()); + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); } } @@ -328,7 +329,11 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, int initialLatchCount = chunks.isEmpty() ? 0 : 1; final CountDownLatch latch = new CountDownLatch(initialLatchCount); final Collection ops = new ArrayList(chunks.size()); - final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(m, ops, latch, executorService, client); + // Cast the map to Map> for EVCacheBulkGetFuture + // The map contains Object values which will be post-processed by the caller + @SuppressWarnings("unchecked") + final Map> castedMap = (Map>) (Map) m; + final EVCacheBulkGetFuture rv = new EVCacheBulkGetFuture(castedMap, ops, latch, executorService, client); rv.setExpectedCount(chunks.size()); final DistributionSummary dataSizeDS = getDataSizeDistributionSummary( @@ -352,21 +357,24 @@ void bindOp(GetOperation op) { @Override public void receivedStatus(OperationStatus status) { - if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + unHashedKeys.addAll(hashedKeys) + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); + if (log.isDebugEnabled()) log.debug("GetBulk Keys (unhashed): " + unHashedKeys + ", (hashed): " + hashedKeys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime())); rv.setStatus(status); } @Override + @SuppressWarnings("unchecked") public void gotData(String k, int flags, byte[] data) { if (data != null) { dataSizeDS.record(data.length); } if (unHashedKeys.contains(k)) { - m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); + // Cast Future to Future for the map + m.put(k, (Future) (Future) tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize()))); } else if (hashedKeys.contains(k)) { + // evCacheTranscoder 
already returns Future, but cast for consistency m.put(k, tcService.decode(evCacheTranscoder, new CachedData(flags, data, evCacheTranscoder.getMaxSize()))); } else { - throw new IllegalStateException("// SNAP: TODO: key was in neither map"); + throw new IllegalStateException("Key was in neither unHashedKeys nor hashedKeys map"); } } From a5db4201cd833b4a7334e0b8ee9340440531a511 Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 16:12:07 -0800 Subject: [PATCH 6/7] cleanup --- .../java/com/netflix/evcache/EVCacheImpl.java | 7 ++--- .../netflix/evcache/pool/EVCacheClient.java | 2 +- .../spy/memcached/EVCacheMemcachedClient.java | 31 ++++++++++--------- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index bd5252e0..fab5d57c 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -2021,12 +2021,9 @@ private Map myBuildMixedKeyValueResult(Map Map hashedKeyMap) { final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); for (Map.Entry i : objMap.entrySet()) { - EVCacheKey evcKey = hashedKeyMap.get(i.getKey());// SNAP: TODO: make final + final EVCacheKey evcKey = hashedKeyMap.getOrDefault(i.getKey(), unHashedKeyMap.get(i.getKey())); if (evcKey == null) { - evcKey = unHashedKeyMap.get(i.getKey()); - } - if (evcKey == null) { - throw new IllegalStateException("Key not found anywhere"); // SNAP: TODO: handle this + throw new IllegalStateException("Key not found anywhere"); } final Object obj = i.getValue(); if (obj instanceof EVCacheValue) { diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index b95a02a6..9909be5a 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ 
b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -999,7 +999,7 @@ public CompletableFuture> getAsyncBulk(Collection unH if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(unHashedKeys, tc, hashedKeys, evCacheTranscoder, null, validator) - .getAsyncSome(bulkReadTimeout.get() * 10000, TimeUnit.MILLISECONDS);// SNAP: TODO: + .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: } diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index 9437df63..e5379088 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -296,6 +296,19 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, return asyncGetBulk(keys, tc, new ArrayList<>(), null, listener, nodeValidator); } + private void populateChunks(Collection keys, + Map> chunks, + NodeLocator locator, + BiPredicate nodeValidator) { + for (String key : keys) { + EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); + final MemcachedNode primaryNode = locator.getPrimary(key); + if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { + chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); + } + } + } + public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, final Transcoder tc, Collection hashedKeys, @@ -309,21 +322,9 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection unHashedKeys, final Map> chunks = new HashMap>(); final NodeLocator locator = mconn.getLocator(); - //Populate Node and key Map - for (String key : unHashedKeys) {// SNAP: TODO: is there a shorthand to iterating over both collections here? 
- EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); - final MemcachedNode primaryNode = locator.getPrimary(key); - if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); - } - } - for (String key : hashedKeys) { - EVCacheClientUtil.validateKey(key, opFact instanceof BinaryOperationFactory); - final MemcachedNode primaryNode = locator.getPrimary(key); - if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { - chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()).add(key); - } - } + //Populate Node and key Map - using separate loops for efficiency (avoids Stream overhead in hot path) + populateChunks(unHashedKeys, chunks, locator, nodeValidator); + populateChunks(hashedKeys, chunks, locator, nodeValidator); final AtomicInteger pendingChunks = new AtomicInteger(chunks.size()); int initialLatchCount = chunks.isEmpty() ? 0 : 1; From c69a0fab9603913d0a9ff185839e9dfa1c983d0e Mon Sep 17 00:00:00 2001 From: Sunjeet Date: Fri, 23 Jan 2026 16:48:41 -0800 Subject: [PATCH 7/7] more cleanup --- .../java/com/netflix/evcache/EVCacheImpl.java | 72 ++----------------- .../operation/EVCacheBulkGetFuture.java | 4 +- .../netflix/evcache/pool/EVCacheClient.java | 7 +- 3 files changed, 14 insertions(+), 69 deletions(-) diff --git a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java index fab5d57c..ffc612fb 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java +++ b/evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java @@ -1933,8 +1933,8 @@ private CompletableFuture> getAsyncBulkData(EVCacheClient .thenApply(data -> { // asyncGetBulk now returns Map for mixed hashed/non-hashed keys @SuppressWarnings("unchecked") - Map objectMap = (Map) (Map) data; - return myBuildMixedKeyValueResult(objectMap, tc, client, unHashedKeyMap, hashedKeyMap); + Map 
objectMap = (Map) data; + return buildMixedKeyValueResult(objectMap, tc, client, unHashedKeyMap, hashedKeyMap); }) .exceptionally(t -> handleBulkException(t, evcacheKeys)); } @@ -2014,11 +2014,11 @@ private Map buildHashedKeyValueResult(Map o return retMap; } - private Map myBuildMixedKeyValueResult(Map objMap, - Transcoder tc, - EVCacheClient client, - Map unHashedKeyMap, - Map hashedKeyMap) { + private Map buildMixedKeyValueResult(Map objMap, + Transcoder tc, + EVCacheClient client, + Map unHashedKeyMap, + Map hashedKeyMap) { final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); for (Map.Entry i : objMap.entrySet()) { final EVCacheKey evcKey = hashedKeyMap.getOrDefault(i.getKey(), unHashedKeyMap.get(i.getKey())); @@ -2055,64 +2055,6 @@ private Map myBuildMixedKeyValueResult(Map return retMap; } - private Map buildMixedKeyValueResult(Map objMap, - Transcoder tc, - EVCacheClient client, - KeyMapDto keyMapDto) { - final Map hashedKeyMap = keyMapDto.getHashedKeyMap(); - final Map nonHashedKeyMap = keyMapDto.getNonHashedKeyMap(); - final Map retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1); - - for (Map.Entry i : objMap.entrySet()) { - final String key = i.getKey(); - final Object obj = i.getValue(); - - if (obj instanceof EVCacheValue) { - // Hashed key - lookup in hashedKeyMap only - final EVCacheValue val = (EVCacheValue) obj; - final EVCacheKey evcKey = hashedKeyMap.get(key); - if (evcKey == null) { - if (log.isWarnEnabled() && shouldLog()) - log.warn("APP " + _appName + ", hashed key [" + key + "] not found in hashedKeyMap"); - continue;// SNAP: TODO: may still want to return that key somehow - } - - // Decode inner data - final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE); - final T tVal; - if (tc == null) { - tVal = (T) client.getTranscoder().decode(cd); - } else { - tVal = tc.decode(cd); - } - - // Verify canonical key matches - if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) 
{ - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); - retMap.put(evcKey, tVal); - } else { - if (log.isDebugEnabled() && shouldLog()) - log.debug("CACHE COLLISION : APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); - incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ); - } - } else { - // Non-hashed key - lookup in nonHashedKeyMap only - final EVCacheKey evcKey = nonHashedKeyMap.get(key); - if (evcKey == null) { - if (log.isWarnEnabled() && shouldLog()) - log.warn("APP " + _appName + ", non-hashed key [" + key + "] not found in nonHashedKeyMap"); - continue; - } - - if (log.isDebugEnabled() && shouldLog()) - log.debug("APP " + _appName + ", key [" + key + "] EVCacheKey " + evcKey); - retMap.put(evcKey, (T) obj); - } - } - return retMap; - } - private Map getBulkData(EVCacheClient client, Collection evcacheKeys, Transcoder tc, boolean throwException, boolean hasZF) throws Exception { try { boolean hasHashedKey = false; diff --git a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java index 107f485d..67533680 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java +++ b/evcache-core/src/main/java/com/netflix/evcache/operation/EVCacheBulkGetFuture.java @@ -223,7 +223,7 @@ public void handleBulkException() { for (int i = 0; i < operationStates.length(); i++) { SingleOperationState state = operationStates.get(i); if (state == null) { - throw new RuntimeException("An operation in bulk get terminated without a state- either timed out, or cancelled, or some other error"); + throw new IllegalStateException("An operation in bulk get terminated without a state- either timed out, or cancelled, or some other error"); } if (!state.completed) { if (state.cancelled) { @@ -253,7 +253,7 @@
public void doAsyncGetSome(CompletableFuture> promise) { } promise.complete(m); } catch (Exception e) { - log.error("SNAP: ", e); + log.error("Exception while completing asyncGetSome bulk operation", e); promise.completeExceptionally(e); } }); diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index 9909be5a..5ca75826 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -990,16 +990,19 @@ public CompletableFuture> getAsyncBulk(Collection can if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(canonicalKeys, tc, null, validator) - .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: + .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS); } + /** + * One bulk call for a mix of hashed and unhashed keys (which require different transcoders) + */ public CompletableFuture> getAsyncBulk(Collection unHashedKeys, Transcoder tc, Collection hashedKeys, EVCacheTranscoder evCacheTranscoder) { final BiPredicate validator = (node, key) -> validateReadQueueSize(node, Call.COMPLETABLE_FUTURE_GET_BULK); if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient .asyncGetBulk(unHashedKeys, tc, hashedKeys, evCacheTranscoder, null, validator) - .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS);// SNAP: TODO: + .getAsyncSome(bulkReadTimeout.get() * 1000, TimeUnit.MILLISECONDS); }