Fix get logs patch 2 #1381

Merged 5 commits on Sep 21, 2020
CHANGELOG.md: 1 addition & 1 deletion
@@ -7,7 +7,7 @@

### Bug Fixes
* Added `debug_getBadBlocks` JSON-RPC API to analyze and detect consensus flaws. Even if a block is rejected it will be returned by this method [\#1378](https://github.com/hyperledger/besu/pull/1378)
- * Fix logs queries missing results against chain head [\#1351](https://github.com/hyperledger/besu/pull/1351)
+ * Fix logs queries missing results against chain head [\#1351](https://github.com/hyperledger/besu/pull/1351) and [\#1381](https://github.com/hyperledger/besu/pull/1381)

#### Previously identified known issues

@@ -17,6 +17,7 @@
import static org.hyperledger.besu.ethereum.api.query.cache.LogBloomCacheMetadata.DEFAULT_VERSION;

import org.hyperledger.besu.ethereum.chain.Blockchain;
+ import org.hyperledger.besu.ethereum.core.BlockHeader;

import java.io.File;
import java.io.IOException;
@@ -51,7 +52,7 @@ public void start() {
}
final LogBloomCacheMetadata logBloomCacheMetadata =
LogBloomCacheMetadata.lookUpFrom(cacheDir);
-     if (logBloomCacheMetadata.getVersion() == 0) {
+     if (logBloomCacheMetadata.getVersion() < DEFAULT_VERSION) {
try (Stream<Path> walk = Files.walk(cacheDir)) {
walk.filter(Files::isRegularFile).map(Path::toFile).forEach(File::delete);
} catch (Exception e) {
@@ -65,10 +66,14 @@ public void start() {
blockchain.observeBlockAdded(
event -> {
if (event.isNewCanonicalHead()) {
+             final BlockHeader eventBlockHeader = event.getBlock().getHeader();
+             final Optional<BlockHeader> commonAncestorBlockHeader =
+                 blockchain.getBlockHeader(event.getCommonAncestorHash());
              transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
-                 event.getBlock().getHeader(), Optional.empty());
+                 eventBlockHeader, commonAncestorBlockHeader, Optional.empty());
}
}));

transactionLogBloomCacher
.getScheduler()
.scheduleFutureTask(transactionLogBloomCacher::cacheAll, Duration.ofMinutes(1));
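To make the listener change above concrete: after a reorg, blooms cached for blocks above the common ancestor may describe blocks that are no longer canonical, so they must be rewritten before the new head is cached. A small self-contained sketch (hypothetical block numbers; `cacheBloom` is a stand-in for the real cacher, not a Besu API):

```java
import java.util.Optional;

public class ReorgRecacheSketch {
  // Stand-in for the per-block bloom caching done by TransactionLogBloomCacher.
  static void cacheBloom(final long blockNumber) {
    System.out.println("re-cached bloom for canonical block " + blockNumber);
  }

  public static void main(final String[] args) {
    // Assumed scenario: the new canonical head is 101 and the common
    // ancestor with the old branch is 97, so blooms cached for the old
    // blocks 98-100 are stale and must be rewritten from the new branch.
    final long newHeadNumber = 101;
    final Optional<Long> commonAncestorNumber = Optional.of(97L);

    commonAncestorNumber.ifPresent(
        ancestor -> {
          for (long number = ancestor + 1; number < newHeadNumber; number++) {
            cacheBloom(number); // 98, 99, 100: re-read from the new branch
          }
        });
    cacheBloom(newHeadNumber); // 101: the new head itself, cached as before
  }
}
```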
@@ -29,7 +29,7 @@
public class LogBloomCacheMetadata {
private static final Logger LOG = LogManager.getLogger();

-   public static final int DEFAULT_VERSION = 1;
+   public static final int DEFAULT_VERSION = 2;

private static final String METADATA_FILENAME = "CACHE_METADATA.json";
private static final ObjectMapper MAPPER = new ObjectMapper();
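The bump from 1 to 2 works together with the `getVersion() < DEFAULT_VERSION` check shown earlier: any node whose on-disk cache was written before this fix wipes it on startup and regenerates it. A minimal sketch of that invalidation pattern (the method name `invalidateIfStale` is hypothetical; the deletion loop mirrors the diff above):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CacheVersionSketch {
  // Bumped whenever the cache contents or layout change incompatibly.
  static final int DEFAULT_VERSION = 2;

  // Deletes every cache file if the on-disk version predates the current
  // one, so the cache is rebuilt from scratch with corrected contents.
  static void invalidateIfStale(final int cachedVersion, final Path cacheDir)
      throws IOException {
    if (cachedVersion < DEFAULT_VERSION) {
      try (Stream<Path> walk = Files.walk(cacheDir)) {
        walk.filter(Files::isRegularFile).map(Path::toFile).forEach(File::delete);
      }
    }
  }
}
```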
@@ -22,6 +22,7 @@

import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
+ import org.hyperledger.besu.ethereum.core.ProcessableBlockHeader;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;

import java.io.File;
@@ -110,7 +111,9 @@ public CachingStatus generateLogBloomCache(final long start, final long stop) {
blockchain
.getBlockHeader(blockNum)
.ifPresent(
-               blockHeader -> cacheLogsBloomForBlockHeader(blockHeader, Optional.of(cacheFile)));
+               blockHeader ->
+                   cacheLogsBloomForBlockHeader(
+                       blockHeader, Optional.empty(), Optional.of(cacheFile)));
fillCacheFile(blockNum, blockNum + BLOCKS_PER_BLOOM_CACHE, cacheFile);
}
} catch (final Exception e) {
@@ -141,7 +144,9 @@ private void fillCacheFile(final long startBlock, final long stopBlock, final Fi
}

void cacheLogsBloomForBlockHeader(
-       final BlockHeader blockHeader, final Optional<File> reusedCacheFile) {
+       final BlockHeader blockHeader,
+       final Optional<BlockHeader> commonAncestorBlockHeader,
+       final Optional<File> reusedCacheFile) {
try {
if (cachingStatus.cachingCount.incrementAndGet() != 1) {
return;
@@ -151,6 +156,20 @@ void cacheLogsBloomForBlockHeader(
final File cacheFile = reusedCacheFile.orElse(calculateCacheFileName(blockNumber, cacheDir));
if (cacheFile.exists()) {
try {
+         final Optional<Long> ancestorBlockNumber =
+             commonAncestorBlockHeader.map(ProcessableBlockHeader::getNumber);
+         if (ancestorBlockNumber.isPresent()) {
+           // walk through the blocks from the common ancestor to the received block in order to
+           // reload the cache in case of reorg
+           for (long number = ancestorBlockNumber.get() + 1;
+               number < blockHeader.getNumber();
+               number++) {
+             Optional<BlockHeader> ancestorBlockHeader = blockchain.getBlockHeader(number);
+             if (ancestorBlockHeader.isPresent()) {
+               cacheSingleBlock(ancestorBlockHeader.get(), cacheFile);
+             }
+           }
+         }
Comment on lines +159 to +172
Contributor:
nit: Feels a bit weird to use .map and then .isPresent here. Would be worth considering using .ifPresent:

Suggested change
- final Optional<Long> ancestorBlockNumber =
-     commonAncestorBlockHeader.map(ProcessableBlockHeader::getNumber);
- if (ancestorBlockNumber.isPresent()) {
-   // walk through the blocks from the common ancestor to the received block in order to
-   // reload the cache in case of reorg
-   for (long number = ancestorBlockNumber.get() + 1;
-       number < blockHeader.getNumber();
-       number++) {
-     Optional<BlockHeader> ancestorBlockHeader = blockchain.getBlockHeader(number);
-     if (ancestorBlockHeader.isPresent()) {
-       cacheSingleBlock(ancestorBlockHeader.get(), cacheFile);
-     }
-   }
- }
+ commonAncestorBlockHeader
+     .map(ProcessableBlockHeader::getNumber)
+     .ifPresent(
+         ancestorBlockNumber -> {
+           // walk through the blocks from the common ancestor to the received block in order to
+           // reload the cache in case of reorg
+           for (long number = ancestorBlockNumber + 1;
+               number < blockHeader.getNumber();
+               number++) {
+             blockchain
+                 .getBlockHeader(number)
+                 .ifPresent(header -> cacheSingleBlock(header, cacheFile));
+           }
+         });

Contributor:
Also, this isn't removing logs from the cache correctly when a reorg makes the chain shorter. It needs to delete all cache entries between the new chain head number and the old one if the new head number is less.

Contributor:
Actually, if this index is only used as the initial filter, you wouldn't need to remove the later blocks when a reorg makes the chain shorter. You'd potentially check blocks in that range when you didn't need to, but each extra hit would just be treated as a false positive and sorted out once you're pulling logs out of the actual blocks.

In that case I think this should work. Will deploy it and test it out.
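The false-positive argument above is the standard bloom-filter contract. A schematic of the two stages the comment describes (the types and helper names here are hypothetical, not Besu's API): the cached bloom only nominates candidate blocks, and every candidate's actual logs are then matched exactly, so a stale cache entry can cost an extra block read but never a wrong result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BloomPrefilterSketch {
  /** Stage 1: per-block bloom; may return true for logs the block doesn't have. */
  interface CachedBloom {
    boolean mightContain(byte[] addressOrTopic);
  }

  /** Minimal stand-in for a decoded log entry. */
  static final class LogEntry {
    final byte[] address;

    LogEntry(final byte[] address) {
      this.address = address;
    }
  }

  // Stage 2: exact matching against the block's real logs filters out any
  // false positive the (possibly stale) bloom let through.
  static List<LogEntry> exactMatches(final List<LogEntry> blockLogs, final byte[] queryAddress) {
    final List<LogEntry> out = new ArrayList<>();
    for (final LogEntry log : blockLogs) {
      if (Arrays.equals(log.address, queryAddress)) {
        out.add(log);
      }
    }
    return out;
  }

  static List<LogEntry> query(
      final CachedBloom bloom, final List<LogEntry> blockLogs, final byte[] queryAddress) {
    if (!bloom.mightContain(queryAddress)) {
      return List.of(); // definite miss: the bloom can safely skip this block
    }
    return exactMatches(blockLogs, queryAddress); // candidate: verify exactly
  }
}
```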

Contributor Author:
In fact, in the case of a shorter reorg the cache is truncated the first time cacheSingleBlock is called; this is why I did not add anything else here.
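For readers following the thread, the truncation the author relies on could look like the sketch below (hypothetical method and constant names; the real cacher also splits the cache into segment files of BLOCKS_PER_BLOOM_CACHE blocks, which is omitted here). Writing block N's bloom at its fixed offset and then truncating the file right after it drops every entry cached for blocks above N, which is exactly what a shortening reorg requires.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

public class CacheTruncateSketch {
  // One fixed-size bloom per block; 256 bytes is assumed here.
  static final int BLOOM_BITS_LENGTH = 256;

  static void writeAndTruncate(final File cacheFile, final long blockNumber, final byte[] bloom)
      throws IOException {
    try (RandomAccessFile rw = new RandomAccessFile(cacheFile, "rw")) {
      final long offset = blockNumber * BLOOM_BITS_LENGTH;
      rw.seek(offset);
      rw.write(bloom);
      // Cutting the file here discards any blooms cached for blocks
      // > blockNumber, e.g. stale entries left over from a longer branch.
      rw.getChannel().truncate(offset + BLOOM_BITS_LENGTH);
    }
  }
}
```

This matches the test expectation later in the diff: re-caching block 1 shrinks the cache file to BLOOM_BITS_LENGTH * 2 (blooms for blocks 0 and 1 only).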

Contributor Author (@matkt, Sep 18, 2020):
I always use ifPresent, but in this case cacheSingleBlock can throw an exception and there is a catch block here; if I used ifPresent I would need two catch blocks with the same implementation. That is the main reason for using isPresent here.

https://github.com/matkt/besu/blob/931f3c2a3c56fa489b62a0a872aa54edc412bce4/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/query/cache/TransactionLogBloomCacher.java#L174
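The constraint described here is the usual one with lambdas: `Consumer#accept` declares no checked exceptions, so a throwing call inside `ifPresent` needs its own try/catch, while `isPresent`/`get` keeps the call inside the enclosing try. A minimal illustration (the exception and method below are stand-ins for the real ones):

```java
import java.util.Optional;

public class CheckedExceptionSketch {
  static class InvalidCacheException extends Exception {}

  // Stand-in for the real cacheSingleBlock, which declares a checked exception.
  static void cacheSingleBlock(final String header) throws InvalidCacheException {}

  public static void main(final String[] args) {
    final Optional<String> maybeHeader = Optional.of("header");
    try {
      // isPresent/get keeps the throwing call inside this try block...
      if (maybeHeader.isPresent()) {
        cacheSingleBlock(maybeHeader.get());
      }
    } catch (final InvalidCacheException e) {
      // ...so a single catch covers it.
    }

    // By contrast, this does not compile: Consumer.accept cannot throw
    // InvalidCacheException, so ifPresent would force a second,
    // duplicated try/catch inside the lambda.
    // maybeHeader.ifPresent(h -> cacheSingleBlock(h));
  }
}
```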

cacheSingleBlock(blockHeader, cacheFile);
} catch (final InvalidCacheException e) {
populateLatestSegment(blockNumber);
@@ -133,7 +133,7 @@ public void shouldUpdateCacheWhenBlockAdded() throws IOException {
assertThat(logBloom.length()).isEqualTo(BLOOM_BITS_LENGTH * 3);

transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
-         blockchain.getBlockHeader(3).get(), Optional.of(logBloom));
+         blockchain.getBlockHeader(3).get(), Optional.empty(), Optional.of(logBloom));

assertThat(logBloom.length()).isEqualTo(BLOOM_BITS_LENGTH * 4);
assertThat(cacheDir.getRoot().list().length).isEqualTo(1);
@@ -152,7 +152,7 @@ public void shouldReloadCacheWhenBLockIsMissing() throws IOException {
}

transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
-         blockchain.getBlockHeader(4).get(), Optional.of(logBloom));
+         blockchain.getBlockHeader(4).get(), Optional.empty(), Optional.of(logBloom));

for (int i = 0; i < 5; i++) {
assertThat(blockHeaders.get(i).getLogsBloom().toArray())
@@ -186,16 +186,37 @@ public void shouldReloadCacheWhenFileIsInvalid() throws IOException {
public void shouldUpdateCacheWhenChainReorgFired() throws IOException {
final File logBloom = cacheDir.newFile("logBloom-0.cache");

+     final List<BlockHeader> firstBranch = new ArrayList<>();

for (int i = 0; i < 5; i++) {
-       createBlock(i);
+       firstBranch.add(createBlock(i));
}

transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
-         blockchain.getBlockHeader(4).get(), Optional.of(logBloom));
+         blockchain.getBlockHeader(4).get(), Optional.empty(), Optional.of(logBloom));
assertThat(logBloom.length()).isEqualTo(BLOOM_BITS_LENGTH * 5);
+     for (int i = 0; i < 5; i++) {
+       assertThat(firstBranch.get(i).getLogsBloom().toArray())
+           .containsExactly(readLogBloomCache(logBloom, i));
+     }

+     final List<BlockHeader> forkBranch = new ArrayList<>();
+     forkBranch.add(firstBranch.get(0));
+     forkBranch.add(firstBranch.get(1));
+     for (int i = 2; i < 5; i++) {
+       forkBranch.add(createBlock(i, Optional.of("111111111111111111111111")));
+     }

transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
-         blockchain.getBlockHeader(1).get(), Optional.of(logBloom));
+         blockchain.getBlockHeader(4).get(), blockchain.getBlockHeader(1), Optional.of(logBloom));
+     assertThat(logBloom.length()).isEqualTo(BLOOM_BITS_LENGTH * 5);
+     for (int i = 0; i < 5; i++) {
+       assertThat(forkBranch.get(i).getLogsBloom().toArray())
+           .containsExactly(readLogBloomCache(logBloom, i));
+     }

+     transactionLogBloomCacher.cacheLogsBloomForBlockHeader(
+         blockchain.getBlockHeader(1).get(), Optional.empty(), Optional.of(logBloom));
assertThat(logBloom.length()).isEqualTo(BLOOM_BITS_LENGTH * 2);

assertThat(cacheDir.getRoot().list().length).isEqualTo(1);
@@ -217,7 +238,12 @@ private byte[] readLogBloomCache(final File logBloom, final long number) throws
}

private BlockHeader createBlock(final long number) {
-     final Address testAddress = Address.fromHexString(String.format("%02X", number));
+     return createBlock(number, Optional.empty());
+   }
+
+   private BlockHeader createBlock(final long number, final Optional<String> message) {
+     final Address testAddress =
+         Address.fromHexString(message.orElse(String.format("%02X", number)));
final Bytes testMessage = Bytes.fromHexString(String.format("%02X", number));
final Log testLog = new Log(testAddress, testMessage, List.of());
final BlockHeader fakeHeader =
@@ -241,7 +267,6 @@ private BlockHeader createBlock(final long number) {
new MainnetBlockHeaderFunctions());
testHash = fakeHeader.getHash();
when(blockchain.getBlockHeader(number)).thenReturn(Optional.of(fakeHeader));

return fakeHeader;
}
}