Skip to content

Commit

Permalink
Log Event Streaming for Plugin API (#186)
Browse files Browse the repository at this point in the history

Signed-off-by: Ratan Rai Sur <ratan.r.sur@gmail.com>
  • Loading branch information
RatanRSur authored Nov 13, 2019
1 parent e60aa3f commit 271d578
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void startNode(final BesuNode node) {
besuPluginContext.addService(
BesuEvents.class,
new BesuEventsImpl(
besuController.getProtocolContext().getBlockchain(),
besuController.getProtocolManager().getBlockBroadcaster(),
besuController.getTransactionPool(),
besuController.getSyncState()));
Expand Down
2 changes: 1 addition & 1 deletion besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ private void createLogsSubscriptionService(
final LogsSubscriptionService logsSubscriptionService =
new LogsSubscriptionService(subscriptionManager);

blockchain.observeBlockAdded(logsSubscriptionService);
blockchain.observeLogs(logsSubscriptionService);
}

private void createSyncingSubscriptionService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ private BesuCommand startPlugins() {
besuPluginContext.addService(
BesuEvents.class,
new BesuEventsImpl(
besuController.getProtocolContext().getBlockchain(),
besuController.getProtocolManager().getBlockBroadcaster(),
besuController.getTransactionPool(),
besuController.getSyncState()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,37 @@
*/
package org.hyperledger.besu.services;

import static java.util.stream.Collectors.toUnmodifiableList;

import org.hyperledger.besu.ethereum.api.query.LogsQuery;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.Quantity;
import org.hyperledger.besu.plugin.data.UnformattedData;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.List;
import java.util.function.Supplier;

public class BesuEventsImpl implements BesuEvents {
private Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
private final TransactionPool transactionPool;
private final SyncState syncState;

public BesuEventsImpl(
final Blockchain blockchain,
final BlockBroadcaster blockBroadcaster,
final TransactionPool transactionPool,
final SyncState syncState) {
this.blockchain = blockchain;
this.blockBroadcaster = blockBroadcaster;
this.transactionPool = transactionPool;
this.syncState = syncState;
Expand Down Expand Up @@ -83,6 +95,36 @@ public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.unsubscribeSyncStatus(listenerIdentifier);
}

@Override
public long addLogListener(
final List<Address> addresses,
final List<List<UnformattedData>> topics,
final LogListener logListener) {
final List<org.hyperledger.besu.ethereum.core.Address> besuAddresses =
addresses.stream()
.map(org.hyperledger.besu.ethereum.core.Address::fromPlugin)
.collect(toUnmodifiableList());
final List<List<LogTopic>> besuTopics =
topics.stream()
.map(
subList -> subList.stream().map(LogTopic::fromPlugin).collect(toUnmodifiableList()))
.collect(toUnmodifiableList());

final LogsQuery logsQuery = new LogsQuery(besuAddresses, besuTopics);

return blockchain.observeLogs(
logWithMetadata -> {
if (logsQuery.matches(LogWithMetadata.fromPlugin(logWithMetadata))) {
logListener.onLogEmitted(logWithMetadata);
}
});
}

@Override
public void removeLogListener(final long listenerIdentifier) {
blockchain.removeObserver(listenerIdentifier);
}

private static PropagatedBlockContext blockPropagatedContext(
final Supplier<BlockHeader> blockHeaderSupplier,
final Supplier<Quantity> totalDifficultySupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.DefaultBlockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.core.Wei;
Expand All @@ -39,19 +41,25 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.data.LogWithMetadata;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.uint.UInt256;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand All @@ -76,28 +84,29 @@ public class BesuEventsImplTest {
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;
@Mock private MutableBlockchain mockBlockchain;
@Mock private TransactionValidator mockTransactionValidator;
@Mock private ProtocolSpec<Void> mockProtocolSpec;
@Mock private WorldStateArchive mockWorldStateArchive;
@Mock private WorldState mockWorldState;
private org.hyperledger.besu.ethereum.core.BlockHeader fakeBlockHeader;
private TransactionPool transactionPool;
private BlockBroadcaster blockBroadcaster;
private BesuEventsImpl serviceImpl;
private MutableBlockchain blockchain;
private final BlockDataGenerator gen = new BlockDataGenerator();

@Before
public void setUp() {
fakeBlockHeader =
new org.hyperledger.besu.ethereum.core.BlockHeader(
null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null);

when(mockBlockchain.getBlockHeader(any())).thenReturn(Optional.of(fakeBlockHeader));
blockchain =
DefaultBlockchain.createMutable(
gen.genesisBlock(),
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()),
new NoOpMetricsSystem());
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthPeers.streamAvailablePeers()).thenReturn(Stream.empty()).thenReturn(Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(mockBlockchain);
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
when(mockProtocolSpec.getTransactionValidator()).thenReturn(mockTransactionValidator);
Expand All @@ -107,6 +116,7 @@ public void setUp() {
when(mockWorldStateArchive.get(any())).thenReturn(Optional.of(mockWorldState));

blockBroadcaster = new BlockBroadcaster(mockEthContext);
syncState = new SyncState(blockchain, mockEthPeers);
transactionPool =
TransactionPoolFactory.createTransactionPool(
mockProtocolSchedule,
Expand All @@ -117,9 +127,8 @@ public void setUp() {
syncState,
Wei.ZERO,
TransactionPoolConfiguration.builder().txPoolMaxSize(1).build());
syncState = new SyncState(mockBlockchain, mockEthPeers);

serviceImpl = new BesuEventsImpl(blockBroadcaster, transactionPool, syncState);
serviceImpl = new BesuEventsImpl(blockchain, blockBroadcaster, transactionPool, syncState);
}

@Test
Expand Down Expand Up @@ -157,7 +166,10 @@ public void syncStatusEventDoesNotFireAfterUnsubscribe() {
}

private void setSyncTarget() {
syncState.setSyncTarget(mock(EthPeer.class), fakeBlockHeader);
syncState.setSyncTarget(
mock(EthPeer.class),
new org.hyperledger.besu.ethereum.core.BlockHeader(
null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null));
}

private void clearSyncTarget() {
Expand Down Expand Up @@ -270,6 +282,43 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() {
assertThat(result.get()).isNull();
}

@Test
public void logEventFiresAfterSubscribe() {
final List<LogWithMetadata> result = new ArrayList<>();
blockchain.observeLogs(result::add);

assertThat(result).isEmpty();
final var block =
gen.block(
new BlockDataGenerator.BlockOptions()
.setParentHash(blockchain.getGenesisBlock().getHash()));
blockchain.appendBlock(block, gen.receipts(block));
assertThat(result).hasSize(4);
}

@Test
public void logEventDoesNotFireAfterUnsubscribe() {
final List<LogWithMetadata> result = new ArrayList<>();
final long id = blockchain.observeLogs(result::add);

assertThat(result).isEmpty();
final var block =
gen.block(
new BlockDataGenerator.BlockOptions()
.setParentHash(blockchain.getGenesisBlock().getHash()));
blockchain.appendBlock(block, gen.receipts(block));
assertThat(result).hasSize(4);

result.clear();

serviceImpl.removeLogListener(id);
final var block2 =
gen.block(new BlockDataGenerator.BlockOptions().setParentHash(block.getHash()));
blockchain.appendBlock(block2, gen.receipts(block2));

assertThat(result).isEmpty();
}

private Block generateBlock() {
final BlockBody body = new BlockBody(Collections.emptyList(), Collections.emptyList());
return new Block(new BlockHeaderTestFixture().buildHeader(), body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;

import java.util.List;
import java.util.function.Consumer;

public class LogsSubscriptionService implements BlockAddedObserver {
public class LogsSubscriptionService implements Consumer<LogWithMetadata> {

private final SubscriptionManager subscriptionManager;

Expand All @@ -32,22 +30,12 @@ public LogsSubscriptionService(final SubscriptionManager subscriptionManager) {
}

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain __) {
final List<LogsSubscription> logsSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class);

event
.getLogsWithMetadata()
public void accept(final LogWithMetadata logWithMetadata) {
subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class).stream()
.filter(logsSubscription -> logsSubscription.getLogsQuery().matches(logWithMetadata))
.forEach(
logWithMetadata ->
logsSubscriptions.stream()
.filter(
logsSubscription ->
logsSubscription.getLogsQuery().matches(logWithMetadata))
.forEach(
logsSubscription ->
subscriptionManager.sendMessage(
logsSubscription.getSubscriptionId(),
new LogResult(logWithMetadata))));
logsSubscription ->
subscriptionManager.sendMessage(
logsSubscription.getSubscriptionId(), new LogResult(logWithMetadata)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class LogsSubscriptionServiceTest {
@Before
public void before() {
logsSubscriptionService = new LogsSubscriptionService(subscriptionManager);
blockchain.observeBlockAdded(logsSubscriptionService);
blockchain.observeLogs(logsSubscriptionService);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.util.uint.UInt256;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** An interface for reading data from the blockchain. */
public interface Blockchain {
Expand Down Expand Up @@ -184,6 +186,19 @@ default boolean contains(final Hash blockHash) {
*/
long observeBlockAdded(BlockAddedObserver observer);

/**
* Adds an observer that will get called on for every added and removed log when a new block is
* added.
*
* <p><i>No guarantees are made about the order in which the observers are invoked.</i>
*
* @param logObserver the observer to call
* @return the observer ID that can be used to remove it later.
*/
default long observeLogs(final Consumer<LogWithMetadata> logObserver) {
return observeBlockAdded(((event, __) -> event.getLogsWithMetadata().forEach(logObserver)));
}

/**
* Removes an previously added observer of any type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public static Address privateContractAddress(
})));
}

public static Address fromPlugin(final org.hyperledger.besu.plugin.data.Address logger) {
return logger instanceof Address ? (Address) logger : wrap(BytesValue.fromPlugin(logger));
}

@Override
public Address copy() {
final BytesValue copiedStorage = wrapped.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public static Hash fromHexStringLenient(final String str) {
return new Hash(Bytes32.fromHexStringLenient(str));
}

public static Hash fromPlugin(final org.hyperledger.besu.plugin.data.Hash blockHash) {
return blockHash instanceof Hash ? (Hash) blockHash : wrap(Bytes32.fromPlugin(blockHash));
}

@Override
public byte[] getByteArray() {
return super.getByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.UnformattedData;
import org.hyperledger.besu.util.bytes.BytesValue;
import org.hyperledger.besu.util.bytes.DelegatingBytesValue;

Expand Down Expand Up @@ -57,6 +58,10 @@ public static LogTopic readFrom(final RLPInput in) {
return new LogTopic(in.readBytesValue());
}

public static LogTopic fromPlugin(final UnformattedData data) {
return data instanceof LogTopic ? (LogTopic) data : wrap(BytesValue.fromPlugin(data));
}

/**
* Writes the log topic to the provided RLP output.
*
Expand Down
Loading

0 comments on commit 271d578

Please sign in to comment.