perf: perform the getPrimary lookup only once during the bulk path #152

Draft · wants to merge 1 commit into base: master
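This change removes a duplicated `getPrimary` lookup from the bulk read path. Previously, `validateReadQueueSize` walked every key and resolved its primary node via `getNodeLocator().getPrimary(key)` just to check read-queue capacity, and `asyncGetBulk` then resolved the same primary node a second time while grouping keys by node. The validation now becomes a `BiPredicate<MemcachedNode, String>` that `asyncGetBulk` applies to the node it has already looked up, so each key's primary node is resolved exactly once.

A minimal sketch of the pattern outside EVCache; `Locator`, `Node`, and the method names below are illustrative stand-ins, not the project's API:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

final class BulkLookupSketch {
    interface Node { boolean isActive(); }
    interface Locator { Node getPrimary(String key); }

    // Before: getPrimary(key) runs twice per key, once to validate, once to group.
    static Map<Node, List<String>> twoPasses(Locator locator, Collection<String> keys,
                                             BiPredicate<Node, String> validator) {
        final List<String> accepted = new ArrayList<>();
        for (String key : keys) {
            if (validator.test(locator.getPrimary(key), key)) accepted.add(key); // lookup #1
        }
        final Map<Node, List<String>> chunks = new HashMap<>();
        for (String key : accepted) {
            final Node node = locator.getPrimary(key); // lookup #2, same answer as #1
            if (node.isActive()) {
                chunks.computeIfAbsent(node, n -> new ArrayList<>()).add(key);
            }
        }
        return chunks;
    }

    // After: one lookup per key; the validator is consulted with the node already in hand.
    static Map<Node, List<String>> onePass(Locator locator, Collection<String> keys,
                                           BiPredicate<Node, String> validator) {
        final Map<Node, List<String>> chunks = new HashMap<>();
        for (String key : keys) {
            final Node node = locator.getPrimary(key); // the only lookup
            if (node.isActive() && validator.test(node, key)) {
                chunks.computeIfAbsent(node, n -> new ArrayList<>()).add(key);
            }
        }
        return chunks;
    }
}
```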
@@ -17,6 +17,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
+import java.util.function.BiPredicate;
 import java.util.zip.CRC32;
 import java.util.zip.Checksum;
 
@@ -187,39 +188,36 @@ public Integer getMaxHashLength() {
         return this.maxHashLength.get();
     }
 
-    private Collection<String> validateReadQueueSize(Collection<String> canonicalKeys, EVCache.Call call) {
-        if (evcacheMemcachedClient.getNodeLocator() == null) return canonicalKeys;
-        final Collection<String> retKeys = new ArrayList<>(canonicalKeys.size());
-        for (String key : canonicalKeys) {
-            final MemcachedNode node = evcacheMemcachedClient.getNodeLocator().getPrimary(key);
-            if (node instanceof EVCacheNode) {
-                final EVCacheNode evcNode = (EVCacheNode) node;
-                if (!evcNode.isAvailable(call)) {
-                    continue;
-                }
-
-                final int size = evcNode.getReadQueueSize();
-                final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
-                // if (log.isDebugEnabled()) log.debug("Bulk Current Read Queue
-                // Size - " + size + " for app " + appName + " & zone " + zone +
-                // " ; node " + node);
-                if (!canAddToOpQueue) {
-                    final String hostName;
-                    if(evcNode.getSocketAddress() instanceof InetSocketAddress) {
-                        hostName = ((InetSocketAddress)evcNode.getSocketAddress()).getHostName();
-                    } else {
-                        hostName = evcNode.getSocketAddress().toString();
-                    }
-
-                    incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
-                    if (log.isDebugEnabled()) log.debug("Read Queue Full on Bulk Operation for app : " + appName
-                            + "; zone : " + zone + "; Current Size : " + size + "; Max Size : " + maxReadQueueSize.get() * 2);
-                } else {
-                    retKeys.add(key);
-                }
-            }
-        }
-        return retKeys;
-    }
+    private boolean validateReadQueueSize(MemcachedNode node, String key, EVCache.Call call) {
+        if (!(node instanceof EVCacheNode)) {
+            return true;
+        }
+
+        final EVCacheNode evcNode = (EVCacheNode) node;
+        if (!evcNode.isAvailable(call)) {
+            return false;
+        }
+
+        final int size = evcNode.getReadQueueSize();
+        final boolean canAddToOpQueue = size < (maxReadQueueSize.get() * 2);
+
+        if (!canAddToOpQueue) {
+            final String hostName;
+            if (evcNode.getSocketAddress() instanceof InetSocketAddress) {
+                hostName = ((InetSocketAddress) evcNode.getSocketAddress()).getHostName();
+            } else {
+                hostName = evcNode.getSocketAddress().toString();
+            }
+
+            incrementFailure(EVCacheMetricsFactory.READ_QUEUE_FULL, call, hostName);
+            if (log.isDebugEnabled()) {
+                log.debug("Read Queue Full on Bulk Operation for app : " + appName
+                        + "; zone : " + zone + "; Current Size : " + size
+                        + "; Max Size : " + maxReadQueueSize.get() * 2);
+            }
+        }
+
+        return canAddToOpQueue;
+    }
 
     private void incrementFailure(String metric, EVCache.Call call) {
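The validation rules themselves are unchanged: a non-`EVCacheNode` passes, an unavailable node fails, and a read queue at twice `maxReadQueueSize` fails with a metric increment. Only the shape changes, from "filter a collection of keys" to "judge one (node, key) pair", and the old null-locator guard disappears because the caller now supplies the node. A hedged distillation of that contract; `QueueNode` and `gate` are stand-in names, not EVCache types:

```java
import java.util.function.BiPredicate;

final class ReadGateSketch {
    interface QueueNode {
        boolean isAvailable();
        int getReadQueueSize();
    }

    // true  -> the key stays in the bulk request
    // false -> the key is dropped, mirroring the old filter's "continue" branch
    static BiPredicate<QueueNode, String> gate(int maxReadQueueSize) {
        return (node, key) ->
                node.isAvailable() && node.getReadQueueSize() < maxReadQueueSize * 2;
    }
}
```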
@@ -966,16 +964,16 @@ public <T> Single<T> getAndTouch(String key, Transcoder<T> transcoder, int timeT
         }
     }
 
-    public <T> Map<String, T> getBulk(Collection<String> _canonicalKeys, Transcoder<T> tc, boolean _throwException,
+    public <T> Map<String, T> getBulk(Collection<String> canonicalKeys, Transcoder<T> tc, boolean _throwException,
             boolean hasZF) throws Exception {
-        final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK);
         final Map<String, T> returnVal;
         try {
             if (tc == null) tc = (Transcoder<T>) getTranscoder();
             if (enableChunking.get()) {
-                returnVal = assembleChunks(_canonicalKeys, tc, hasZF);
+                returnVal = assembleChunks(canonicalKeys, tc, hasZF);
             } else {
-                returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null)
+                final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK);
+                returnVal = evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
                         .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF);
             }
         } catch (Exception e) {
@@ -985,24 +983,24 @@ public <T> Map<String, T> getBulk(Collection<String> _canonicalKeys, Transcoder<
         return returnVal;
     }
 
-    public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> _canonicalKeys, Transcoder<T> tc) {
-        final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.COMPLETABLE_FUTURE_GET_BULK);
+    public <T> CompletableFuture<Map<String, T>> getAsyncBulk(Collection<String> canonicalKeys, Transcoder<T> tc) {
+        final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.COMPLETABLE_FUTURE_GET_BULK);
         if (tc == null) tc = (Transcoder<T>) getTranscoder();
         return evcacheMemcachedClient
-                .asyncGetBulk(canonicalKeys, tc, null)
+                .asyncGetBulk(canonicalKeys, tc, null, validator)
                 .getAsyncSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS);
 
     }
 
-    public <T> Single<Map<String, T>> getBulk(Collection<String> _canonicalKeys, final Transcoder<T> transcoder, boolean _throwException,
+    public <T> Single<Map<String, T>> getBulk(Collection<String> canonicalKeys, final Transcoder<T> transcoder, boolean _throwException,
             boolean hasZF, Scheduler scheduler) {
         try {
-            final Collection<String> canonicalKeys = validateReadQueueSize(_canonicalKeys, Call.BULK);
             final Transcoder<T> tc = (transcoder == null) ? (Transcoder<T>) getTranscoder() : transcoder;
             if (enableChunking.get()) {
-                return assembleChunks(_canonicalKeys, tc, hasZF, scheduler);
+                return assembleChunks(canonicalKeys, tc, hasZF, scheduler);
             } else {
-                return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null)
+                final BiPredicate<MemcachedNode, String> validator = (node, key) -> validateReadQueueSize(node, key, Call.BULK);
+                return evcacheMemcachedClient.asyncGetBulk(canonicalKeys, tc, null, validator)
                         .getSome(bulkReadTimeout.get(), TimeUnit.MILLISECONDS, _throwException, hasZF, scheduler);
            }
        } catch (Throwable e) {
@@ -18,6 +18,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiPredicate;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -192,6 +193,13 @@ public void complete() {
     public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
                                                     final Transcoder<T> tc,
                                                     EVCacheGetOperationListener<T> listener) {
+        return asyncGetBulk(keys, tc, listener, (node, key) -> true);
+    }
+
+    public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
+                                                    final Transcoder<T> tc,
+                                                    EVCacheGetOperationListener<T> listener,
+                                                    BiPredicate<MemcachedNode, String> nodeValidator) {
         final Map<String, Future<T>> m = new ConcurrentHashMap<String, Future<T>>();
 
         // Break the gets down into groups by key
@@ -202,7 +210,7 @@ public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
         for (String key : keys) {
             StringUtils.validateKey(key, opFact instanceof BinaryOperationFactory);
             final MemcachedNode primaryNode = locator.getPrimary(key);
-            if (primaryNode.isActive()) {
+            if (primaryNode.isActive() && nodeValidator.test(primaryNode, key)) {
                 Collection<String> ks = chunks.computeIfAbsent(primaryNode, k -> new ArrayList<>());
                 ks.add(key);
             }
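The pre-existing three-argument `asyncGetBulk` delegates with an always-true predicate, so callers that never pass a validator keep their old behavior, and keys the validator rejects simply never enter the per-node chunks, just as filtered-out keys never reached `asyncGetBulk` before. A small sketch of that back-compat overload pattern (names below are illustrative, not the EVCache API):

```java
import java.util.function.BiPredicate;

final class OverloadSketch {
    interface Node { boolean isActive(); }

    // Existing entry point: behavior preserved via an always-true validator.
    static void bulkGet(Iterable<String> keys) {
        bulkGet(keys, (node, key) -> true);
    }

    // New entry point: the validator can veto a (node, key) pair during grouping.
    static void bulkGet(Iterable<String> keys, BiPredicate<Node, String> nodeValidator) {
        // group keys by primary node, calling nodeValidator.test(node, key) once per key
    }
}
```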