Skip to content

Commit

Permalink
Merge pull request #1914 from cloudflare/milan/actor-cache-refactor
Browse files Browse the repository at this point in the history
Quick ActorCache refactor
  • Loading branch information
MellowYarker authored Apr 15, 2024
2 parents 2085fbd + 3b5e7a0 commit db44674
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 68 deletions.
120 changes: 61 additions & 59 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void ActorCache::clear(Lock& lock) {
if (entry->link.isLinked()) {
if (entry->isDirty()) {
dirtyList.remove(*entry);
entry->syncStatus = EntrySyncStatus::NOT_IN_CACHE;
entry->setNotInCache();
} else {
removeEntry(lock, *entry);
}
Expand Down Expand Up @@ -102,7 +102,7 @@ ActorCache::Entry::~Entry() noexcept(false) {
}

KJ_REQUIRE(!link.isLinked(),
"must remove Entry from lists before destroying", static_cast<int>(syncStatus));
"must remove Entry from lists before destroying", static_cast<int>(getSyncStatus()));
}
}

Expand Down Expand Up @@ -277,25 +277,25 @@ bool ActorCache::SharedLru::evictIfNeeded(Lock& lock) const {
}
}

void ActorCache::touchEntry(Lock& lock, Entry& entry, const ReadOptions& options) {
if (!options.noCache) {
if (!entry.isDirty()) {
entry.isStale = false;
lock->remove(entry);
lock->add(entry);
}

// If this is a dirty entry previously marked no-cache, remove that mark. This results in the
// same end state as if the entry had been flushed and evicted before the read -- it would have
// been read back, and then into cache.
entry.noCache = false;
void ActorCache::touchEntry(Lock& lock, Entry& entry) {
if (!entry.isDirty()) {
entry.isStale = false;
lock->remove(entry);
addToCleanList(lock, entry);
}

// We only call `touchEntry` when the operation or the LRU has !noCache, so we want to cache this.
//
// If this is a dirty entry previously marked no-cache, remove that mark. This results in the
// same end state as if the entry had been flushed and evicted before the read -- it would have
// been read back, and then into cache.
entry.noCache = false;
}

void ActorCache::removeEntry(Lock& lock, Entry& entry) {
KJ_IASSERT(!entry.isDirty());
lock->remove(entry);
entry.syncStatus = EntrySyncStatus::NOT_IN_CACHE;
entry.setNotInCache();
}

void ActorCache::evictEntry(Lock& lock, Entry& entry) {
Expand Down Expand Up @@ -335,7 +335,7 @@ void ActorCache::verifyConsistencyForTest() {
}
prevKey = entry->key;
auto& key = entry->key;
switch(entry->valueStatus) {
switch(entry->getValueStatus()) {
case EntryValueStatus::ABSENT: {
KJ_ASSERT(!prevGapIsKnownEmpty || !entry->gapIsKnownEmpty,
"clean negative entry in the middle of a known-empty gap is redundant", key);
Expand All @@ -352,7 +352,7 @@ void ActorCache::verifyConsistencyForTest() {
}
}

KJ_ASSERT(entry->syncStatus != EntrySyncStatus::NOT_IN_CACHE,
KJ_ASSERT(entry->getSyncStatus() != EntrySyncStatus::NOT_IN_CACHE,
"entry should not appear in map", entry->key);
KJ_ASSERT(entry->link.isLinked());

Expand All @@ -370,7 +370,7 @@ kj::OneOf<kj::Maybe<ActorCache::Value>, kj::Promise<kj::Maybe<ActorCache::Value>

auto lock = lru.cleanList.lockExclusive();
auto entry = findInCache(lock, kj::mv(key), options);
switch (entry->valueStatus) {
switch (entry->getValueStatus()) {
case EntryValueStatus::PRESENT:
case EntryValueStatus::ABSENT: {
return entry->getValue();
Expand Down Expand Up @@ -543,7 +543,7 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
auto lock = lru.cleanList.lockExclusive();
for (auto& key: keys) {
auto entry = findInCache(lock, key, options);
switch(entry->valueStatus) {
switch(entry->getValueStatus()) {
case EntryValueStatus::PRESENT:
case EntryValueStatus::ABSENT: {
cachedEntries.add(kj::mv(entry));
Expand Down Expand Up @@ -954,9 +954,11 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
positiveCount < limit.orDefault(kj::maxValue); ++iter) {
Entry& entry = **iter;

touchEntry(lock, entry, options);
if (!options.noCache) {
touchEntry(lock, entry);
}

switch(entry.valueStatus) {
switch(entry.getValueStatus()) {
case EntryValueStatus::ABSENT: {
cachedEntries.add(kj::atomicAddRef(entry));
if (storageListStart != kj::none && entry.isDirty()) {
Expand All @@ -983,14 +985,16 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
if (storageListStart == kj::none && !entry.gapIsKnownEmpty) {
// The gap after this entry is not cached so we'll have to start our list operation here.
storageListStart = entry.key;
storageListStartIsKnown = entry.valueStatus != EntryValueStatus::UNKNOWN;
storageListStartIsKnown = entry.getValueStatus() != EntryValueStatus::UNKNOWN;
}
}

if (iter != ordered.end() && iter->get()->key == endKey) {
// We have an entry exactly at our end, it might even be a previously inserted UNKNOWN. Let's
// touch it for freshness.
touchEntry(lock, **iter, options);
if (!options.noCache) {
touchEntry(lock, **iter);
}
}

if (storageListStart == kj::none || knownPrefixSize >= limit.orDefault(kj::maxValue)) {
Expand Down Expand Up @@ -1256,7 +1260,9 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
if (iter != map.ordered().end() && iter->get()->key == endKey) {
// We have an entry exactly at our end, it might even be a previously inserted UNKNOWN. Let's
// touch it for freshness.
touchEntry(lock, **iter, options);
if (!options.noCache) {
touchEntry(lock, **iter);
}
}
for (; positiveCount < limit.orDefault(kj::maxValue); ) {
if (iter == ordered.begin()) {
Expand All @@ -1281,11 +1287,13 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
break;
}

touchEntry(lock, entry, options);
if (!options.noCache) {
touchEntry(lock, entry);
}

// Note that we need to add even negative entries to `cachedEntries` so that they override
// whatever we read from storage later. However, they should not count against the limit.
switch(entry.valueStatus) {
switch(entry.getValueStatus()) {
case EntryValueStatus::ABSENT: {
cachedEntries.add(kj::atomicAddRef(entry));
if (storageListEnd != kj::none && entry.isDirty()) {
Expand Down Expand Up @@ -1405,7 +1413,9 @@ kj::Own<ActorCache::Entry> ActorCache::findInCache(
if (iter != ordered.end() && iter->get()->key == key) {
// Found exact matching entry.
Entry& entry = **iter;
touchEntry(lock, entry, options);
if (!options.noCache) {
touchEntry(lock, entry);
}
return kj::atomicAddRef(entry);
} else {
// Key is not in the map, but we have to check for outstanding list() operations by checking
Expand Down Expand Up @@ -1479,22 +1489,20 @@ kj::Own<ActorCache::Entry> ActorCache::addReadResultToCache(
// the caching logic.
//
// Because of this, we know it is correct to leave `gapIsKnownEmpty = false` on our new entry.
entry->syncStatus = EntrySyncStatus::CLEAN;
lock->add(*entry);
addToCleanList(lock, *entry);
return kj::atomicAddRef(*entry);
});

if (slot.get() != entry.get()) {
// There was a pre-existing entry with the key, so ours wasn't inserted.
switch (slot->valueStatus) {
switch (slot->getValueStatus()) {
case EntryValueStatus::UNKNOWN: {
// Oh, it's just a marker for the end of a list range. Go ahead and insert our new entry
// into the same slot.
KJ_ASSERT(!slot->gapIsKnownEmpty); // UNKNOWN entry should never have gapIsKnownEmpty.
removeEntry(lock, *slot);

entry->syncStatus = EntrySyncStatus::CLEAN;
lock->add(*entry);
addToCleanList(lock, *entry);
slot = kj::atomicAddRef(*entry);
break;
}
Expand All @@ -1517,7 +1525,7 @@ kj::Own<ActorCache::Entry> ActorCache::addReadResultToCache(
// one that coincidentally matches what we pulled off disk. However, the open transaction
// is still going to be committed, writing the intermediate value, so we still need to plan
// to write this value again in the next transaction.
touchEntry(lock, *slot, options);
touchEntry(lock, *slot);
break;
}
}
Expand Down Expand Up @@ -1587,8 +1595,7 @@ void ActorCache::markGapsEmpty(Lock& lock, KeyPtr beginKey, kj::Maybe<KeyPtr> en
// We must insert an UNKNOWN entry to cap our range.
KJ_IF_SOME(k, endKey) {
auto entry = kj::atomicRefcounted<Entry>(*this, cloneKey(k), EntryValueStatus::UNKNOWN);
entry->syncStatus = EntrySyncStatus::CLEAN;
lock->add(*entry);
addToCleanList(lock, *entry);
map.insert(kj::mv(entry));
} else {
// No UNKNOWN needed since the end is actually the end of the key space.
Expand All @@ -1606,7 +1613,7 @@ void ActorCache::markGapsEmpty(Lock& lock, KeyPtr beginKey, kj::Maybe<KeyPtr> en
for (auto iter = beginIter; iter != mapEnd; ++iter) {
auto& entry = **iter;

if (entry.valueStatus != EntryValueStatus::PRESENT && !entry.isDirty() &&
if (entry.getValueStatus() != EntryValueStatus::PRESENT && !entry.isDirty() &&
(iter != endIter || iter->get()->gapIsKnownEmpty)) {
// Either:
// (a) This is an UNKNOWN entry.
Expand Down Expand Up @@ -1699,7 +1706,7 @@ ActorCache::GetResultList::GetResultList(

auto add = [&](kj::Own<ActorCache::Entry>&& entry, CacheStatus status) {
// Remove null values.
if (entry->valueStatus == ActorCache::EntryValueStatus::PRESENT) {
if (entry->getValueStatus() == ActorCache::EntryValueStatus::PRESENT) {
entries.add(kj::mv(entry));
cacheStatuses.add(status);
}
Expand Down Expand Up @@ -1971,8 +1978,7 @@ ActorCache::DeleteAllResults ActorCache::deleteAll(WriteOptions options) {
map.findOrCreate(Key{}, [&]() {
Key key;
auto entry = kj::atomicRefcounted<Entry>(*this, kj::mv(key), EntryValueStatus::ABSENT);
lock->add(*entry);
entry->syncStatus = EntrySyncStatus::CLEAN;
addToCleanList(lock, *entry);
entry->gapIsKnownEmpty = true;
return entry;
});
Expand Down Expand Up @@ -2018,7 +2024,7 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,
// Exact same entry already exists.
auto& slot = *iter;

switch (slot->valueStatus) {
switch (slot->getValueStatus()) {
case EntryValueStatus::PRESENT: {
if (slot->getValuePtr() == newEntry->getValuePtr()) {
// No change! The entry already had this value. Might as well skip the whole storage
Expand Down Expand Up @@ -2059,7 +2065,7 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,

if (slot->isDirty()) {
dirtyList.remove(*slot);
slot->syncStatus = EntrySyncStatus::NOT_IN_CACHE;
slot->setNotInCache();

// Entry may have `countedDelete` indicating we're still waiting to get a count from a
// previous delete operation. If so, we'll need to inherit it in case that delete operation
Expand All @@ -2075,9 +2081,7 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,
// Swap in the new entry.
KJ_DASSERT(slot->key == newEntry->key);
slot = kj::mv(newEntry);
slot->syncStatus = EntrySyncStatus::DIRTY;
dirtyList.add(*slot);

addToDirtyList(*slot);
} else {
// No exact matching entry exists, insert a new one.

Expand All @@ -2087,7 +2091,7 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,
--iter;
previousGapKnownEmpty = iter->get()->gapIsKnownEmpty;
}
if (previousGapKnownEmpty && newEntry->valueStatus == EntryValueStatus::ABSENT) {
if (previousGapKnownEmpty && newEntry->getValueStatus() == EntryValueStatus::ABSENT) {
// No change! The entry is already known not to exist, and we're trying to delete it. Might
// as well skip the whole storage operation.
return;
Expand All @@ -2101,8 +2105,7 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,
KJ_IF_SOME(c, countedDelete) {
slot->countedDelete = kj::addRef(c);
}
slot->syncStatus = EntrySyncStatus::DIRTY;
dirtyList.add(*slot);
addToDirtyList(*slot);
}

ensureFlushScheduled(options);
Expand Down Expand Up @@ -2315,7 +2318,7 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {
auto countEntry = [&](Entry& entry) {
// Counts up the number of operations and RPC message sizes we'll need to cover this entry.

entry.syncStatus = EntrySyncStatus::FLUSHING;
entry.setFlushing();

auto keySizeInWords = bytesToWordsRoundUp(entry.key.size());

Expand Down Expand Up @@ -2488,7 +2491,7 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {
// TODO(cleanup): kj::Vector<T>::filter() would be nice to have here.
auto dst = r.deletedDirty.begin();
for (auto src = r.deletedDirty.begin(); src != r.deletedDirty.end(); ++src) {
if (src->get()->syncStatus == EntrySyncStatus::DIRTY) {
if (src->get()->getSyncStatus() == EntrySyncStatus::DIRTY) {
if (dst != src) *dst = kj::mv(*src);
++dst;
}
Expand All @@ -2498,12 +2501,12 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {
// Mark all `FLUSHING` entries as `CLEAN`. Note that we know that all `FLUSHING` must
// form a prefix of `dirtyList` since any new entries would have been added to the end.
for (auto& entry: dirtyList) {
if (entry.syncStatus == EntrySyncStatus::DIRTY) {
if (entry.getSyncStatus() == EntrySyncStatus::DIRTY) {
// Completed all FLUSHING entries.
break;
}

KJ_ASSERT(entry.syncStatus == EntrySyncStatus::FLUSHING);
KJ_ASSERT(entry.getSyncStatus() == EntrySyncStatus::FLUSHING);

// We know all `countedDelete` operations were satisfied so we can remove this if it's
// present. Note that if, during the flush, the entry was overwritten, then the new entry
Expand All @@ -2514,10 +2517,10 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {

dirtyList.remove(entry);
if (entry.noCache) {
entry.syncStatus = EntrySyncStatus::NOT_IN_CACHE;
entry.setNotInCache();
evictEntry(lock, entry);
} else {
if (entry.gapIsKnownEmpty && entry.valueStatus == EntryValueStatus::ABSENT) {
if (entry.gapIsKnownEmpty && entry.getValueStatus() == EntryValueStatus::ABSENT) {
// This is a negative entry, and is followed by a known-empty gap. If the previous entry
// also has `gapIsKnownEmpty`, then this entry is entirely redundant.
auto& map = currentValues.get(lock);
Expand All @@ -2529,16 +2532,15 @@ kj::Promise<void> ActorCache::flushImpl(uint retryCount) {
--prevIter;
if (prevIter->get()->gapIsKnownEmpty) {
// Yep!
entry.syncStatus = EntrySyncStatus::NOT_IN_CACHE;
entry.setNotInCache();
map.erase(*entryIter);
// WARNING: We might have just deleted `entry`.
continue;
}
}
}

entry.syncStatus = EntrySyncStatus::CLEAN;
lock->add(entry);
addToCleanList(lock, entry);
}
}
}
Expand Down Expand Up @@ -3069,7 +3071,7 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
// TODO(cleanup): Add `iterRange()` to KJ's public interface.
for (auto& change: kj::_::iterRange(beginIter, endIter)) {
changedEntries.add(kj::atomicAddRef(*change.entry));
if (change.entry->valueStatus == EntryValueStatus::PRESENT) {
if (change.entry->getValueStatus() == EntryValueStatus::PRESENT) {
++positiveCount;
}
if (positiveCount == limit.orDefault(kj::maxValue)) break;
Expand Down Expand Up @@ -3098,7 +3100,7 @@ kj::OneOf<ActorCache::GetResultList, kj::Promise<ActorCache::GetResultList>>
for (auto iter = endIter; iter != beginIter;) {
--iter;
changedEntries.add(kj::atomicAddRef(*iter->entry));
if (iter->entry->valueStatus == EntryValueStatus::PRESENT) {
if (iter->entry->getValueStatus() == EntryValueStatus::PRESENT) {
++positiveCount;
}
if (positiveCount == limit.orDefault(kj::maxValue)) break;
Expand Down Expand Up @@ -3278,7 +3280,7 @@ kj::Maybe<ActorCache::KeyPtr> ActorCache::Transaction::putImpl(
auto& slot = entriesToWrite.upsert(kj::mv(change), [&](auto& existing, auto&& replacement) {
replaced = true;
KJ_IF_SOME(c, count) {
c += existing.entry->valueStatus == EntryValueStatus::PRESENT;
c += existing.entry->getValueStatus() == EntryValueStatus::PRESENT;
}
existing = kj::mv(replacement);
});
Expand Down
Loading

0 comments on commit db44674

Please sign in to comment.