Skip to content

Commit

Permalink
Reduce the lock hold time during Map.clear (fixes #835)
Browse files Browse the repository at this point in the history
When invalidating the cache, it is faster and simpler to discard under
the eviction lock. This avoids thrashing on the write buffer and causing
excessive maintenence runs, which are triggered per write. However, for
a large cache this hold time may be excessive by causing other writes to
pile up so that the write buffer is full and backpressure causes delays.

In that case removing through the write buffer allows fairer throughput
as all writes compete to append into the buffer and async draining will
captures a batch of work. This avoids delaying any other write, or at
least makes the backpressure times short, so that writes have better
latencies by disfavoring the clear throughput.

The cache adapts strategies by monitoring the size of the write buffer.
If it grows over a threshold then the clear operation backs off by
releasing the eviction lock and performing a one-by-one removal instead.
  • Loading branch information
ben-manes committed Dec 13, 2022
1 parent 2fbc869 commit 68fbff8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1989,34 +1989,42 @@ public long estimatedSize() {
}

@Override
@SuppressWarnings("FutureReturnValueIgnored")
public void clear() {
evictionLock.lock();
try {
long now = expirationTicker().read();
// Discard all pending reads
readBuffer.drainTo(e -> {});

// Apply all pending writes
Runnable task;
while ((task = writeBuffer.poll()) != null) {
task.run();
}

// Discard all entries
for (var entry : data.entrySet()) {
removeNode(entry.getValue(), now);
}

// Cancel the scheduled cleanup
Pacer pacer = pacer();
if (pacer != null) {
pacer.cancel();
}

// Discard all pending reads
readBuffer.drainTo(e -> {});
// Discard all entries
int threshold = (WRITE_BUFFER_MAX / 2);
long now = expirationTicker().read();
for (var node : data.values()) {
if (writeBuffer.size() >= threshold) {
// Fallback to one-by-one to avoid excessive lock hold times
break;
}
removeNode(node, now);
}
} finally {
evictionLock.unlock();
}

// Remove any stragglers, such as if released early to more aggressively flush incoming writes
for (Object key : keySet()) {
remove(key);
}
}

@GuardedBy("evictionLock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,25 @@
@Test(dataProviderClass = CacheProvider.class)
public final class BoundedLocalCacheTest {

@Test(dataProvider = "caches")
@CacheSpec(population = Population.FULL, removalListener = Listener.MOCKITO)
public void clear_pendingWrites(BoundedLocalCache<Int, Int> cache, CacheContext context) {
var insert = new boolean[] { true };
Mockito.doAnswer(invocation -> {
if (insert[0]) {
while (cache.writeBuffer.offer(() -> {})) {
// ignored
}
insert[0] = false;
}
return null;
}).when(context.removalListener()).onRemoval(any(), any(), any());

cache.clear();
assertThat(cache).isExhaustivelyEmpty();
assertThat(cache.writeBuffer).isEmpty();
}

/* --------------- Maintenance --------------- */

@Test
Expand Down
3 changes: 2 additions & 1 deletion gradle/codeQuality.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ tasks.withType(JavaCompile).configureEach {
'UnusedTypeParameter', 'UsingJsr305CheckReturnValue']
enabledChecks.each { enable(it) }

def disabledChecks = [ 'LexicographicalAnnotationListing', 'MissingSummary', 'StaticImport' ]
def disabledChecks = [ 'LexicographicalAnnotationListing',
'MissingSummary', 'IsInstanceLambdaUsage', 'StaticImport' ]
disabledChecks.each { disable(it) }

def errorChecks = [ 'NullAway' ]
Expand Down
20 changes: 10 additions & 10 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ ext {
config: '1.4.2',
ehcache3: '3.10.8',
errorprone: '2.16',
errorproneSupport: '0.5.0',
errorproneSupport: '0.6.0',
expiringMap: '0.5.10',
fastfilter: '1.0.2',
fastutil: '8.5.9',
fastutil: '8.5.11',
flipTables: '1.1.0',
googleJavaFormat: '1.15.0',
guava: '31.1-jre',
Expand All @@ -58,7 +58,7 @@ ext {
ohc: '0.6.1',
osgiComponentAnnotations: '1.5.0',
picocli: '4.7.0',
slf4j: '2.0.5',
slf4j: '2.0.6',
tcache: '2.0.1',
stream: '2.9.8',
univocityParsers: '2.9.1',
Expand All @@ -70,6 +70,8 @@ ext {
awaitility: '4.2.0',
commonsCollectionsTests: '4.4',
eclipseCollections: '11.1.0',
felix: '7.0.5',
felixScr: '2.2.4',
guice: '5.1.0',
hamcrest: '2.2',
jcacheTck: '1.1.1',
Expand All @@ -79,21 +81,19 @@ ext {
junitTestNG: '1.0.4',
lincheck: '2.16',
mockito: '4.9.0',
paxExam: '4.13.5',
slf4jTest: '2.6.1',
testng: '7.6.1',
truth: '1.1.3',
felix: '7.0.5',
felixScr: '2.2.4',
osgiSvcComponent: '1.5.0',
osgiUtilFunction: '1.2.0',
osgiUtilPromise: '1.2.0',
paxExam: '4.13.5',
slf4jTest: '2.6.1',
testng: '7.7.0',
truth: '1.1.3',
]
pluginVersions = [
bnd: '6.4.0',
checkstyle: '10.5.0',
coveralls: '2.12.0',
dependencyCheck: '7.3.2',
dependencyCheck: '7.4.1',
errorprone: '3.0.1',
findsecbugs: '1.12.0',
forbiddenApis: '3.4',
Expand Down

0 comments on commit 68fbff8

Please sign in to comment.