diff --git a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java index 584e551f1cf6b..388020a5a17ce 100644 --- a/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java +++ b/x-pack/plugin/blob-cache/src/main/java/org/elasticsearch/blobcache/shared/SharedBlobCacheService.java @@ -311,9 +311,9 @@ private CacheEntry(T chunk) { private final int numRegions; private final ConcurrentLinkedQueue freeRegions = new ConcurrentLinkedQueue<>(); - private final Cache cache; + private final Cache> cache; - private final ConcurrentHashMap regionOwners; // to assert exclusive access of regions + private final ConcurrentHashMap> regionOwners; // to assert exclusive access of regions private final LongAdder writeCount = new LongAdder(); private final LongAdder writeBytes = new LongAdder(); @@ -471,7 +471,7 @@ public int getRegionSize() { return regionSize; } - CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { + CacheFileRegion get(KeyType cacheKey, long fileLength, int region) { return cache.get(cacheKey, fileLength, region).chunk; } @@ -516,7 +516,7 @@ public boolean maybeFetchFullEntry( return true; } final ActionListener regionListener = refCountingListener.acquire(ignored -> {}); - final CacheFileRegion entry; + final CacheFileRegion entry; try { entry = get(cacheKey, length, region); } catch (AlreadyClosedException e) { @@ -583,7 +583,7 @@ public void maybeFetchRegion( listener.onResponse(false); return; } - final CacheFileRegion entry = get(cacheKey, blobLength, region); + final CacheFileRegion entry = get(cacheKey, blobLength, region); entry.populate(regionRange, writer, fetchExecutor, listener); } catch (Exception e) { listener.onFailure(e); @@ -631,7 +631,7 @@ public void maybeFetchRange( listener.onResponse(false); return; } - final CacheFileRegion entry = get(cacheKey, blobLength, region); + final CacheFileRegion entry = get(cacheKey, blobLength, region); entry.populate( regionRange, writerWithOffset(writer, Math.toIntExact(range.start() - getRegionStart(region))), @@ -705,7 +705,7 @@ public int forceEvict(Predicate cacheKeyPredicate) { } // used by tests - int getFreq(CacheFileRegion cacheFileRegion) { + int getFreq(CacheFileRegion cacheFileRegion) { if (cache instanceof LFUCache lfuCache) { return lfuCache.getFreq(cacheFileRegion); } @@ -787,16 +787,19 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f /** * While this class has incRef and tryIncRef methods, incRefEnsureOpen and tryIncrefEnsureOpen should * always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen - * (see {@link LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)}) + * (see {@link SharedBlobCacheService.LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)}) */ - class CacheFileRegion extends EvictableRefCounted { + static class CacheFileRegion extends EvictableRefCounted { + + final SharedBlobCacheService blobCacheService; final RegionKey regionKey; final SparseFileTracker tracker; // io can be null when not init'ed or after evict/take volatile SharedBytes.IO io = null; - CacheFileRegion(RegionKey regionKey, int regionSize) { + CacheFileRegion(SharedBlobCacheService blobCacheService, RegionKey regionKey, int regionSize) { + this.blobCacheService = blobCacheService; this.regionKey = regionKey; assert regionSize > 0; // NOTE we use a constant string for description to avoid consume extra heap space @@ -805,7 +808,7 @@ class CacheFileRegion extends EvictableRefCounted { public long physicalStartOffset() { var ioRef = io; - return ioRef == null ? -1L : (long) regionKey.region * regionSize; + return ioRef == null ? -1L : (long) regionKey.region * blobCacheService.regionSize; } public boolean tryIncRefEnsureOpen() { @@ -832,10 +835,10 @@ private void ensureOpenOrDecRef() { // tries to evict this chunk if noone is holding onto its resources anymore // visible for tests. boolean tryEvict() { - assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; + assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting"; if (refCount() <= 1 && evict()) { logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset()); - evictCount.increment(); + blobCacheService.evictCount.increment(); decRef(); return true; } @@ -843,10 +846,10 @@ boolean tryEvict() { } boolean tryEvictNoDecRef() { - assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; + assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting"; if (refCount() <= 1 && evict()) { logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset()); - evictCount.increment(); + blobCacheService.evictCount.increment(); return true; } @@ -854,10 +857,10 @@ boolean tryEvictNoDecRef() { } public boolean forceEvict() { - assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting"; + assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting"; if (evict()) { logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset()); - evictCount.increment(); + blobCacheService.evictCount.increment(); decRef(); return true; } @@ -869,8 +872,8 @@ protected void closeInternal() { // now actually free the region associated with this chunk // we held the "this" lock when this was evicted, hence if io is not filled in, chunk will never be registered. if (io != null) { - assert regionOwners.remove(io) == this; - freeRegions.add(io); + assert blobCacheService.regionOwners.remove(io) == this; + blobCacheService.freeRegions.add(io); } logger.trace("closed {} with channel offset {}", regionKey, physicalStartOffset()); } @@ -886,7 +889,7 @@ private static void throwAlreadyEvicted() { boolean tryRead(ByteBuffer buf, long offset) throws IOException { SharedBytes.IO ioRef = this.io; if (ioRef != null) { - int readBytes = ioRef.read(buf, getRegionRelativePosition(offset)); + int readBytes = ioRef.read(buf, blobCacheService.getRegionRelativePosition(offset)); if (isEvicted()) { buf.position(buf.position() - readBytes); return false; @@ -922,7 +925,7 @@ void populate( rangeToWrite, rangeToWrite, Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> { - assert regionOwners.get(io) == this; + assert blobCacheService.regionOwners.get(io) == this; }), refs.acquire()) : refs.acquireListener() ); if (gaps.isEmpty()) { @@ -959,7 +962,7 @@ void populateAndRead( rangeToRead, ActionListener.releaseAfter(listener, refs.acquire()).delegateFailureAndWrap((l, success) -> { var ioRef = io; - assert regionOwners.get(ioRef) == this; + assert blobCacheService.regionOwners.get(ioRef) == this; final int start = Math.toIntExact(rangeToRead.start()); final int read = reader.onRangeAvailable(ioRef, start, start, Math.toIntExact(rangeToRead.length())); assert read == rangeToRead.length() @@ -970,7 +973,7 @@ void populateAndRead( + '-' + rangeToRead.start() + ']'; - readCount.increment(); + blobCacheService.readCount.increment(); l.onResponse(read); }) ); @@ -1017,7 +1020,7 @@ private Runnable fillGapRunnable( ) { return () -> ActionListener.run(listener, l -> { var ioRef = io; - assert regionOwners.get(ioRef) == CacheFileRegion.this; + assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this; assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this; int start = Math.toIntExact(gap.start()); writer.fillCacheRange( @@ -1028,9 +1031,9 @@ private Runnable fillGapRunnable( Math.toIntExact(gap.end() - start), progress -> gap.onProgress(start + progress), l.map(unused -> { - assert regionOwners.get(ioRef) == CacheFileRegion.this; + assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this; assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this; - writeCount.increment(); + blobCacheService.writeCount.increment(); gap.onCompletion(); return null; }).delegateResponse((delegate, e) -> failGapAndListener(gap, delegate, e)) @@ -1058,7 +1061,7 @@ public class CacheFile { private final KeyType cacheKey; private final long length; - private CacheEntry lastAccessedRegion; + private CacheEntry> lastAccessedRegion; private CacheFile(KeyType cacheKey, long length) { this.cacheKey = cacheKey; @@ -1161,7 +1164,7 @@ private int readSingleRegion( int region ) throws InterruptedException, ExecutionException { final PlainActionFuture readFuture = new PlainActionFuture<>(); - final CacheFileRegion fileRegion = get(cacheKey, length, region); + final CacheFileRegion fileRegion = get(cacheKey, length, region); final long regionStart = getRegionStart(region); fileRegion.populateAndRead( mapSubRangeToRegion(rangeToWrite, region), @@ -1193,7 +1196,7 @@ private int readMultiRegions( } ActionListener listener = listeners.acquire(i -> bytesRead.updateAndGet(j -> Math.addExact(i, j))); try { - final CacheFileRegion fileRegion = get(cacheKey, length, region); + final CacheFileRegion fileRegion = get(cacheKey, length, region); final long regionStart = getRegionStart(region); fileRegion.populateAndRead( mapSubRangeToRegion(rangeToWrite, region), @@ -1213,7 +1216,7 @@ private int readMultiRegions( return bytesRead.get(); } - private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFileRegion fileRegion, int writeOffset) { + private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFileRegion fileRegion, int writeOffset) { final RangeMissingHandler adjustedWriter; if (writeOffset == 0) { // no need to allocate a new capturing lambda if the offset isn't adjusted @@ -1274,7 +1277,7 @@ public void fillCacheRange( return adjustedWriter; } - private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, CacheFileRegion fileRegion, int readOffset) { + private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, CacheFileRegion fileRegion, int readOffset) { final RangeAvailableHandler adjustedReader = (channel, channelPos, relativePos, len) -> reader.onRangeAvailable( channel, channelPos, @@ -1293,7 +1296,7 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac return adjustedReader; } - private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) { + private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) { assert fileRegion.io != null; assert fileRegion.hasReferences(); assert regionOwners.get(fileRegion.io) == fileRegion; @@ -1421,15 +1424,15 @@ public record Stats( public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L); } - private class LFUCache implements Cache { + private class LFUCache implements Cache> { - class LFUCacheEntry extends CacheEntry { + class LFUCacheEntry extends CacheEntry> { LFUCacheEntry prev; LFUCacheEntry next; int freq; volatile long lastAccessedEpoch; - LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) { + LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) { super(chunk); this.lastAccessedEpoch = lastAccessed; // todo: consider whether freq=1 is still right for new entries. @@ -1467,7 +1470,7 @@ public void close() { decayAndNewEpochTask.close(); } - int getFreq(CacheFileRegion cacheFileRegion) { + int getFreq(CacheFileRegion cacheFileRegion) { return keyMapping.get(cacheFileRegion.regionKey).freq; } @@ -1480,7 +1483,10 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) { var entry = keyMapping.get(regionKey); if (entry == null) { final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region); - entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now)); + entry = keyMapping.computeIfAbsent( + regionKey, + key -> new LFUCacheEntry(new CacheFileRegion(SharedBlobCacheService.this, key, effectiveRegionSize), now) + ); } // io is volatile, double locking is fine, as long as we assign it last. if (entry.chunk.io == null) { diff --git a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java index 6c49b50c06e82..917b28d2ea566 100644 --- a/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java +++ b/x-pack/plugin/blob-cache/src/test/java/org/elasticsearch/blobcache/shared/SharedBlobCacheServiceTests.java @@ -149,7 +149,7 @@ public void testBasicEviction() throws IOException { } } - private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion region1) { + private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion region1) { if (randomBoolean()) { return region1.tryEvict(); } else { @@ -486,7 +486,7 @@ public void testGetMultiThreaded() throws IOException { ready.await(); for (int i = 0; i < iterations; ++i) { try { - SharedBlobCacheService.CacheFileRegion cacheFileRegion; + SharedBlobCacheService.CacheFileRegion cacheFileRegion; try { cacheFileRegion = cacheService.get(cacheKeys[i], fileLength, regions[i]); } catch (AlreadyClosedException e) { @@ -865,7 +865,7 @@ public void testMaybeEvictLeastUsed() throws Exception { final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue(); try ( NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings)); - var cacheService = new SharedBlobCacheService<>( + var cacheService = new SharedBlobCacheService( environment, settings, taskQueue.getThreadPool(), @@ -873,7 +873,7 @@ public void testMaybeEvictLeastUsed() throws Exception { BlobCacheMetrics.NOOP ) ) { - final Map.CacheFileRegion> cacheEntries = new HashMap<>(); + final Map> cacheEntries = new HashMap<>(); assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions)); assertThat("Cache has no entries", cacheService.maybeEvictLeastUsed(), is(false));