155 changes: 106 additions & 49 deletions evcache-core/src/main/java/com/netflix/evcache/EVCacheImpl.java
@@ -2,41 +2,11 @@

import static com.netflix.evcache.util.Sneaky.sneakyThrow;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.netflix.evcache.dto.KeyMapDto;
import com.netflix.evcache.util.EVCacheBulkDataDto;
import com.netflix.evcache.util.KeyHasher;
import com.netflix.evcache.util.RetryCount;
import com.netflix.evcache.util.Sneaky;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.archaius.api.Property;
import com.netflix.archaius.api.PropertyRepository;
import com.netflix.evcache.EVCacheInMemoryCache.DataNotFoundException;
import com.netflix.evcache.EVCacheLatch.Policy;
import com.netflix.evcache.dto.KeyMapDto;
import com.netflix.evcache.event.EVCacheEvent;
import com.netflix.evcache.event.EVCacheEventListener;
import com.netflix.evcache.metrics.EVCacheMetricsFactory;
@@ -52,14 +22,40 @@
import com.netflix.evcache.pool.EVCacheClientUtil;
import com.netflix.evcache.pool.EVCacheValue;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.util.EVCacheBulkDataDto;
import com.netflix.evcache.util.KeyHasher;
import com.netflix.evcache.util.RetryCount;
import com.netflix.evcache.util.Sneaky;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.DistributionSummary;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;

import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import net.spy.memcached.CachedData;
import net.spy.memcached.transcoders.Transcoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Single;
@@ -1900,27 +1896,46 @@ private <T> CompletableFuture<Map<EVCacheKey, T>> getAsyncBulkData(EVCacheClient
List<EVCacheKey> evcacheKeys,
Transcoder<T> tc) {
KeyMapDto keyMapDto = buildKeyMap(client, evcacheKeys);
final Map<String, EVCacheKey> keyMap = keyMapDto.getKeyMap();
boolean hasHashedKey = keyMapDto.isKeyHashed();
if (hasHashedKey) {
Map<String, EVCacheKey> unHashedKeyMap = keyMapDto.getNonHashedKeyMap();
Map<String, EVCacheKey> hashedKeyMap = keyMapDto.getHashedKeyMap();

if (!unHashedKeyMap.isEmpty() && hashedKeyMap.isEmpty()) { // all keys are non-hashed
final Transcoder<T> tcCopy;
if (tc == null && _transcoder != null) {
tcCopy = (Transcoder<T>) _transcoder;
} else {
tcCopy = tc;
}
if (log.isDebugEnabled() && shouldLog()) {
log.debug("fetching bulk data with hashedKey {} ",evcacheKeys);
log.debug("fetching bulk data with un-hashed keys {}", unHashedKeyMap.keySet());
}
return client.getAsyncBulk(keyMap.keySet(), evcacheValueTranscoder)
.thenApply(data -> buildHashedKeyValueResult(data, tc, client, keyMap))
return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy )
.thenApply(data -> buildNonHashedKeyValueResult(data, unHashedKeyMap))
.exceptionally(t -> handleBulkException(t, evcacheKeys));
} else {
} else if (!hashedKeyMap.isEmpty() && unHashedKeyMap.isEmpty()) { // all keys are hashed
if (log.isDebugEnabled() && shouldLog()) {
log.debug("fetching bulk data with hashed keys {} : {}", hashedKeyMap.keySet());
}
return client.getAsyncBulk(hashedKeyMap.keySet(), evcacheValueTranscoder)
.thenApply(data -> buildHashedKeyValueResult(data, tc, client, hashedKeyMap))
.exceptionally(t -> handleBulkException(t, evcacheKeys));
} else { // a mix of hashed and un-hashed keys
final Transcoder<T> tcCopy;
if (tc == null && _transcoder != null) {
tcCopy = (Transcoder<T>) _transcoder;
} else {
tcCopy = tc;
}
if (log.isDebugEnabled() && shouldLog()) {
log.debug("fetching bulk data with non hashedKey {} ",keyMap.keySet());
}
return client.getAsyncBulk(keyMap.keySet(), tcCopy )
.thenApply(data -> buildNonHashedKeyValueResult(data, keyMap))
log.debug("fetching bulk data with hashed keys: {} AND un-hashed keys {}", hashedKeyMap.keySet(), unHashedKeyMap.keySet());
}
return client.getAsyncBulk(unHashedKeyMap.keySet(), tcCopy, hashedKeyMap.keySet(), evcacheValueTranscoder)
.thenApply(data -> {
// asyncGetBulk now returns Map<String, Object> for mixed hashed/non-hashed keys
@SuppressWarnings("unchecked")
Map<String, Object> objectMap = (Map<String, Object>) data;
return buildMixedKeyValueResult(objectMap, tc, client, unHashedKeyMap, hashedKeyMap);
})
.exceptionally(t -> handleBulkException(t, evcacheKeys));
}
}
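
Aside from the diff itself: the hunk above boils down to a three-way dispatch driven by which of the two key maps is empty. The stripped-down sketch below mirrors only that branch structure; the key and transcoder types are placeholders, not the EVCache classes, and the returned strings merely name the fetch path the real code would take.

import java.util.Collections;
import java.util.Map;

final class BulkFetchDispatchSketch {
    // Same branch shape as getAsyncBulkData after this change; when both maps are
    // empty the call falls through to the mixed branch, as in the patch.
    static String choosePath(Map<String, ?> unHashedKeyMap, Map<String, ?> hashedKeyMap) {
        if (!unHashedKeyMap.isEmpty() && hashedKeyMap.isEmpty()) {
            return "plain bulk get with the caller's transcoder";       // all keys un-hashed
        } else if (!hashedKeyMap.isEmpty() && unHashedKeyMap.isEmpty()) {
            return "bulk get through evcacheValueTranscoder";           // all keys hashed
        } else {
            return "mixed overload: both key sets, both transcoders";   // mix of both
        }
    }

    public static void main(String[] args) {
        System.out.println(choosePath(
                Collections.singletonMap("user:42", "v"),
                Collections.emptyMap())); // -> plain bulk get with the caller's transcoder
    }
}
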
@@ -1932,20 +1947,21 @@ private <T> Map<EVCacheKey, T> handleBulkException(Throwable t, Collection<EVCac
}

private KeyMapDto buildKeyMap(EVCacheClient client, Collection<EVCacheKey> evcacheKeys) {
boolean hasHashedKey = false;
final Map<String, EVCacheKey> keyMap = new HashMap<String, EVCacheKey>(evcacheKeys.size() * 2);
final Map<String, EVCacheKey> hashedKeyMap = new HashMap<>();
final Map<String, EVCacheKey> nonHashedKeyMap = new HashMap<>();

for (EVCacheKey evcKey : evcacheKeys) {
String key = evcKey.getCanonicalKey(client.isDuetClient());
String hashKey = evcKey.getHashKey(client.isDuetClient(), client.getHashingAlgorithm(), client.shouldEncodeHashKey(), client.getMaxDigestBytes(), client.getMaxHashLength(), client.getBaseEncoder());
if (hashKey != null) {
if (log.isDebugEnabled() && shouldLog())
log.debug("APP " + _appName + ", key [" + key + "], has been hashed [" + hashKey + "]");
key = hashKey;
hasHashedKey = true;
hashedKeyMap.put(hashKey, evcKey);
} else {
nonHashedKeyMap.put(key, evcKey);
}
keyMap.put(key, evcKey);
}
return new KeyMapDto(keyMap, hasHashedKey);
return new KeyMapDto(hashedKeyMap, nonHashedKeyMap);
}
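
For reference, buildKeyMap now partitions the keys instead of flagging the whole batch: each key lands in exactly one of the two maps depending on whether a hash was produced for it. Below is a runnable, simplified illustration using plain strings and a stand-in hash function, not EVCacheKey.getHashKey.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class KeyPartitionSketch {
    public static void main(String[] args) {
        // Stand-in for EVCacheKey.getHashKey(...): returns a hashed form, or null when
        // the key should be sent to memcached as-is.
        Function<String, String> hashFn = k -> k.length() > 10 ? "h(" + k + ")" : null;

        Map<String, String> hashedKeyMap = new HashMap<>();
        Map<String, String> nonHashedKeyMap = new HashMap<>();
        for (String key : Arrays.asList("user:42", "user:1234:profile")) {
            String hashKey = hashFn.apply(key);
            if (hashKey != null) {
                hashedKeyMap.put(hashKey, key);  // addressed later by its hashed form
            } else {
                nonHashedKeyMap.put(key, key);   // addressed later by its canonical key
            }
        }
        System.out.println(hashedKeyMap);    // {h(user:1234:profile)=user:1234:profile}
        System.out.println(nonHashedKeyMap); // {user:42=user:42}
    }
}
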

private <T> Map<EVCacheKey, T> buildNonHashedKeyValueResult(Map<String, T> objMap,
Expand Down Expand Up @@ -1998,6 +2014,47 @@ private <T> Map<EVCacheKey, T> buildHashedKeyValueResult(Map<String, Object> o
return retMap;
}

private <T> Map<EVCacheKey, T> buildMixedKeyValueResult(Map<String, Object> objMap,
Transcoder<T> tc,
EVCacheClient client,
Map<String, EVCacheKey> unHashedKeyMap,
Map<String, EVCacheKey> hashedKeyMap) {
final Map<EVCacheKey, T> retMap = new HashMap<>((int) (objMap.size() / 0.75) + 1);
for (Map.Entry<String, Object> i : objMap.entrySet()) {
final EVCacheKey evcKey = hashedKeyMap.getOrDefault(i.getKey(), unHashedKeyMap.get(i.getKey()));
if (evcKey == null) {
throw new IllegalStateException("Key " + i.getKey() + " not found in either the hashed or non-hashed key map");
}
final Object obj = i.getValue();
if (obj instanceof EVCacheValue) {
if (log.isDebugEnabled() && shouldLog())
log.debug("APP " + _appName + ", The value for key [" + i.getKey() + "] is EVCache Value");
final EVCacheValue val = (EVCacheValue) obj;
final CachedData cd = new CachedData(val.getFlags(), val.getValue(), CachedData.MAX_SIZE);
final T tVal;
if (tc == null) {
tVal = (T) client.getTranscoder().decode(cd);
} else {
tVal = tc.decode(cd);
}
if (evcKey.getCanonicalKey(client.isDuetClient()).equals(val.getKey())) {
if (log.isDebugEnabled() && shouldLog())
log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
retMap.put(evcKey, tVal);
} else {
if (log.isDebugEnabled() && shouldLog())
log.debug("CACHE COLLISION : APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
incrementFailure(EVCacheMetricsFactory.KEY_HASH_COLLISION, Call.COMPLETABLE_FUTURE_GET_BULK.name(), EVCacheMetricsFactory.READ);
}
} else {
if (log.isDebugEnabled() && shouldLog())
log.debug("APP " + _appName + ", key [" + i.getKey() + "] EVCacheKey " + evcKey);
retMap.put(evcKey, (T) obj);
}
}
return retMap;
}
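
The collision guard in this new method can also be shown in isolation: a hashed entry decodes to an EVCacheValue carrying the canonical key it was stored under, and the decoded value is only kept when that stored key matches the key being requested. The snippet below is a simplified stand-in (plain strings rather than CachedData and transcoders), not the EVCache implementation.

import java.util.HashMap;
import java.util.Map;

final class HashCollisionGuardSketch {
    // Stand-in for EVCacheValue: the decoded payload plus the canonical key it was stored under.
    static final class StoredValue {
        final String canonicalKey;
        final String payload;
        StoredValue(String canonicalKey, String payload) {
            this.canonicalKey = canonicalKey;
            this.payload = payload;
        }
    }

    // Keep the value only if the key recorded inside the stored value matches the requested
    // canonical key; otherwise treat it as a hash collision and drop it (the real method also
    // increments the KEY_HASH_COLLISION counter instead of returning the value).
    static String resolve(String requestedCanonicalKey, StoredValue decoded) {
        return requestedCanonicalKey.equals(decoded.canonicalKey) ? decoded.payload : null;
    }

    public static void main(String[] args) {
        Map<String, StoredValue> fetched = new HashMap<>();
        fetched.put("aGFzaGVk", new StoredValue("user:42", "profile-bytes"));
        System.out.println(resolve("user:42", fetched.get("aGFzaGVk"))); // profile-bytes
        System.out.println(resolve("user:43", fetched.get("aGFzaGVk"))); // null (collision, dropped)
    }
}
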

private <T> Map<EVCacheKey, T> getBulkData(EVCacheClient client, Collection<EVCacheKey> evcacheKeys, Transcoder<T> tc, boolean throwException, boolean hasZF) throws Exception {
try {
boolean hasHashedKey = false;
33 changes: 23 additions & 10 deletions evcache-core/src/main/java/com/netflix/evcache/dto/KeyMapDto.java
@@ -1,23 +1,36 @@
package com.netflix.evcache.dto;

import com.netflix.evcache.EVCacheKey;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;

public class KeyMapDto {
Map<String, EVCacheKey> keyMap;
boolean isKeyHashed;
Map<String, EVCacheKey> hashedKeyMap;
Map<String, EVCacheKey> nonHashedKeyMap;

public KeyMapDto(@Nonnull Map<String, EVCacheKey> hashedKeyMap, @Nonnull Map<String, EVCacheKey> nonHashedKeyMap) {
this.hashedKeyMap = hashedKeyMap;
this.nonHashedKeyMap = nonHashedKeyMap;
}

public Map<String, EVCacheKey> getHashedKeyMap() {
return hashedKeyMap;
}

public KeyMapDto(Map<String, EVCacheKey> keyMap, boolean isKeyHashed) {
this.keyMap = keyMap;
this.isKeyHashed = isKeyHashed;
public Map<String, EVCacheKey> getNonHashedKeyMap() {
return nonHashedKeyMap;
}

public Map<String, EVCacheKey> getKeyMap() {
return keyMap;
public Set<String> getAllKeys() {
Set<String> allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size());
allKeys.addAll(hashedKeyMap.keySet());
allKeys.addAll(nonHashedKeyMap.keySet());
return allKeys;
}

public boolean isKeyHashed() {
return isKeyHashed;
public boolean isKeyHashed(String key) {
return hashedKeyMap.containsKey(key);
}
}
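
To make the reworked DTO concrete, here is a self-contained walkthrough of the behaviour its new methods expose; EVCacheKey values are replaced with plain strings so it runs on its own, and the map contents are invented for illustration.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class KeyMapDtoSketch {
    public static void main(String[] args) {
        // hashed memcached key -> original key (stands in for EVCacheKey)
        Map<String, String> hashedKeyMap = new HashMap<>();
        hashedKeyMap.put("aGFzaGVkLWtleQ==", "user:1234:profile");

        // key stored as-is
        Map<String, String> nonHashedKeyMap = new HashMap<>();
        nonHashedKeyMap.put("user:42", "user:42");

        // getAllKeys(): union of both key sets, pre-sized as in the DTO
        Set<String> allKeys = new HashSet<>(hashedKeyMap.size() + nonHashedKeyMap.size());
        allKeys.addAll(hashedKeyMap.keySet());
        allKeys.addAll(nonHashedKeyMap.keySet());
        System.out.println(allKeys); // [user:42, aGFzaGVkLWtleQ==] in some order

        // isKeyHashed(key): a per-key membership test replaces the old global boolean flag
        System.out.println(hashedKeyMap.containsKey("user:42"));          // false
        System.out.println(hashedKeyMap.containsKey("aGFzaGVkLWtleQ==")); // true
    }
}
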
@@ -1,5 +1,12 @@
package com.netflix.evcache.operation;

import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.EVCacheClient;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.sun.management.GcInfo;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@@ -8,33 +15,25 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;

import com.netflix.evcache.EVCacheGetOperationListener;
import com.netflix.evcache.util.Pair;
import net.spy.memcached.internal.BulkGetCompletionListener;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import net.spy.memcached.ops.GetOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.evcache.metrics.EVCacheMetricsFactory;
import com.netflix.evcache.pool.EVCacheClient;
import com.netflix.evcache.pool.ServerGroup;
import com.netflix.evcache.util.EVCacheConfig;
import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import com.sun.management.GcInfo;

import net.spy.memcached.MemcachedConnection;
import net.spy.memcached.internal.BulkGetFuture;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import net.spy.memcached.ops.GetOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Scheduler;
import rx.Single;

@@ -211,6 +210,7 @@ public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit uni
doAsyncGetSome(future);
return future.handle((data, ex) -> {
if (ex != null) {
log.error("SNAP: ", ex);
handleBulkException();
}
return data;
@@ -219,8 +219,12 @@ public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit uni

public void handleBulkException() {
ExecutionException t = null;
Operation[] opsArray = ops.toArray(new Operation[0]);
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (state == null) {
throw new IllegalStateException("This is a temporary msg, Shih-hao has a PR that addresses this case"); //SNAP: TODO: remove
}
if (!state.completed) {
if (state.cancelled) {
throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
@@ -248,8 +252,9 @@ public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
m.put(me.getKey(), (T)me.getValue());
}
promise.complete(m);
} catch (Exception t) {
promise.completeExceptionally(t);
} catch (Exception e) {
log.error("SNAP: ", e); // SNAP: TODO: cleanup
promise.completeExceptionally(e);
}
});
}
@@ -293,7 +293,7 @@ private static<T> CompletableFuture<Void> getNext(CompletableFuture<T> future,
timeout,
unit,
timeoutSlots);
future.completeExceptionally(new TimeoutException("Timeout after " + timeout));
future.completeExceptionally(new TimeoutException("Timeout after " + timeout + "ms"));
},
splitTimeout,
unit);