diff --git a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java index dda32499..f16558b7 100644 --- a/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java +++ b/evcache-core/src/main/java/com/netflix/evcache/pool/EVCacheClient.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.*; +import java.util.function.BiPredicate; import java.util.zip.CRC32; import java.util.zip.Checksum; @@ -187,39 +188,36 @@ public Integer getMaxHashLength() { return this.maxHashLength.get(); } - private Collection validateReadQueueSize(Collection canonicalKeys, EVCache.Call call) { - if (evcacheMemcachedClient.getNodeLocator() == null) return canonicalKeys; - final Collection retKeys = new ArrayList<>(canonicalKeys.size()); - for (String key : canonicalKeys) { - final MemcachedNode node = evcacheMemcachedClient.getNodeLocator().getPrimary(key); - if (node instanceof EVCacheNode) { - final EVCacheNode evcNode = (EVCacheNode) node; - if (!evcNode.isAvailable(call)) { - continue; - } + private boolean validateReadQueueSize(MemcachedNode node, String key, EVCache.Call call) { + if (!(node instanceof EVCacheNode)) { + return true; + } - final int size = evcNode.getReadQueueSize(); - final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2); - // if (log.isDebugEnabled()) log.debug("Bulk Current Read Queue - // Size - " + size + " for app " + appName + " & zone " + zone + - // " ; node " + node); - if (!canAddToOpQueue) { - final String hostName; - if(evcNode.getSocketAddress() instanceof InetSocketAddress) { - hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName(); - } else { - hostName = evcNode.getSocketAddress().toString(); - } + final EVCacheNode evcNode = (EVCacheNode) node; + if (!evcNode.isAvailable(call)) { + return false; + } - incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName); - if (log.isDebugEnabled()) log.debug("Read Queue Full on Bulk Operation for app : " + appName - + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get() * 2); - } else { - retKeys.add(key); - } + final int size = evcNode.getReadQueueSize(); + final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2); + + if (!canAddToOpQueue) { + final String hostName; + if (evcNode.getSocketAddress() instanceof InetSocketAddress) { + hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName(); + } else { + hostName = evcNode.getSocketAddress().toString(); + } + + incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName); + if (log.isDebugEnabled()) { + log.debug("Read Queue Full on Bulk Operation for app : " + appName + + "; zone : " + zone + "; Current Size : " + size + + "; Max Size : " + maxReadQueueSize.get() * 2); } } - return retKeys; + + return canAddToOpQueue; } private void incrementFailure(String metric, EVCache.Call call) { @@ -966,16 +964,16 @@ public Single getAndTouch(String key, Transcoder transcoder, int timeT } } - public Map getBulk(Collection _canonicalKeys, Transcoder tc, boolean _throwException, + public Map getBulk(Collection canonicalKeys, Transcoder tc, boolean _throwException, boolean hasZF) throws Exception { - final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK); final Map returnVal; try { if (tc == null) tc = (Transcoder) getTranscoder(); if (enableChunking.get()) { - returnVal = assembleChunks(_canonicalKeys, tc, hasZF); + returnVal = assembleChunks(canonicalKeys, tc, hasZF); } else { - returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) + final BiPredicate validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK); + returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF); } } catch (Exception e) { @@ -985,24 +983,24 @@ public Map getBulk(Collection _canonicalKeys, Transcoder< return returnVal; } - public CompletableFuture> getAsyncBulk(Collection _canonicalKeys, Transcoder tc) { - final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.COMPLETABLE_FUTURE_GET_BULK); + public CompletableFuture> getAsyncBulk(Collection canonicalKeys, Transcoder tc) { + final BiPredicate validator = (node, key) -> validateReadQueueSize(node, key, Call.COMPLETABLE_FUTURE_GET_BULK); if (tc == null) tc = (Transcoder) getTranscoder(); return evcacheMemcachedClient - .asyncGetBulk(canonicalKeys, tc, null) + .asyncGetBulk(canonicalKeys, tc, null, validator) .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS); } - public Single> getBulk(Collection _canonicalKeys, final Transcoder transcoder, boolean _throwException, + public Single> getBulk(Collection canonicalKeys, final Transcoder transcoder, boolean _throwException, boolean hasZF, Scheduler scheduler) { try { - final Collection canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK); final Transcoder tc = (transcoder == null) ? (Transcoder) getTranscoder() : transcoder; if (enableChunking.get()) { - return assembleChunks(_canonicalKeys, tc, hasZF, scheduler); + return assembleChunks(canonicalKeys, tc, hasZF, scheduler); } else { - return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null) + final BiPredicate validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK); + return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator) .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler); } } catch (Throwable e) { diff --git a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java index dd2462c9..402f8329 100644 --- a/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java +++ b/evcache-core/src/main/java/net/spy/memcached/EVCacheMemcachedClient.java @@ -18,6 +18,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,6 +193,13 @@ public void complete() { public EVCacheBulkGetFuture asyncGetBulk(Collection keys, final Transcoder tc, EVCacheGetOperationListener listener) { + return asyncGetBulk(keys, tc, listener, (node, key) -> true); + } + + public EVCacheBulkGetFuture asyncGetBulk(Collection keys, + final Transcoder tc, + EVCacheGetOperationListener listener, + BiPredicate nodeValidator) { final Map> m = new ConcurrentHashMap>(); // Break the gets down into groups by key @@ -202,7 +210,7 @@ public EVCacheBulkGetFuture asyncGetBulk(Collection keys, for (String key : keys) { StringUtils.validateKey(key, opFact instanceof BinaryOperationFactory); final MemcachedNode primaryNode = locator.getPrimary(key); - if (primaryNode.isActive()) { + if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) { Collection ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>()); ks.add(key); }