Skip to content

Commit

Permalink
Make CacheFileRegion static
Browse files Browse the repository at this point in the history
  • Loading branch information
henningandersen committed Aug 24, 2024
1 parent 1502352 commit cdfd19d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -311,9 +311,9 @@ private CacheEntry(T chunk) {
private final int numRegions;
private final ConcurrentLinkedQueue<SharedBytes.IO> freeRegions = new ConcurrentLinkedQueue<>();

private final Cache<KeyType, CacheFileRegion> cache;
private final Cache<KeyType, CacheFileRegion<KeyType>> cache;

private final ConcurrentHashMap<SharedBytes.IO, CacheFileRegion> regionOwners; // to assert exclusive access of regions
private final ConcurrentHashMap<SharedBytes.IO, CacheFileRegion<KeyType>> regionOwners; // to assert exclusive access of regions

private final LongAdder writeCount = new LongAdder();
private final LongAdder writeBytes = new LongAdder();
Expand Down Expand Up @@ -471,7 +471,7 @@ public int getRegionSize() {
return regionSize;
}

CacheFileRegion get(KeyType cacheKey, long fileLength, int region) {
CacheFileRegion<KeyType> get(KeyType cacheKey, long fileLength, int region) {
return cache.get(cacheKey, fileLength, region).chunk;
}

Expand Down Expand Up @@ -516,7 +516,7 @@ public boolean maybeFetchFullEntry(
return true;
}
final ActionListener<Integer> regionListener = refCountingListener.acquire(ignored -> {});
final CacheFileRegion entry;
final CacheFileRegion<KeyType> entry;
try {
entry = get(cacheKey, length, region);
} catch (AlreadyClosedException e) {
Expand Down Expand Up @@ -583,7 +583,7 @@ public void maybeFetchRegion(
listener.onResponse(false);
return;
}
final CacheFileRegion entry = get(cacheKey, blobLength, region);
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, region);
entry.populate(regionRange, writer, fetchExecutor, listener);
} catch (Exception e) {
listener.onFailure(e);
Expand Down Expand Up @@ -631,7 +631,7 @@ public void maybeFetchRange(
listener.onResponse(false);
return;
}
final CacheFileRegion entry = get(cacheKey, blobLength, region);
final CacheFileRegion<KeyType> entry = get(cacheKey, blobLength, region);
entry.populate(
regionRange,
writerWithOffset(writer, Math.toIntExact(range.start() - getRegionStart(region))),
Expand Down Expand Up @@ -705,7 +705,7 @@ public int forceEvict(Predicate<KeyType> cacheKeyPredicate) {
}

// used by tests
int getFreq(CacheFileRegion cacheFileRegion) {
int getFreq(CacheFileRegion<KeyType> cacheFileRegion) {
if (cache instanceof LFUCache lfuCache) {
return lfuCache.getFreq(cacheFileRegion);
}
Expand Down Expand Up @@ -787,16 +787,19 @@ protected boolean assertOffsetsWithinFileLength(long offset, long length, long f
/**
* While this class has incRef and tryIncRef methods, incRefEnsureOpen and tryIncrefEnsureOpen should
* always be used, ensuring the right ordering between incRef/tryIncRef and ensureOpen
* (see {@link LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
* (see {@link SharedBlobCacheService.LFUCache#maybeEvictAndTakeForFrequency(Runnable, int)})
*/
class CacheFileRegion extends EvictableRefCounted {
static class CacheFileRegion<KeyType> extends EvictableRefCounted {

final SharedBlobCacheService<KeyType> blobCacheService;

final RegionKey<KeyType> regionKey;
final SparseFileTracker tracker;
// io can be null when not init'ed or after evict/take
volatile SharedBytes.IO io = null;

CacheFileRegion(RegionKey<KeyType> regionKey, int regionSize) {
CacheFileRegion(SharedBlobCacheService<KeyType> blobCacheService, RegionKey<KeyType> regionKey, int regionSize) {
this.blobCacheService = blobCacheService;
this.regionKey = regionKey;
assert regionSize > 0;
// NOTE we use a constant string for description to avoid consume extra heap space
Expand All @@ -805,7 +808,7 @@ class CacheFileRegion extends EvictableRefCounted {

public long physicalStartOffset() {
var ioRef = io;
return ioRef == null ? -1L : (long) regionKey.region * regionSize;
return ioRef == null ? -1L : (long) regionKey.region * blobCacheService.regionSize;
}

public boolean tryIncRefEnsureOpen() {
Expand All @@ -832,32 +835,32 @@ private void ensureOpenOrDecRef() {
// tries to evict this chunk if noone is holding onto its resources anymore
// visible for tests.
boolean tryEvict() {
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting";
if (refCount() <= 1 && evict()) {
logger.trace("evicted {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
blobCacheService.evictCount.increment();
decRef();
return true;
}
return false;
}

boolean tryEvictNoDecRef() {
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting";
if (refCount() <= 1 && evict()) {
logger.trace("evicted and take {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
blobCacheService.evictCount.increment();
return true;
}

return false;
}

public boolean forceEvict() {
assert Thread.holdsLock(SharedBlobCacheService.this) : "must hold lock when evicting";
assert Thread.holdsLock(blobCacheService) : "must hold lock when evicting";
if (evict()) {
logger.trace("force evicted {} with channel offset {}", regionKey, physicalStartOffset());
evictCount.increment();
blobCacheService.evictCount.increment();
decRef();
return true;
}
Expand All @@ -869,8 +872,8 @@ protected void closeInternal() {
// now actually free the region associated with this chunk
// we held the "this" lock when this was evicted, hence if io is not filled in, chunk will never be registered.
if (io != null) {
assert regionOwners.remove(io) == this;
freeRegions.add(io);
assert blobCacheService.regionOwners.remove(io) == this;
blobCacheService.freeRegions.add(io);
}
logger.trace("closed {} with channel offset {}", regionKey, physicalStartOffset());
}
Expand All @@ -886,7 +889,7 @@ private static void throwAlreadyEvicted() {
boolean tryRead(ByteBuffer buf, long offset) throws IOException {
SharedBytes.IO ioRef = this.io;
if (ioRef != null) {
int readBytes = ioRef.read(buf, getRegionRelativePosition(offset));
int readBytes = ioRef.read(buf, blobCacheService.getRegionRelativePosition(offset));
if (isEvicted()) {
buf.position(buf.position() - readBytes);
return false;
Expand Down Expand Up @@ -922,7 +925,7 @@ void populate(
rangeToWrite,
rangeToWrite,
Assertions.ENABLED ? ActionListener.releaseAfter(ActionListener.running(() -> {
assert regionOwners.get(io) == this;
assert blobCacheService.regionOwners.get(io) == this;
}), refs.acquire()) : refs.acquireListener()
);
if (gaps.isEmpty()) {
Expand Down Expand Up @@ -959,7 +962,7 @@ void populateAndRead(
rangeToRead,
ActionListener.releaseAfter(listener, refs.acquire()).delegateFailureAndWrap((l, success) -> {
var ioRef = io;
assert regionOwners.get(ioRef) == this;
assert blobCacheService.regionOwners.get(ioRef) == this;
final int start = Math.toIntExact(rangeToRead.start());
final int read = reader.onRangeAvailable(ioRef, start, start, Math.toIntExact(rangeToRead.length()));
assert read == rangeToRead.length()
Expand All @@ -970,7 +973,7 @@ void populateAndRead(
+ '-'
+ rangeToRead.start()
+ ']';
readCount.increment();
blobCacheService.readCount.increment();
l.onResponse(read);
})
);
Expand Down Expand Up @@ -1017,7 +1020,7 @@ private Runnable fillGapRunnable(
) {
return () -> ActionListener.run(listener, l -> {
var ioRef = io;
assert regionOwners.get(ioRef) == CacheFileRegion.this;
assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this;
assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this;
int start = Math.toIntExact(gap.start());
writer.fillCacheRange(
Expand All @@ -1028,9 +1031,9 @@ private Runnable fillGapRunnable(
Math.toIntExact(gap.end() - start),
progress -> gap.onProgress(start + progress),
l.<Void>map(unused -> {
assert regionOwners.get(ioRef) == CacheFileRegion.this;
assert blobCacheService.regionOwners.get(ioRef) == CacheFileRegion.this;
assert CacheFileRegion.this.hasReferences() : CacheFileRegion.this;
writeCount.increment();
blobCacheService.writeCount.increment();
gap.onCompletion();
return null;
}).delegateResponse((delegate, e) -> failGapAndListener(gap, delegate, e))
Expand Down Expand Up @@ -1058,7 +1061,7 @@ public class CacheFile {
private final KeyType cacheKey;
private final long length;

private CacheEntry<CacheFileRegion> lastAccessedRegion;
private CacheEntry<CacheFileRegion<KeyType>> lastAccessedRegion;

private CacheFile(KeyType cacheKey, long length) {
this.cacheKey = cacheKey;
Expand Down Expand Up @@ -1161,7 +1164,7 @@ private int readSingleRegion(
int region
) throws InterruptedException, ExecutionException {
final PlainActionFuture<Integer> readFuture = new PlainActionFuture<>();
final CacheFileRegion fileRegion = get(cacheKey, length, region);
final CacheFileRegion<KeyType> fileRegion = get(cacheKey, length, region);
final long regionStart = getRegionStart(region);
fileRegion.populateAndRead(
mapSubRangeToRegion(rangeToWrite, region),
Expand Down Expand Up @@ -1193,7 +1196,7 @@ private int readMultiRegions(
}
ActionListener<Integer> listener = listeners.acquire(i -> bytesRead.updateAndGet(j -> Math.addExact(i, j)));
try {
final CacheFileRegion fileRegion = get(cacheKey, length, region);
final CacheFileRegion<KeyType> fileRegion = get(cacheKey, length, region);
final long regionStart = getRegionStart(region);
fileRegion.populateAndRead(
mapSubRangeToRegion(rangeToWrite, region),
Expand All @@ -1213,7 +1216,7 @@ private int readMultiRegions(
return bytesRead.get();
}

private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFileRegion fileRegion, int writeOffset) {
private RangeMissingHandler writerWithOffset(RangeMissingHandler writer, CacheFileRegion<KeyType> fileRegion, int writeOffset) {
final RangeMissingHandler adjustedWriter;
if (writeOffset == 0) {
// no need to allocate a new capturing lambda if the offset isn't adjusted
Expand Down Expand Up @@ -1274,7 +1277,7 @@ public void fillCacheRange(
return adjustedWriter;
}

private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, CacheFileRegion fileRegion, int readOffset) {
private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, CacheFileRegion<KeyType> fileRegion, int readOffset) {
final RangeAvailableHandler adjustedReader = (channel, channelPos, relativePos, len) -> reader.onRangeAvailable(
channel,
channelPos,
Expand All @@ -1293,7 +1296,7 @@ private RangeAvailableHandler readerWithOffset(RangeAvailableHandler reader, Cac
return adjustedReader;
}

private boolean assertValidRegionAndLength(CacheFileRegion fileRegion, int channelPos, int len) {
private boolean assertValidRegionAndLength(CacheFileRegion<KeyType> fileRegion, int channelPos, int len) {
assert fileRegion.io != null;
assert fileRegion.hasReferences();
assert regionOwners.get(fileRegion.io) == fileRegion;
Expand Down Expand Up @@ -1421,15 +1424,15 @@ public record Stats(
public static final Stats EMPTY = new Stats(0, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
}

private class LFUCache implements Cache<KeyType, CacheFileRegion> {
private class LFUCache implements Cache<KeyType, CacheFileRegion<KeyType>> {

class LFUCacheEntry extends CacheEntry<CacheFileRegion> {
class LFUCacheEntry extends CacheEntry<CacheFileRegion<KeyType>> {
LFUCacheEntry prev;
LFUCacheEntry next;
int freq;
volatile long lastAccessedEpoch;

LFUCacheEntry(CacheFileRegion chunk, long lastAccessed) {
LFUCacheEntry(CacheFileRegion<KeyType> chunk, long lastAccessed) {
super(chunk);
this.lastAccessedEpoch = lastAccessed;
// todo: consider whether freq=1 is still right for new entries.
Expand Down Expand Up @@ -1467,7 +1470,7 @@ public void close() {
decayAndNewEpochTask.close();
}

int getFreq(CacheFileRegion cacheFileRegion) {
int getFreq(CacheFileRegion<KeyType> cacheFileRegion) {
return keyMapping.get(cacheFileRegion.regionKey).freq;
}

Expand All @@ -1480,7 +1483,10 @@ public LFUCacheEntry get(KeyType cacheKey, long fileLength, int region) {
var entry = keyMapping.get(regionKey);
if (entry == null) {
final int effectiveRegionSize = computeCacheFileRegionSize(fileLength, region);
entry = keyMapping.computeIfAbsent(regionKey, key -> new LFUCacheEntry(new CacheFileRegion(key, effectiveRegionSize), now));
entry = keyMapping.computeIfAbsent(
regionKey,
key -> new LFUCacheEntry(new CacheFileRegion<KeyType>(SharedBlobCacheService.this, key, effectiveRegionSize), now)
);
}
// io is volatile, double locking is fine, as long as we assign it last.
if (entry.chunk.io == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testBasicEviction() throws IOException {
}
}

private static boolean tryEvict(SharedBlobCacheService<Object>.CacheFileRegion region1) {
private static boolean tryEvict(SharedBlobCacheService.CacheFileRegion<Object> region1) {
if (randomBoolean()) {
return region1.tryEvict();
} else {
Expand Down Expand Up @@ -486,7 +486,7 @@ public void testGetMultiThreaded() throws IOException {
ready.await();
for (int i = 0; i < iterations; ++i) {
try {
SharedBlobCacheService<String>.CacheFileRegion cacheFileRegion;
SharedBlobCacheService.CacheFileRegion<String> cacheFileRegion;
try {
cacheFileRegion = cacheService.get(cacheKeys[i], fileLength, regions[i]);
} catch (AlreadyClosedException e) {
Expand Down Expand Up @@ -865,15 +865,15 @@ public void testMaybeEvictLeastUsed() throws Exception {
final DeterministicTaskQueue taskQueue = new DeterministicTaskQueue();
try (
NodeEnvironment environment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings));
var cacheService = new SharedBlobCacheService<>(
var cacheService = new SharedBlobCacheService<Object>(
environment,
settings,
taskQueue.getThreadPool(),
ThreadPool.Names.GENERIC,
BlobCacheMetrics.NOOP
)
) {
final Map<Object, SharedBlobCacheService<Object>.CacheFileRegion> cacheEntries = new HashMap<>();
final Map<Object, SharedBlobCacheService.CacheFileRegion<Object>> cacheEntries = new HashMap<>();

assertThat("All regions are free", cacheService.freeRegionCount(), equalTo(numRegions));
assertThat("Cache has no entries", cacheService.maybeEvictLeastUsed(), is(false));
Expand Down

0 comments on commit cdfd19d

Please sign in to comment.