From 271d5789567c8a39241db21cb0919819392a1a9b Mon Sep 17 00:00:00 2001 From: Ratan Rai Sur Date: Wed, 13 Nov 2019 17:03:38 -0500 Subject: [PATCH] Log Event Streaming for Plugin API (#186) Signed-off-by: Ratan Rai Sur --- .../dsl/node/ThreadBesuNodeRunner.java | 1 + .../org/hyperledger/besu/RunnerBuilder.java | 2 +- .../org/hyperledger/besu/cli/BesuCommand.java | 1 + .../besu/services/BesuEventsImpl.java | 42 +++++++++++ .../besu/services/BesuEventsImplTest.java | 71 ++++++++++++++++--- .../logs/LogsSubscriptionService.java | 30 +++----- .../logs/LogsSubscriptionServiceTest.java | 2 +- .../besu/ethereum/chain/Blockchain.java | 15 ++++ .../besu/ethereum/core/Address.java | 4 ++ .../hyperledger/besu/ethereum/core/Hash.java | 4 ++ .../besu/ethereum/core/LogTopic.java | 5 ++ .../besu/ethereum/core/LogWithMetadata.java | 50 ++++++------- plugin-api/build.gradle | 2 +- .../besu/plugin/data/LogWithMetadata.java | 57 +++++++++++++++ .../besu/plugin/services/BesuEvents.java | 54 +++++++++++--- .../hyperledger/besu/util/bytes/Bytes32.java | 5 ++ .../besu/util/bytes/BytesValue.java | 4 ++ 17 files changed, 281 insertions(+), 68 deletions(-) create mode 100644 plugin-api/src/main/java/org/hyperledger/besu/plugin/data/LogWithMetadata.java diff --git a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java index ab4918f2364..b46edbf73dd 100644 --- a/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java +++ b/acceptance-tests/dsl/src/main/java/org/hyperledger/besu/tests/acceptance/dsl/node/ThreadBesuNodeRunner.java @@ -167,6 +167,7 @@ public void startNode(final BesuNode node) { besuPluginContext.addService( BesuEvents.class, new BesuEventsImpl( + besuController.getProtocolContext().getBlockchain(), besuController.getProtocolManager().getBlockBroadcaster(), besuController.getTransactionPool(), besuController.getSyncState())); diff --git a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java index 609330e6da7..5fe15eb54a3 100644 --- a/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java @@ -646,7 +646,7 @@ private void createLogsSubscriptionService( final LogsSubscriptionService logsSubscriptionService = new LogsSubscriptionService(subscriptionManager); - blockchain.observeBlockAdded(logsSubscriptionService); + blockchain.observeLogs(logsSubscriptionService); } private void createSyncingSubscriptionService( diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index e6de377a15b..7c269ea5b7b 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -937,6 +937,7 @@ private BesuCommand startPlugins() { besuPluginContext.addService( BesuEvents.class, new BesuEventsImpl( + besuController.getProtocolContext().getBlockchain(), besuController.getProtocolManager().getBlockBroadcaster(), besuController.getTransactionPool(), besuController.getSyncState())); diff --git a/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java b/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java index aa6570d20cd..04db4b46a13 100644 --- a/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java +++ b/besu/src/main/java/org/hyperledger/besu/services/BesuEventsImpl.java @@ -14,25 +14,37 @@ */ package org.hyperledger.besu.services; +import static java.util.stream.Collectors.toUnmodifiableList; + +import org.hyperledger.besu.ethereum.api.query.LogsQuery; +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.core.LogTopic; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster; import org.hyperledger.besu.ethereum.eth.sync.state.SyncState; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; +import org.hyperledger.besu.plugin.data.Address; import org.hyperledger.besu.plugin.data.BlockHeader; import org.hyperledger.besu.plugin.data.PropagatedBlockContext; import org.hyperledger.besu.plugin.data.Quantity; +import org.hyperledger.besu.plugin.data.UnformattedData; import org.hyperledger.besu.plugin.services.BesuEvents; +import java.util.List; import java.util.function.Supplier; public class BesuEventsImpl implements BesuEvents { + private Blockchain blockchain; private final BlockBroadcaster blockBroadcaster; private final TransactionPool transactionPool; private final SyncState syncState; public BesuEventsImpl( + final Blockchain blockchain, final BlockBroadcaster blockBroadcaster, final TransactionPool transactionPool, final SyncState syncState) { + this.blockchain = blockchain; this.blockBroadcaster = blockBroadcaster; this.transactionPool = transactionPool; this.syncState = syncState; @@ -83,6 +95,36 @@ public void removeSyncStatusListener(final long listenerIdentifier) { syncState.unsubscribeSyncStatus(listenerIdentifier); } + @Override + public long addLogListener( + final List
addresses, + final List> topics, + final LogListener logListener) { + final List besuAddresses = + addresses.stream() + .map(org.hyperledger.besu.ethereum.core.Address::fromPlugin) + .collect(toUnmodifiableList()); + final List> besuTopics = + topics.stream() + .map( + subList -> subList.stream().map(LogTopic::fromPlugin).collect(toUnmodifiableList())) + .collect(toUnmodifiableList()); + + final LogsQuery logsQuery = new LogsQuery(besuAddresses, besuTopics); + + return blockchain.observeLogs( + logWithMetadata -> { + if (logsQuery.matches(LogWithMetadata.fromPlugin(logWithMetadata))) { + logListener.onLogEmitted(logWithMetadata); + } + }); + } + + @Override + public void removeLogListener(final long listenerIdentifier) { + blockchain.removeObserver(listenerIdentifier); + } + private static PropagatedBlockContext blockPropagatedContext( final Supplier blockHeaderSupplier, final Supplier totalDifficultySupplier) { diff --git a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java index 9eba7c60959..29934ad45fb 100644 --- a/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java +++ b/besu/src/test/java/org/hyperledger/besu/services/BesuEventsImplTest.java @@ -22,9 +22,11 @@ import org.hyperledger.besu.crypto.SECP256K1.KeyPair; import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.chain.DefaultBlockchain; import org.hyperledger.besu.ethereum.chain.MutableBlockchain; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.core.BlockBody; +import org.hyperledger.besu.ethereum.core.BlockDataGenerator; import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture; import org.hyperledger.besu.ethereum.core.TransactionTestFixture; import org.hyperledger.besu.ethereum.core.Wei; @@ -39,19 +41,25 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory; +import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec; import org.hyperledger.besu.ethereum.mainnet.TransactionValidator; import org.hyperledger.besu.ethereum.mainnet.ValidationResult; +import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage; import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; +import org.hyperledger.besu.plugin.data.LogWithMetadata; import org.hyperledger.besu.plugin.data.PropagatedBlockContext; import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.Transaction; +import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage; import org.hyperledger.besu.testutil.TestClock; import org.hyperledger.besu.util.uint.UInt256; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; @@ -76,28 +84,29 @@ public class BesuEventsImplTest { @Mock private EthContext mockEthContext; @Mock private EthMessages mockEthMessages; @Mock private EthScheduler mockEthScheduler; - @Mock private MutableBlockchain mockBlockchain; @Mock private TransactionValidator mockTransactionValidator; @Mock private ProtocolSpec mockProtocolSpec; @Mock private WorldStateArchive mockWorldStateArchive; @Mock private WorldState mockWorldState; - private org.hyperledger.besu.ethereum.core.BlockHeader fakeBlockHeader; private TransactionPool transactionPool; private BlockBroadcaster blockBroadcaster; private BesuEventsImpl serviceImpl; + private MutableBlockchain blockchain; + private final BlockDataGenerator gen = new BlockDataGenerator(); @Before public void setUp() { - fakeBlockHeader = - new org.hyperledger.besu.ethereum.core.BlockHeader( - null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null); - - when(mockBlockchain.getBlockHeader(any())).thenReturn(Optional.of(fakeBlockHeader)); + blockchain = + DefaultBlockchain.createMutable( + gen.genesisBlock(), + new KeyValueStoragePrefixedKeyBlockchainStorage( + new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()), + new NoOpMetricsSystem()); when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages); when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers); when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler); when(mockEthPeers.streamAvailablePeers()).thenReturn(Stream.empty()).thenReturn(Stream.empty()); - when(mockProtocolContext.getBlockchain()).thenReturn(mockBlockchain); + when(mockProtocolContext.getBlockchain()).thenReturn(blockchain); when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive); when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec); when(mockProtocolSpec.getTransactionValidator()).thenReturn(mockTransactionValidator); @@ -107,6 +116,7 @@ public void setUp() { when(mockWorldStateArchive.get(any())).thenReturn(Optional.of(mockWorldState)); blockBroadcaster = new BlockBroadcaster(mockEthContext); + syncState = new SyncState(blockchain, mockEthPeers); transactionPool = TransactionPoolFactory.createTransactionPool( mockProtocolSchedule, @@ -117,9 +127,8 @@ public void setUp() { syncState, Wei.ZERO, TransactionPoolConfiguration.builder().txPoolMaxSize(1).build()); - syncState = new SyncState(mockBlockchain, mockEthPeers); - serviceImpl = new BesuEventsImpl(blockBroadcaster, transactionPool, syncState); + serviceImpl = new BesuEventsImpl(blockchain, blockBroadcaster, transactionPool, syncState); } @Test @@ -157,7 +166,10 @@ public void syncStatusEventDoesNotFireAfterUnsubscribe() { } private void setSyncTarget() { - syncState.setSyncTarget(mock(EthPeer.class), fakeBlockHeader); + syncState.setSyncTarget( + mock(EthPeer.class), + new org.hyperledger.besu.ethereum.core.BlockHeader( + null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null)); } private void clearSyncTarget() { @@ -270,6 +282,43 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() { assertThat(result.get()).isNull(); } + @Test + public void logEventFiresAfterSubscribe() { + final List result = new ArrayList<>(); + blockchain.observeLogs(result::add); + + assertThat(result).isEmpty(); + final var block = + gen.block( + new BlockDataGenerator.BlockOptions() + .setParentHash(blockchain.getGenesisBlock().getHash())); + blockchain.appendBlock(block, gen.receipts(block)); + assertThat(result).hasSize(4); + } + + @Test + public void logEventDoesNotFireAfterUnsubscribe() { + final List result = new ArrayList<>(); + final long id = blockchain.observeLogs(result::add); + + assertThat(result).isEmpty(); + final var block = + gen.block( + new BlockDataGenerator.BlockOptions() + .setParentHash(blockchain.getGenesisBlock().getHash())); + blockchain.appendBlock(block, gen.receipts(block)); + assertThat(result).hasSize(4); + + result.clear(); + + serviceImpl.removeLogListener(id); + final var block2 = + gen.block(new BlockDataGenerator.BlockOptions().setParentHash(block.getHash())); + blockchain.appendBlock(block2, gen.receipts(block2)); + + assertThat(result).isEmpty(); + } + private Block generateBlock() { final BlockBody body = new BlockBody(Collections.emptyList(), Collections.emptyList()); return new Block(new BlockHeaderTestFixture().buildHeader(), body); diff --git a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java index b4f49d323f7..92f954a9ce4 100644 --- a/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java +++ b/ethereum/api/src/main/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionService.java @@ -17,13 +17,11 @@ import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager; import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType; -import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; -import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; -import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; -import java.util.List; +import java.util.function.Consumer; -public class LogsSubscriptionService implements BlockAddedObserver { +public class LogsSubscriptionService implements Consumer { private final SubscriptionManager subscriptionManager; @@ -32,22 +30,12 @@ public LogsSubscriptionService(final SubscriptionManager subscriptionManager) { } @Override - public void onBlockAdded(final BlockAddedEvent event, final Blockchain __) { - final List logsSubscriptions = - subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class); - - event - .getLogsWithMetadata() + public void accept(final LogWithMetadata logWithMetadata) { + subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class).stream() + .filter(logsSubscription -> logsSubscription.getLogsQuery().matches(logWithMetadata)) .forEach( - logWithMetadata -> - logsSubscriptions.stream() - .filter( - logsSubscription -> - logsSubscription.getLogsQuery().matches(logWithMetadata)) - .forEach( - logsSubscription -> - subscriptionManager.sendMessage( - logsSubscription.getSubscriptionId(), - new LogResult(logWithMetadata)))); + logsSubscription -> + subscriptionManager.sendMessage( + logsSubscription.getSubscriptionId(), new LogResult(logWithMetadata))); } } diff --git a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java index 25503ba2dde..96833dee975 100644 --- a/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java +++ b/ethereum/api/src/test/java/org/hyperledger/besu/ethereum/api/jsonrpc/websocket/subscription/logs/LogsSubscriptionServiceTest.java @@ -70,7 +70,7 @@ public class LogsSubscriptionServiceTest { @Before public void before() { logsSubscriptionService = new LogsSubscriptionService(subscriptionManager); - blockchain.observeBlockAdded(logsSubscriptionService); + blockchain.observeLogs(logsSubscriptionService); } @Test diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java index ff4ea8b9516..f4db89b20e5 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/chain/Blockchain.java @@ -18,12 +18,14 @@ import org.hyperledger.besu.ethereum.core.BlockBody; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Hash; +import org.hyperledger.besu.ethereum.core.LogWithMetadata; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.core.TransactionReceipt; import org.hyperledger.besu.util.uint.UInt256; import java.util.List; import java.util.Optional; +import java.util.function.Consumer; /** An interface for reading data from the blockchain. */ public interface Blockchain { @@ -184,6 +186,19 @@ default boolean contains(final Hash blockHash) { */ long observeBlockAdded(BlockAddedObserver observer); + /** + * Adds an observer that will get called on for every added and removed log when a new block is + * added. + * + *

No guarantees are made about the order in which the observers are invoked. + * + * @param logObserver the observer to call + * @return the observer ID that can be used to remove it later. + */ + default long observeLogs(final Consumer logObserver) { + return observeBlockAdded(((event, __) -> event.getLogsWithMetadata().forEach(logObserver))); + } + /** * Removes an previously added observer of any type. * diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Address.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Address.java index 8ce41fe3ed9..bbc14af4ed0 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Address.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Address.java @@ -176,6 +176,10 @@ public static Address privateContractAddress( }))); } + public static Address fromPlugin(final org.hyperledger.besu.plugin.data.Address logger) { + return logger instanceof Address ? (Address) logger : wrap(BytesValue.fromPlugin(logger)); + } + @Override public Address copy() { final BytesValue copiedStorage = wrapped.copy(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Hash.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Hash.java index 70f663057e2..45e72d78057 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Hash.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/Hash.java @@ -68,6 +68,10 @@ public static Hash fromHexStringLenient(final String str) { return new Hash(Bytes32.fromHexStringLenient(str)); } + public static Hash fromPlugin(final org.hyperledger.besu.plugin.data.Hash blockHash) { + return blockHash instanceof Hash ? (Hash) blockHash : wrap(Bytes32.fromPlugin(blockHash)); + } + @Override public byte[] getByteArray() { return super.getByteArray(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogTopic.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogTopic.java index 0eb1579f353..2f3a0039905 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogTopic.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogTopic.java @@ -18,6 +18,7 @@ import org.hyperledger.besu.ethereum.rlp.RLPInput; import org.hyperledger.besu.ethereum.rlp.RLPOutput; +import org.hyperledger.besu.plugin.data.UnformattedData; import org.hyperledger.besu.util.bytes.BytesValue; import org.hyperledger.besu.util.bytes.DelegatingBytesValue; @@ -57,6 +58,10 @@ public static LogTopic readFrom(final RLPInput in) { return new LogTopic(in.readBytesValue()); } + public static LogTopic fromPlugin(final UnformattedData data) { + return data instanceof LogTopic ? (LogTopic) data : wrap(BytesValue.fromPlugin(data)); + } + /** * Writes the log topic to the provided RLP output. * diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java index 79a616f22a3..3aecfac29c7 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/core/LogWithMetadata.java @@ -16,6 +16,8 @@ */ package org.hyperledger.besu.ethereum.core; +import static java.util.stream.Collectors.toUnmodifiableList; + import org.hyperledger.besu.util.bytes.BytesValue; import java.util.ArrayList; @@ -23,16 +25,14 @@ import com.google.common.base.MoreObjects; -public class LogWithMetadata extends Log { +public class LogWithMetadata extends Log + implements org.hyperledger.besu.plugin.data.LogWithMetadata { private final int logIndex; private final long blockNumber; private final Hash blockHash; private final Hash transactionHash; private final int transactionIndex; - private final Address address; - private final BytesValue data; - private final List topics; private final boolean removed; public LogWithMetadata( @@ -51,9 +51,6 @@ public LogWithMetadata( this.blockHash = blockHash; this.transactionHash = transactionHash; this.transactionIndex = transactionIndex; - this.address = address; - this.data = data; - this.topics = topics; this.removed = removed; } @@ -101,41 +98,32 @@ public static List generate( // The index of this log within the entire ordered list of logs associated with the block this log // belongs to. + @Override public int getLogIndex() { return logIndex; } + @Override public long getBlockNumber() { return blockNumber; } + @Override public Hash getBlockHash() { return blockHash; } + @Override public Hash getTransactionHash() { return transactionHash; } + @Override public int getTransactionIndex() { return transactionIndex; } @Override - public Address getLogger() { - return address; - } - - @Override - public BytesValue getData() { - return data; - } - - @Override - public List getTopics() { - return topics; - } - public boolean isRemoved() { return removed; } @@ -148,10 +136,24 @@ public String toString() { .add("blockHash", blockHash) .add("transactionHash", transactionHash) .add("transactionIndex", transactionIndex) - .add("address", address) - .add("data", data) - .add("topics", topics) + .add("address", getLogger()) + .add("data", getData()) + .add("topics", getTopics()) .add("removed", removed) .toString(); } + + public static LogWithMetadata fromPlugin( + final org.hyperledger.besu.plugin.data.LogWithMetadata pluginObject) { + return new LogWithMetadata( + pluginObject.getLogIndex(), + pluginObject.getBlockNumber(), + Hash.fromPlugin(pluginObject.getBlockHash()), + Hash.fromPlugin(pluginObject.getTransactionHash()), + pluginObject.getTransactionIndex(), + Address.fromPlugin(pluginObject.getLogger()), + BytesValue.fromPlugin(pluginObject.getData()), + pluginObject.getTopics().stream().map(LogTopic::fromPlugin).collect(toUnmodifiableList()), + pluginObject.isRemoved()); + } } diff --git a/plugin-api/build.gradle b/plugin-api/build.gradle index 6560ecc17ff..e8680d80837 100644 --- a/plugin-api/build.gradle +++ b/plugin-api/build.gradle @@ -56,7 +56,7 @@ Calculated : ${currentHash} tasks.register('checkAPIChanges', FileStateChecker) { description = "Checks that the API for the Plugin-API project does not change without deliberate thought" files = sourceSets.main.allJava.files - knownHash = 'Uz20ItZSHoOEJdONKKwUtoyZxkCVF+bbw20IMmZfSZg=' + knownHash = 'P9SmXjXCQeQOsnXM9pfXKUkJkrsX48MafIDEx+tHmFs=' } check.dependsOn('checkAPIChanges') diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/LogWithMetadata.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/LogWithMetadata.java new file mode 100644 index 00000000000..44a76f1a8c6 --- /dev/null +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/data/LogWithMetadata.java @@ -0,0 +1,57 @@ +/* + * Copyright ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.plugin.data; + +import org.hyperledger.besu.plugin.Unstable; + +import java.util.List; + +/** A Log entry from a transaction execution. */ +@Unstable +public interface LogWithMetadata { + + /** + * The address of the contract writing this log message. + * + * @return The loggers address. + */ + Address getLogger(); + + /** + * The list of 32 byte log topics, possibly empty. + * + * @return The list, possibly zero length, of log topics. + */ + List getTopics(); + + /** + * The data, of possibly unlimited length, for this log entry. + * + * @return The log data. + */ + UnformattedData getData(); + + int getLogIndex(); + + long getBlockNumber(); + + Hash getBlockHash(); + + Hash getTransactionHash(); + + int getTransactionIndex(); + + boolean isRemoved(); +} diff --git a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java index 62ef32ae3f5..5ae591ab2ab 100644 --- a/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java +++ b/plugin-api/src/main/java/org/hyperledger/besu/plugin/services/BesuEvents.java @@ -15,10 +15,14 @@ package org.hyperledger.besu.plugin.services; import org.hyperledger.besu.plugin.Unstable; +import org.hyperledger.besu.plugin.data.Address; +import org.hyperledger.besu.plugin.data.LogWithMetadata; import org.hyperledger.besu.plugin.data.PropagatedBlockContext; import org.hyperledger.besu.plugin.data.SyncStatus; import org.hyperledger.besu.plugin.data.Transaction; +import org.hyperledger.besu.plugin.data.UnformattedData; +import java.util.List; import java.util.Optional; /** @@ -43,14 +47,14 @@ public interface BesuEvents { * Add a listener watching new blocks propagated. * * @param blockPropagatedListener The listener that will accept a BlockHeader as the event. - * @return an object to be used as an identifier when de-registering the event. + * @return an id to be used as an identifier when de-registering the event. */ long addBlockPropagatedListener(BlockPropagatedListener blockPropagatedListener); /** * Remove the blockAdded listener from besu notifications. * - * @param listenerIdentifier The instance that was returned from addBlockAddedListener; + * @param listenerIdentifier The id that was returned from addBlockAddedListener; */ void removeBlockPropagatedListener(long listenerIdentifier); @@ -59,14 +63,14 @@ public interface BesuEvents { * * @param transactionAddedListener The listener that will accept the Transaction object as the * event. - * @return an object to be used as an identifier when de-registering the event. + * @return an id to be used as an identifier when de-registering the event. */ long addTransactionAddedListener(TransactionAddedListener transactionAddedListener); /** * Remove the blockAdded listener from besu notifications. * - * @param listenerIdentifier The instance that was returned from addTransactionAddedListener; + * @param listenerIdentifier The id that was returned from addTransactionAddedListener; */ void removeTransactionAddedListener(long listenerIdentifier); @@ -75,14 +79,14 @@ public interface BesuEvents { * * @param transactionDroppedListener The listener that will accept the Transaction object as the * event. - * @return an object to be used as an identifier when de-registering the event. + * @return an id to be used as an identifier when de-registering the event. */ long addTransactionDroppedListener(TransactionDroppedListener transactionDroppedListener); /** * Remove the transactionDropped listener from besu notifications. * - * @param listenerIdentifier The instance that was returned from addTransactionDroppedListener; + * @param listenerIdentifier The id that was returned from addTransactionDroppedListener; */ void removeTransactionDroppedListener(long listenerIdentifier); @@ -90,17 +94,37 @@ public interface BesuEvents { * Add a listener watching the synchronizer status. * * @param syncStatusListener The listener that will accept the SyncStatus object as the event. - * @return an object to be used as an identifier when de-registering the event. + * @return The id to be used as an identifier when de-registering the event. */ long addSyncStatusListener(SyncStatusListener syncStatusListener); /** - * Remove the logs listener from besu notifications. + * Remove the sync status listener from besu notifications. * - * @param listenerIdentifier The instance that was returned from addTransactionDroppedListener; + * @param listenerIdentifier The id that was returned from addTransactionDroppedListener; */ void removeSyncStatusListener(long listenerIdentifier); + /** + * Add a listener that consumes every log (both added and removed) that matches the filter + * parameters when a new block is added to the blockchain. + * + * @param addresses The addresses from which the log filter will be created + * @param topics The topics from which the log filter will be created + * @param logListener The listener that will accept the log. + * @return The id of the listener to be referred to used to remove the listener. + */ + long addLogListener( + List

addresses, List> topics, LogListener logListener); + + /** + * Remove the log listener with the associated id. + * + * @param listenerIdentifier The id of the listener that was returned when the listener was + * created. + */ + void removeLogListener(long listenerIdentifier); + /** The listener interface for receiving new block propagated events. */ interface BlockPropagatedListener { @@ -148,4 +172,16 @@ interface SyncStatusListener { */ void onSyncStatusChanged(Optional syncStatus); } + + /** The listener interface for receiving log events. */ + interface LogListener { + + /** + * Invoked for each log (both added and removed) when a new block is added to the blockchain. + * + * @param logWithMetadata the log with associated metadata. see + * https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterchanges + */ + void onLogEmitted(LogWithMetadata logWithMetadata); + } } diff --git a/util/src/main/java/org/hyperledger/besu/util/bytes/Bytes32.java b/util/src/main/java/org/hyperledger/besu/util/bytes/Bytes32.java index f6c85371b8c..1a2bf34b839 100644 --- a/util/src/main/java/org/hyperledger/besu/util/bytes/Bytes32.java +++ b/util/src/main/java/org/hyperledger/besu/util/bytes/Bytes32.java @@ -16,6 +16,7 @@ import static com.google.common.base.Preconditions.checkArgument; +import org.hyperledger.besu.plugin.data.UnformattedData; import org.hyperledger.besu.util.uint.Int256; import org.hyperledger.besu.util.uint.UInt256; import org.hyperledger.besu.util.uint.UInt256Bytes; @@ -168,6 +169,10 @@ static Bytes32 fromHexStringStrict(final String str) { return wrap(BytesValues.fromRawHexString(str, -1, false)); } + static Bytes32 fromPlugin(final UnformattedData data) { + return data instanceof Bytes32 ? (Bytes32) data : wrap(data.getByteArray()); + } + @Override default int size() { return SIZE; diff --git a/util/src/main/java/org/hyperledger/besu/util/bytes/BytesValue.java b/util/src/main/java/org/hyperledger/besu/util/bytes/BytesValue.java index 6f1d8f29202..2a7d00ec55d 100644 --- a/util/src/main/java/org/hyperledger/besu/util/bytes/BytesValue.java +++ b/util/src/main/java/org/hyperledger/besu/util/bytes/BytesValue.java @@ -126,6 +126,10 @@ public BytesValue slice(final int i, final int length) { }; } + static BytesValue fromPlugin(final UnformattedData data) { + return data instanceof BytesValue ? (BytesValue) data : wrap(data.getByteArray()); + } + default BytesValue concat(final BytesValue value) { return wrap(this, value); }