From fab7f928fc29a37c3419eb0ea8f4bf64f89ce8e7 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Mon, 8 Apr 2024 13:03:07 +0200 Subject: [PATCH 1/8] Add tx count by type txpool metric Signed-off-by: Fabio Di Fabio --- .../transactions/TransactionPoolMetrics.java | 27 +++++++++++++++++++ .../layered/AbstractTransactionsLayer.java | 24 ++++++++++++----- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java index 20657304f0f..5848296ce3c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolMetrics.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.transactions; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.metrics.ReplaceableDoubleSupplier; @@ -27,6 +28,7 @@ import java.util.Map; import java.util.function.DoubleSupplier; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,12 +45,15 @@ public class TransactionPoolMetrics { private final LabelledMetric rejectedCounter; private final LabelledGauge spaceUsed; private final LabelledGauge transactionCount; + private final LabelledGauge transactionCountByType; private final LabelledGauge uniqueSenderCount; private final LabelledMetric expiredMessagesCounter; private final Map expiredMessagesRunnableCounters = new HashMap<>(); private final LabelledMetric alreadySeenTransactionsCounter; private final Map spaceUsedSuppliers = new HashMap<>(); private final Map transactionCountSuppliers = new HashMap<>(); + private final Map, ReplaceableDoubleSupplier> + transactionCountByTypeSuppliers = new HashMap<>(); private final Map uniqueSendersSuppliers = new HashMap<>(); public TransactionPoolMetrics(final MetricsSystem metricsSystem) { @@ -97,6 +102,14 @@ public TransactionPoolMetrics(final MetricsSystem metricsSystem) { "The number of transactions currently present in the layer", "layer"); + transactionCountByType = + metricsSystem.createLabelledGauge( + BesuMetricCategory.TRANSACTION_POOL, + "number_of_transactions_by_type", + "The number of transactions, of a specified type, currently present in the layer", + "layer", + "type"); + uniqueSenderCount = metricsSystem.createLabelledGauge( BesuMetricCategory.TRANSACTION_POOL, @@ -136,6 +149,20 @@ public void initSpaceUsed(final DoubleSupplier spaceUsedSupplier, final String l }); } + public void initTransactionCountByType( + final DoubleSupplier spaceUsedSupplier, final String layer, final TransactionType type) { + transactionCountByTypeSuppliers.compute( + Pair.of(layer, type), + (unused, existingSupplier) -> { + if (existingSupplier == null) { + final var newSupplier = new ReplaceableDoubleSupplier(spaceUsedSupplier); + transactionCountByType.labels(newSupplier, layer, type.name()); + return newSupplier; + } + return existingSupplier.replaceDoubleSupplier(spaceUsedSupplier); + }); + } + public void initTransactionCount( final DoubleSupplier transactionCountSupplier, final String layer) { transactionCountSuppliers.compute( diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index e32eb0e2291..68ecb99c5c7 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -26,6 +26,7 @@ import org.hyperledger.besu.datatypes.Address; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -39,6 +40,7 @@ import org.hyperledger.besu.util.Subscribers; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,7 +76,7 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer { private OptionalLong nextLayerOnAddedListenerId = OptionalLong.empty(); private OptionalLong nextLayerOnDroppedListenerId = OptionalLong.empty(); protected long spaceUsed = 0; - + protected final int[] txCountByType = new int[TransactionType.values().length]; private final BlobCache blobCache; protected AbstractTransactionsLayer( @@ -91,6 +93,11 @@ protected AbstractTransactionsLayer( metrics.initSpaceUsed(this::getLayerSpaceUsed, name()); metrics.initTransactionCount(pendingTransactions::size, name()); metrics.initUniqueSenderCount(txsBySender::size, name()); + Arrays.stream(TransactionType.values()) + .forEach( + type -> + metrics.initTransactionCountByType( + () -> txCountByType[type.ordinal()], name(), type)); this.blobCache = blobCache; } @@ -101,6 +108,7 @@ public void reset() { pendingTransactions.clear(); txsBySender.clear(); spaceUsed = 0; + Arrays.fill(txCountByType, 0); nextLayer.reset(); } @@ -286,7 +294,7 @@ private void processAdded(final PendingTransaction addedTx) { pendingTransactions.put(addedTx.getHash(), addedTx); final var senderTxs = txsBySender.computeIfAbsent(addedTx.getSender(), s -> new TreeMap<>()); senderTxs.put(addedTx.getNonce(), addedTx); - increaseSpaceUsed(addedTx); + increaseCounters(addedTx); metrics.incrementAdded(addedTx, name()); internalAdd(senderTxs, addedTx); } @@ -332,7 +340,7 @@ private void evict(final long spaceToFree, final int txsToEvict) { protected void replaced(final PendingTransaction replacedTx) { pendingTransactions.remove(replacedTx.getHash()); - decreaseSpaceUsed(replacedTx); + decreaseCounters(replacedTx); metrics.incrementRemoved(replacedTx, REPLACED.label(), name()); internalReplaced(replacedTx); notifyTransactionDropped(replacedTx); @@ -368,7 +376,7 @@ protected PendingTransaction processRemove( final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash()); if (removedTx != null) { - decreaseSpaceUsed(removedTx); + decreaseCounters(removedTx); metrics.incrementRemoved(removedTx, removalReason.label(), name()); internalRemove(senderTxs, removedTx, removalReason); } @@ -381,7 +389,7 @@ protected PendingTransaction processEvict( final RemovalReason reason) { final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash()); if (removedTx != null) { - decreaseSpaceUsed(evictedTx); + decreaseCounters(evictedTx); metrics.incrementRemoved(evictedTx, reason.label(), name()); internalEvict(senderTxs, removedTx); } @@ -467,12 +475,14 @@ protected abstract void internalRemove( protected abstract PendingTransaction getEvictable(); - protected void increaseSpaceUsed(final PendingTransaction pendingTransaction) { + protected void increaseCounters(final PendingTransaction pendingTransaction) { spaceUsed += pendingTransaction.memorySize(); + ++txCountByType[pendingTransaction.getTransaction().getType().ordinal()]; } - protected void decreaseSpaceUsed(final PendingTransaction pendingTransaction) { + protected void decreaseCounters(final PendingTransaction pendingTransaction) { spaceUsed -= pendingTransaction.memorySize(); + --txCountByType[pendingTransaction.getTransaction().getType().ordinal()]; } protected abstract long cacheFreeSpace(); From 6257d7fca7ebd58143ad865dc20d07fc6807c504 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Mon, 8 Apr 2024 14:55:28 +0200 Subject: [PATCH 2/8] Update CHANGELOG Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 506de63a077..bb145bebff6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,7 @@ - Update Web3j dependencies [#6811](https://github.com/hyperledger/besu/pull/6811) - Add `tx-pool-blob-price-bump` option to configure the price bump percentage required to replace blob transactions (by default 100%) [#6874](https://github.com/hyperledger/besu/pull/6874) - Log detailed timing of block creation steps [#6880](https://github.com/hyperledger/besu/pull/6880) +- Expose transaction count by type metrics for the layered txpool [#6903](https://github.com/hyperledger/besu/pull/6903) ### Bug fixes - Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665) From a704b55ab263a759a187703e512ef758b7bc03c5 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Mon, 8 Apr 2024 19:05:43 +0200 Subject: [PATCH 3/8] Add an option to limit the number of txs in the prioritized layer by type Signed-off-by: Fabio Di Fabio --- .../cli/options/TransactionPoolOptions.java | 16 +++++ .../TransactionPoolConfiguration.java | 10 +++ .../BaseFeePrioritizedTransactions.java | 9 +++ ...stractPrioritizedTransactionsTestBase.java | 6 ++ .../BaseFeePrioritizedTransactionsTest.java | 61 +++++++++++++++++++ .../layered/BaseTransactionPoolTest.java | 1 + 6 files changed, 103 insertions(+) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java index 0ededa3c5a5..32bbf3167f3 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java @@ -27,6 +27,7 @@ import org.hyperledger.besu.cli.util.CommandLineUtils; import org.hyperledger.besu.config.GenesisConfigOptions; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; @@ -37,6 +38,7 @@ import java.io.File; import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.Set; import picocli.CommandLine; @@ -154,6 +156,8 @@ public class TransactionPoolOptions implements CLIOptions txPoolMaxPrioritizedByType = + TransactionPoolConfiguration.DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE; + @CommandLine.Option( names = {TX_POOL_MAX_FUTURE_BY_SENDER}, paramLabel = MANDATORY_INTEGER_FORMAT_HELP, @@ -296,6 +309,8 @@ public static TransactionPoolOptions fromConfig(final TransactionPoolConfigurati options.layeredOptions.txPoolLayerMaxCapacity = config.getPendingTransactionsLayerMaxCapacityBytes(); options.layeredOptions.txPoolMaxPrioritized = config.getMaxPrioritizedTransactions(); + options.layeredOptions.txPoolMaxPrioritizedByType = + config.getMaxPrioritizedTransactionsByType(); options.layeredOptions.txPoolMaxFutureBySender = config.getMaxFutureBySender(); options.sequencedOptions.txPoolLimitByAccountPercentage = config.getTxPoolLimitByAccountPercentage(); @@ -353,6 +368,7 @@ public TransactionPoolConfiguration toDomainObject() { .minGasPrice(minGasPrice) .pendingTransactionsLayerMaxCapacityBytes(layeredOptions.txPoolLayerMaxCapacity) .maxPrioritizedTransactions(layeredOptions.txPoolMaxPrioritized) + .maxPrioritizedTransactionsByType(layeredOptions.txPoolMaxPrioritizedByType) .maxFutureBySender(layeredOptions.txPoolMaxFutureBySender) .txPoolLimitByAccountPercentage(sequencedOptions.txPoolLimitByAccountPercentage) .txPoolMaxSize(sequencedOptions.txPoolMaxSize) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java index 08610f507ab..18a8598ab86 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPoolConfiguration.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.transactions; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService; import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator; @@ -24,6 +25,8 @@ import java.io.File; import java.time.Duration; +import java.util.EnumMap; +import java.util.Map; import java.util.Set; import org.immutables.value.Value; @@ -71,6 +74,8 @@ enum Implementation { File DEFAULT_SAVE_FILE = new File(DEFAULT_SAVE_FILE_NAME); long DEFAULT_PENDING_TRANSACTIONS_LAYER_MAX_CAPACITY_BYTES = 12_500_000L; int DEFAULT_MAX_PRIORITIZED_TRANSACTIONS = 2000; + EnumMap DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE = + new EnumMap<>(Map.of(TransactionType.BLOB, 6)); int DEFAULT_MAX_FUTURE_BY_SENDER = 200; Implementation DEFAULT_TX_POOL_IMPLEMENTATION = Implementation.LAYERED; Set
DEFAULT_PRIORITY_SENDERS = Set.of(); @@ -148,6 +153,11 @@ default int getMaxPrioritizedTransactions() { return DEFAULT_MAX_PRIORITIZED_TRANSACTIONS; } + @Value.Default + default Map getMaxPrioritizedTransactionsByType() { + return DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE; + } + @Value.Default default int getMaxFutureBySender() { return DEFAULT_MAX_FUTURE_BY_SENDER; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java index e1d10e9ee2a..10db405c25c 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java @@ -149,6 +149,15 @@ private Wei calculateNextBlockBaseFee(final FeeMarket feeMarket, final BlockHead @Override protected boolean promotionFilter(final PendingTransaction pendingTransaction) { + // check if there is space for that tx type + final var txType = pendingTransaction.getTransaction().getType(); + if (txCountByType[txType.ordinal()] + >= poolConfig + .getMaxPrioritizedTransactionsByType() + .getOrDefault(txType, Integer.MAX_VALUE)) { + return false; + } + // check if the tx is willing to pay at least the base fee if (nextBlockBaseFee .map(pendingTransaction.getTransaction().getMaxGasPrice()::lessThan) diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java index 6fe7f9381c8..b4ba249b836 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java @@ -18,6 +18,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.DROPPED; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; @@ -30,8 +31,10 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolReplacementHandler; import java.util.ArrayList; +import java.util.EnumMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.function.BiFunction; import java.util.stream.IntStream; @@ -39,6 +42,8 @@ public abstract class AbstractPrioritizedTransactionsTestBase extends BaseTransactionPoolTest { protected static final int MAX_TRANSACTIONS = 5; + protected static final EnumMap MAX_TRANSACTIONS_BY_TYPE = + new EnumMap<>(Map.of(TransactionType.BLOB, 1)); protected final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); protected final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics); protected final MiningParameters miningParameters = @@ -49,6 +54,7 @@ public abstract class AbstractPrioritizedTransactionsTestBase extends BaseTransa getSorter( ImmutableTransactionPoolConfiguration.builder() .maxPrioritizedTransactions(MAX_TRANSACTIONS) + .maxPrioritizedTransactionsByType(MAX_TRANSACTIONS_BY_TYPE) .maxFutureBySender(MAX_TRANSACTIONS) .build(), miningParameters); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactionsTest.java index 31785fa1898..c98ebf64951 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactionsTest.java @@ -35,6 +35,7 @@ import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics; import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -109,6 +110,8 @@ protected Transaction createTransactionReplacement( originalTransaction.getType(), originalTransaction.getNonce(), originalTransaction.getMaxGasPrice().multiply(2), + originalTransaction.getPayload().size(), + originalTransaction.getBlobCount(), keys); } @@ -188,6 +191,64 @@ public void txWithPriorityBelowCurrentMineableMinPriorityFeeIsPrioritized() { assertTransactionPrioritized(lowGasPriceTx); } + @Test + public void maxNumberOfTxsForTypeIsEnforced() { + final var limitedType = MAX_TRANSACTIONS_BY_TYPE.entrySet().iterator().next(); + final var maxNumber = limitedType.getValue(); + final var addedTxs = new ArrayList(maxNumber); + for (int i = 0; i < maxNumber; i++) { + final var tx = + createTransaction( + limitedType.getKey(), + 0, + DEFAULT_MIN_GAS_PRICE, + 0, + 1, + SIGNATURE_ALGORITHM.get().generateKeyPair()); + addedTxs.add(tx); + assertThat(prioritizeTransaction(tx)).isEqualTo(ADDED); + } + + final var overflowTx = + createTransaction( + limitedType.getKey(), + 0, + DEFAULT_MIN_GAS_PRICE, + 0, + 1, + SIGNATURE_ALGORITHM.get().generateKeyPair()); + assertThat(prioritizeTransaction(overflowTx)).isEqualTo(DROPPED); + + addedTxs.forEach(this::assertTransactionPrioritized); + assertTransactionNotPrioritized(overflowTx); + } + + @Test + public void maxNumberOfTxsForTypeWithReplacement() { + final var limitedType = MAX_TRANSACTIONS_BY_TYPE.entrySet().iterator().next(); + final var maxNumber = limitedType.getValue(); + final var addedTxs = new ArrayList(maxNumber); + for (int i = 0; i < maxNumber; i++) { + final var tx = createTransaction(limitedType.getKey(), i, DEFAULT_MIN_GAS_PRICE, 0, 1, KEYS1); + addedTxs.add(tx); + assertThat(prioritizeTransaction(tx)).isEqualTo(ADDED); + } + + final var replacedTx = addedTxs.get(0); + final var replacementTx = createTransactionReplacement(replacedTx, KEYS1); + final var txAddResult = prioritizeTransaction(replacementTx); + + assertThat(txAddResult.isReplacement()).isTrue(); + assertThat(txAddResult.maybeReplacedTransaction()) + .map(PendingTransaction::getTransaction) + .contains(replacedTx); + + addedTxs.remove(replacedTx); + addedTxs.forEach(this::assertTransactionPrioritized); + assertTransactionNotPrioritized(replacedTx); + assertTransactionPrioritized(replacementTx); + } + private void shouldPrioritizePriorityFeeThenTimeAddedToPoolSameTypeTxs( final TransactionType transactionType) { final PendingTransaction highGasPriceTransaction = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java index 160c60c66d9..c48b9f13c23 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java @@ -147,6 +147,7 @@ protected TransactionTestFixture prepareTransaction( tx.maxFeePerGas(Optional.of(maxGasPrice)) .maxPriorityFeePerGas(Optional.of(maxGasPrice.divide(10))); if (type.supportsBlob() && blobCount > 0) { + tx.maxFeePerBlobGas(Optional.of(maxGasPrice)); final var versionHashes = IntStream.range(0, blobCount) .mapToObj(i -> new VersionedHash((byte) 1, Hash.ZERO)) From fadba40d4ced6a3b23a589f0d4a9092aedbaca4e Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Tue, 9 Apr 2024 18:23:28 +0200 Subject: [PATCH 4/8] fix and tests Signed-off-by: Fabio Di Fabio --- .../cli/options/TransactionPoolOptions.java | 1 + .../options/TransactionPoolOptionsTest.java | 41 +++++++++++++++++++ ...stractPrioritizedTransactionsTestBase.java | 2 +- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java index 2d3f96a3ad5..b6aa2bc645f 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/TransactionPoolOptions.java @@ -182,6 +182,7 @@ static class Layered { @CommandLine.Option( names = {TX_POOL_MAX_PRIORITIZED_BY_TYPE}, paramLabel = "MAP", + split = ",", description = "Max number of pending transactions, of a specific type, that are prioritized and thus kept sorted (default: ${DEFAULT-VALUE})", arity = "1") diff --git a/besu/src/test/java/org/hyperledger/besu/cli/options/TransactionPoolOptionsTest.java b/besu/src/test/java/org/hyperledger/besu/cli/options/TransactionPoolOptionsTest.java index 6b7363eb915..a101803dbb0 100644 --- a/besu/src/test/java/org/hyperledger/besu/cli/options/TransactionPoolOptionsTest.java +++ b/besu/src/test/java/org/hyperledger/besu/cli/options/TransactionPoolOptionsTest.java @@ -21,11 +21,14 @@ import org.hyperledger.besu.cli.converter.DurationMillisConverter; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration; import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration; import org.hyperledger.besu.util.number.Percentage; +import java.io.IOException; +import java.nio.file.Path; import java.time.Duration; import org.junit.jupiter.api.Test; @@ -369,6 +372,44 @@ public void eth65TrxAnnouncedBufferingPeriodWithInvalidInputShouldFail2() { "-1"); } + @Test + public void maxPrioritizedTxsPerType() { + final int maxBlobs = 2; + final int maxFrontier = 200; + internalTestSuccess( + config -> { + assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.BLOB)) + .isEqualTo(maxBlobs); + assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.FRONTIER)) + .isEqualTo(maxFrontier); + }, + "--tx-pool-max-prioritized-by-type", + "BLOB=" + maxBlobs + ",FRONTIER=" + maxFrontier); + } + + @Test + public void maxPrioritizedTxsPerTypeConfigFile() throws IOException { + final int maxBlobs = 2; + final int maxFrontier = 200; + final Path tempConfigFilePath = + createTempFile( + "config", + String.format( + """ + tx-pool-max-prioritized-by-type=["BLOB=%s","FRONTIER=%s"] + """, + maxBlobs, maxFrontier)); + internalTestSuccess( + config -> { + assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.BLOB)) + .isEqualTo(maxBlobs); + assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.FRONTIER)) + .isEqualTo(maxFrontier); + }, + "--config-file", + tempConfigFilePath.toString()); + } + @Override protected TransactionPoolConfiguration createDefaultDomainObject() { return TransactionPoolConfiguration.DEFAULT; diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java index b4ba249b836..095516487f0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactionsTestBase.java @@ -43,7 +43,7 @@ public abstract class AbstractPrioritizedTransactionsTestBase extends BaseTransactionPoolTest { protected static final int MAX_TRANSACTIONS = 5; protected static final EnumMap MAX_TRANSACTIONS_BY_TYPE = - new EnumMap<>(Map.of(TransactionType.BLOB, 1)); + new EnumMap<>(Map.of(TransactionType.BLOB, 2)); protected final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); protected final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics); protected final MiningParameters miningParameters = From 8523b0bb7b47ed9e00c91c99d4b94d0f10abdc46 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Thu, 11 Apr 2024 18:14:38 +0200 Subject: [PATCH 5/8] Enforce the limit by type when promoting txs Signed-off-by: Fabio Di Fabio --- .../eth/transactions/TransactionPool.java | 10 ++++++++++ .../AbstractPrioritizedTransactions.java | 18 +++++++++++++++++- .../layered/AbstractTransactionsLayer.java | 13 ++++++++++++- .../eth/transactions/layered/EndLayer.java | 3 ++- .../layered/ReadyTransactions.java | 11 +++++++++-- .../layered/SparseTransactions.java | 12 +++++++++--- .../layered/TransactionsLayer.java | 5 ++++- .../eth/transactions/layered/ReplayTest.java | 15 +++++++++++++++ 8 files changed, 78 insertions(+), 9 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java index b5171ac7d50..c069a771eb2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/TransactionPool.java @@ -149,6 +149,16 @@ private void initLogForReplay() { .map(Address::toHexString) .collect(Collectors.joining(","))) .log(); + // log the max prioritized txs by type + LOG_FOR_REPLAY + .atTrace() + .setMessage("{}") + .addArgument( + () -> + configuration.getMaxPrioritizedTransactionsByType().entrySet().stream() + .map(e -> e.getKey().name() + "=" + e.getValue()) + .collect(Collectors.joining(","))) + .log(); } @VisibleForTesting diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index 8929e221062..c7256ee499a 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.MiningParameters; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction; @@ -123,10 +124,25 @@ protected void internalRemove( public List promote( final Predicate promotionFilter, final long freeSpace, - final int freeSlots) { + final int freeSlots, + final int[] maxPromotionsPerType) { return List.of(); } + @Override + protected int[] getMaxPromotionsPerType() { + final var allTypes = TransactionType.values(); + final var maxPromotionsPerType = new int[allTypes.length]; + for (int i = 0; i < allTypes.length; i++) { + maxPromotionsPerType[i] = + poolConfig + .getMaxPrioritizedTransactionsByType() + .getOrDefault(allTypes[i], Integer.MAX_VALUE) + - txCountByType[i]; + } + return maxPromotionsPerType; + } + @Override public Stream stream() { return orderByFee.descendingSet().stream(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 68ecb99c5c7..1ff77fdc685 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -61,6 +61,13 @@ public abstract class AbstractTransactionsLayer implements TransactionsLayer { private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionsLayer.class); private static final NavigableMap EMPTY_SENDER_TXS = new TreeMap<>(); + private static final int[] UNLIMITED_PROMOTIONS_PER_TYPE = + new int[TransactionType.values().length]; + + static { + Arrays.fill(UNLIMITED_PROMOTIONS_PER_TYPE, Integer.MAX_VALUE); + } + protected final TransactionPoolConfiguration poolConfig; protected final TransactionsLayer nextLayer; protected final BiFunction @@ -425,11 +432,15 @@ final void promoteTransactions() { if (freeSlots > 0 && freeSpace > 0) { nextLayer - .promote(this::promotionFilter, cacheFreeSpace(), freeSlots) + .promote(this::promotionFilter, cacheFreeSpace(), freeSlots, getMaxPromotionsPerType()) .forEach(this::processAdded); } } + protected int[] getMaxPromotionsPerType() { + return UNLIMITED_PROMOTIONS_PER_TYPE; + } + private void confirmed(final Address sender, final long maxConfirmedNonce) { final var senderTxs = txsBySender.get(sender); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java index e79ce1d8a71..0fdf5663c88 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java @@ -122,7 +122,8 @@ public OptionalLong getCurrentNonceFor(final Address sender) { public List promote( final Predicate promotionFilter, final long freeSpace, - final int freeSlots) { + final int freeSlots, + final int[] maxPromotionsPerType) { return List.of(); } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index a3dc195d1bf..72e91e376b0 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -17,6 +17,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -146,19 +147,25 @@ public Stream stream() { public List promote( final Predicate promotionFilter, final long freeSpace, - final int freeSlots) { + final int freeSlots, + final int[] maxPromotionsPerType) { long accumulatedSpace = 0; final List promotedTxs = new ArrayList<>(); + final int[] promotedCountByType = new int[maxPromotionsPerType.length]; + final Predicate thereIsSpaceForType = + txType -> promotedCountByType[txType.ordinal()] < maxPromotionsPerType[txType.ordinal()]; // first find all txs that can be promoted search: for (final var senderFirstTx : orderByMaxFee.descendingSet()) { final var senderTxs = txsBySender.get(senderFirstTx.getSender()); for (final var candidateTx : senderTxs.values()) { - if (promotionFilter.test(candidateTx)) { + final var txType = candidateTx.getTransaction().getType(); + if (promotionFilter.test(candidateTx) && thereIsSpaceForType.test(txType)) { accumulatedSpace += candidateTx.memorySize(); if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { promotedTxs.add(candidateTx); + ++promotedCountByType[txType.ordinal()]; } else { // no room for more txs the search is over exit the loops break search; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index e036f2fd215..c5f6d089a71 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -18,6 +18,7 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -145,9 +146,13 @@ protected void internalBlockAdded(final BlockHeader blockHeader, final FeeMarket public List promote( final Predicate promotionFilter, final long freeSpace, - final int freeSlots) { + final int freeSlots, + final int[] maxPromotionsPerType) { long accumulatedSpace = 0; final List promotedTxs = new ArrayList<>(); + final int[] promotedCountByType = new int[maxPromotionsPerType.length]; + final Predicate thereIsSpaceForType = + txType -> promotedCountByType[txType.ordinal()] < maxPromotionsPerType[txType.ordinal()]; final var zeroGapSenders = orderByGap.get(0); @@ -156,11 +161,12 @@ public List promote( final var senderSeqTxs = getSequentialSubset(txsBySender.get(sender)); for (final var candidateTx : senderSeqTxs.values()) { - - if (promotionFilter.test(candidateTx)) { + final var txType = candidateTx.getTransaction().getType(); + if (promotionFilter.test(candidateTx) && thereIsSpaceForType.test(txType)) { accumulatedSpace += candidateTx.memorySize(); if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { promotedTxs.add(candidateTx); + ++promotedCountByType[txType.ordinal()]; } else { // no room for more txs the search is over exit the loops break search; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index 688eb6721e8..a23dc437dbc 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -70,7 +70,10 @@ void blockAdded( OptionalLong getCurrentNonceFor(Address sender); List promote( - Predicate promotionFilter, final long freeSpace, final int freeSlots); + Predicate promotionFilter, + final long freeSpace, + final int freeSlots, + final int[] maxPromotionsPerType); long subscribeToAdded(PendingTransactionAddedListener listener); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java index b8a94945ec6..63367b2fda0 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReplayTest.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.crypto.SignatureAlgorithmFactory; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; @@ -48,11 +49,13 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.Arrays; +import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.function.BiFunction; +import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; import com.google.common.base.Splitter; @@ -117,6 +120,7 @@ public void replay() throws IOException { final TransactionPoolConfiguration poolConfig = ImmutableTransactionPoolConfiguration.builder() .prioritySenders(readPrioritySenders(br.readLine())) + .maxPrioritizedTransactionsByType(readMaxPrioritizedByType(br.readLine())) .build(); final AbstractPrioritizedTransactions prioritizedTransactions = @@ -161,6 +165,17 @@ public void replay() throws IOException { } } + private Map readMaxPrioritizedByType(final String line) { + return Arrays.stream(line.split(",")) + .map(e -> e.split("=")) + .collect( + Collectors.toMap( + a -> TransactionType.valueOf(a[0]), + a -> Integer.parseInt(a[1]), + (a, b) -> a, + () -> new EnumMap<>(TransactionType.class))); + } + private List
readPrioritySenders(final String line) { return Arrays.stream(line.split(",")).map(Address::fromHexString).toList(); } From 1ea431da7cc7c9677c034f8170b565012dacd523 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Fri, 12 Apr 2024 16:25:05 +0200 Subject: [PATCH 6/8] tests Signed-off-by: Fabio Di Fabio --- .../AbstractPrioritizedTransactions.java | 16 ++ .../layered/AbstractTransactionsLayer.java | 8 + .../BaseFeePrioritizedTransactions.java | 8 - .../layered/BaseTransactionPoolTest.java | 24 ++- .../LayeredPendingTransactionsTest.java | 52 +++-- .../eth/transactions/layered/LayersTest.java | 189 +++++++++++++----- 6 files changed, 220 insertions(+), 77 deletions(-) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index c7256ee499a..773469e52a1 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -88,6 +88,15 @@ protected void internalReplaced(final PendingTransaction replacedTx) { } private boolean hasPriority(final PendingTransaction pendingTransaction) { + // check if there is space for that tx type + final var txType = pendingTransaction.getTransaction().getType(); + if (txCountByType[txType.ordinal()] + >= poolConfig + .getMaxPrioritizedTransactionsByType() + .getOrDefault(txType, Integer.MAX_VALUE)) { + return false; + } + // if it does not pass the promotion filter, then has not priority if (!promotionFilter(pendingTransaction)) { return false; @@ -129,6 +138,13 @@ public List promote( return List.of(); } + /** + * Here the max number of txs of a specific type that can be promoted, is defined by the + * configuration, so we return the difference between the configured max and the current count of + * txs for each type + * + * @return an array containing the max amount of txs that can be promoted for each type + */ @Override protected int[] getMaxPromotionsPerType() { final var allTypes = TransactionType.values(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 1ff77fdc685..1d2c0bb192d 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -437,6 +437,14 @@ final void promoteTransactions() { } } + /** + * How many txs of a specified type can be promoted? This make sense when a max number of txs of a + * type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs + * than the useful ones. By default, there are no limits, but each layer can define its own + * policy. + * + * @return an array containing the max amount of txs that can be promoted for each type + */ protected int[] getMaxPromotionsPerType() { return UNLIMITED_PROMOTIONS_PER_TYPE; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java index 10db405c25c..d47e79048a2 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseFeePrioritizedTransactions.java @@ -149,14 +149,6 @@ private Wei calculateNextBlockBaseFee(final FeeMarket feeMarket, final BlockHead @Override protected boolean promotionFilter(final PendingTransaction pendingTransaction) { - // check if there is space for that tx type - final var txType = pendingTransaction.getTransaction().getType(); - if (txCountByType[txType.ordinal()] - >= poolConfig - .getMaxPrioritizedTransactionsByType() - .getOrDefault(txType, Integer.MAX_VALUE)) { - return false; - } // check if the tx is willing to pay at least the base fee if (nextBlockBaseFee diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java index c48b9f13c23..4a53c799a16 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/BaseTransactionPoolTest.java @@ -61,6 +61,7 @@ public class BaseTransactionPoolTest { protected final Transaction transaction0 = createTransaction(0); protected final Transaction transaction1 = createTransaction(1); protected final Transaction transaction2 = createTransaction(2); + protected final Transaction blobTransaction0 = createEIP4844Transaction(0, KEYS1, 1, 1); protected final StubMetricsSystem metricsSystem = new StubMetricsSystem(); @@ -97,13 +98,29 @@ protected Transaction createEIP4844Transaction( TransactionType.BLOB, nonce, Wei.of(5000L).multiply(gasFeeMultiplier), 0, blobCount, keys); } + protected Transaction createTransactionOfSize( + final long nonce, final Wei maxGasPrice, final int txSize, final KeyPair keys) { + + final TransactionType txType = + TransactionType.values()[ + randomizeTxType.nextInt(txSize < blobTransaction0.getSize() ? 3 : 4)]; + + final Transaction baseTx = createTransaction(txType, nonce, maxGasPrice, 0, 1, keys); + final int payloadSize = txSize - baseTx.getSize(); + + return createTransaction(txType, nonce, maxGasPrice, payloadSize, 1, keys); + } + protected Transaction createTransaction( final long nonce, final Wei maxGasPrice, final int payloadSize, final KeyPair keys) { - // ToDo 4844: include BLOB tx here - final TransactionType txType = TransactionType.values()[randomizeTxType.nextInt(3)]; + final TransactionType txType = TransactionType.values()[randomizeTxType.nextInt(4)]; - return createTransaction(txType, nonce, maxGasPrice, payloadSize, keys); + return switch (txType) { + case FRONTIER, ACCESS_LIST, EIP1559 -> + createTransaction(txType, nonce, maxGasPrice, payloadSize, keys); + case BLOB -> createTransaction(txType, nonce, maxGasPrice, payloadSize, 1, keys); + }; } protected Transaction createTransaction( @@ -178,6 +195,7 @@ protected Transaction createTransactionReplacement( originalTransaction.getNonce(), originalTransaction.getMaxGasPrice().multiply(2), 0, + 1, keys); } diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java index 1253246506b..196baffa3a9 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayeredPendingTransactionsTest.java @@ -15,6 +15,7 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered; import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.datatypes.TransactionType.BLOB; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ADDED; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.ALREADY_KNOWN; import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER; @@ -55,6 +56,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; import java.util.function.BiFunction; @@ -68,7 +70,8 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest { protected static final int MAX_TRANSACTIONS = 5; - protected static final int MAX_CAPACITY_BYTES = 10_000; + protected static final int MAX_PRIORITIZED_BLOB_TRANSACTIONS = MAX_TRANSACTIONS + 1; + protected static final int MAX_CAPACITY_BYTES = 150_000; protected static final Wei DEFAULT_BASE_FEE = Wei.of(100); protected static final int LIMITED_TRANSACTIONS_BY_SENDER = 4; protected static final String REMOTE = "remote"; @@ -82,20 +85,31 @@ public class LayeredPendingTransactionsTest extends BaseTransactionPoolTest { private final TransactionPoolConfiguration poolConf = ImmutableTransactionPoolConfiguration.builder() .maxPrioritizedTransactions(MAX_TRANSACTIONS) + .maxPrioritizedTransactionsByType(Map.of(BLOB, MAX_PRIORITIZED_BLOB_TRANSACTIONS)) .maxFutureBySender(MAX_TRANSACTIONS) - .pendingTransactionsLayerMaxCapacityBytes(MAX_CAPACITY_BYTES) .build(); private final TransactionPoolConfiguration senderLimitedConfig = ImmutableTransactionPoolConfiguration.builder() .maxPrioritizedTransactions(MAX_TRANSACTIONS) + .maxPrioritizedTransactionsByType(Map.of(BLOB, MAX_PRIORITIZED_BLOB_TRANSACTIONS)) + .maxFutureBySender(LIMITED_TRANSACTIONS_BY_SENDER) + .build(); + + private final TransactionPoolConfiguration smallPoolConfig = + ImmutableTransactionPoolConfiguration.builder() + .maxPrioritizedTransactions(MAX_TRANSACTIONS) + .maxPrioritizedTransactionsByType(Map.of(BLOB, MAX_PRIORITIZED_BLOB_TRANSACTIONS)) .maxFutureBySender(LIMITED_TRANSACTIONS_BY_SENDER) .pendingTransactionsLayerMaxCapacityBytes(MAX_CAPACITY_BYTES) .build(); + private LayeredPendingTransactions senderLimitedTransactions; private LayeredPendingTransactions pendingTransactions; + private LayeredPendingTransactions smallPendingTransactions; private CreatedLayers senderLimitedLayers; private CreatedLayers layers; + private CreatedLayers smallLayers; private TransactionPoolMetrics txPoolMetrics; private static BlockHeader mockBlockHeader() { @@ -151,12 +165,16 @@ public void setup() { layers = createLayers(poolConf); senderLimitedLayers = createLayers(senderLimitedConfig); + smallLayers = createLayers(smallPoolConfig); pendingTransactions = new LayeredPendingTransactions(poolConf, layers.prioritizedTransactions); senderLimitedTransactions = new LayeredPendingTransactions( senderLimitedConfig, senderLimitedLayers.prioritizedTransactions); + + smallPendingTransactions = + new LayeredPendingTransactions(smallPoolConfig, smallLayers.prioritizedTransactions); } @Test @@ -211,41 +229,43 @@ public void getTransactionByHash() { public void evictTransactionsWhenSizeLimitExceeded() { final List firstTxs = new ArrayList<>(MAX_TRANSACTIONS); - pendingTransactions.subscribeDroppedTransactions(droppedListener); + smallPendingTransactions.subscribeDroppedTransactions(droppedListener); for (int i = 0; i < MAX_TRANSACTIONS; i++) { final Account sender = mock(Account.class); when(sender.getNonce()).thenReturn((long) i); final var tx = - createTransaction( + createTransactionOfSize( i, - DEFAULT_MIN_GAS_PRICE.multiply(2 * (i + 1)), - (int) poolConf.getPendingTransactionsLayerMaxCapacityBytes() + 1, + DEFAULT_BASE_FEE.add(i), + (int) smallPoolConfig.getPendingTransactionsLayerMaxCapacityBytes() + 1, SIGNATURE_ALGORITHM.get().generateKeyPair()); - pendingTransactions.addTransaction(createRemotePendingTransaction(tx), Optional.of(sender)); + smallPendingTransactions.addTransaction( + createRemotePendingTransaction(tx), Optional.of(sender)); firstTxs.add(tx); - assertTransactionPending(pendingTransactions, tx); + assertTransactionPending(smallPendingTransactions, tx); } - assertThat(pendingTransactions.size()).isEqualTo(MAX_TRANSACTIONS); + assertThat(smallPendingTransactions.size()).isEqualTo(MAX_TRANSACTIONS); final Transaction lastBigTx = - createTransaction( + createTransactionOfSize( 0, DEFAULT_MIN_GAS_PRICE.multiply(1000), - (int) poolConf.getPendingTransactionsLayerMaxCapacityBytes(), + (int) smallPoolConfig.getPendingTransactionsLayerMaxCapacityBytes(), SIGNATURE_ALGORITHM.get().generateKeyPair()); final Account lastSender = mock(Account.class); when(lastSender.getNonce()).thenReturn(0L); - pendingTransactions.addTransaction( + smallPendingTransactions.addTransaction( createRemotePendingTransaction(lastBigTx), Optional.of(lastSender)); - assertTransactionPending(pendingTransactions, lastBigTx); + assertTransactionPending(smallPendingTransactions, lastBigTx); - assertTransactionNotPending(pendingTransactions, firstTxs.get(0)); + assertTransactionNotPending(smallPendingTransactions, firstTxs.get(0)); assertThat( - getRemovedCount(REMOTE, NO_PRIORITY, DROPPED.label(), layers.evictedCollector.name())) + getRemovedCount( + REMOTE, NO_PRIORITY, DROPPED.label(), smallLayers.evictedCollector.name())) .isEqualTo(1); - assertThat(layers.evictedCollector.getEvictedTransactions()) + assertThat(smallLayers.evictedCollector.getEvictedTransactions()) .map(PendingTransaction::getTransaction) .contains(firstTxs.get(0)); verify(droppedListener).onTransactionDropped(firstTxs.get(0)); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java index c89540fcf76..f512f5bae3f 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java @@ -15,6 +15,10 @@ package org.hyperledger.besu.ethereum.eth.transactions.layered; import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.datatypes.TransactionType.ACCESS_LIST; +import static org.hyperledger.besu.datatypes.TransactionType.BLOB; +import static org.hyperledger.besu.datatypes.TransactionType.EIP1559; +import static org.hyperledger.besu.datatypes.TransactionType.FRONTIER; import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S1; import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S2; import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayersTest.Sender.S3; @@ -27,6 +31,7 @@ import org.hyperledger.besu.crypto.KeyPair; import org.hyperledger.besu.datatypes.Address; +import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.datatypes.Wei; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MiningParameters; @@ -51,7 +56,6 @@ import java.util.OptionalLong; import java.util.stream.Stream; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -60,51 +64,29 @@ public class LayersTest extends BaseTransactionPoolTest { private static final int MAX_PRIO_TRANSACTIONS = 3; private static final int MAX_FUTURE_FOR_SENDER = 10; - private final TransactionPoolConfiguration poolConfig = + private static final TransactionPoolConfiguration DEFAULT_TX_POOL_CONFIG = ImmutableTransactionPoolConfiguration.builder() .maxPrioritizedTransactions(MAX_PRIO_TRANSACTIONS) + .maxPrioritizedTransactionsByType(Map.of(BLOB, 1)) .maxFutureBySender(MAX_FUTURE_FOR_SENDER) .pendingTransactionsLayerMaxCapacityBytes( - new PendingTransaction.Remote(createEIP1559Transaction(0, KEYS1, 1)).memorySize() * 3) + new PendingTransaction.Remote( + new BaseTransactionPoolTest().createEIP1559Transaction(0, KEYS1, 1)) + .memorySize() + * 3L) .build(); - private final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); - - private final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics); - private final SparseTransactions sparseTransactions = - new SparseTransactions( - poolConfig, - evictCollector, - txPoolMetrics, - this::transactionReplacementTester, - new BlobCache()); - - private final ReadyTransactions readyTransactions = - new ReadyTransactions( - poolConfig, - sparseTransactions, - txPoolMetrics, - this::transactionReplacementTester, - new BlobCache()); - - private final BaseFeePrioritizedTransactions prioritizedTransactions = - new BaseFeePrioritizedTransactions( - poolConfig, - LayersTest::mockBlockHeader, - readyTransactions, - txPoolMetrics, - this::transactionReplacementTester, - FeeMarket.london(0L), - new BlobCache(), - MiningParameters.newDefault()); - - private final LayeredPendingTransactions pendingTransactions = - new LayeredPendingTransactions(poolConfig, prioritizedTransactions); - - @AfterEach - void reset() { - pendingTransactions.reset(); - } + private static final TransactionPoolConfiguration BLOB_TX_POOL_CONFIG = + ImmutableTransactionPoolConfiguration.builder() + .maxPrioritizedTransactions(MAX_PRIO_TRANSACTIONS) + .maxPrioritizedTransactionsByType(Map.of(BLOB, 1)) + .maxFutureBySender(MAX_FUTURE_FOR_SENDER) + .pendingTransactionsLayerMaxCapacityBytes( + new PendingTransaction.Remote( + new BaseTransactionPoolTest().createEIP4844Transaction(0, KEYS1, 1, 1)) + .memorySize() + * 3L) + .build(); @ParameterizedTest @MethodSource("providerAddTransactions") @@ -166,7 +148,51 @@ void prioritySenders(final Scenario scenario) { assertScenario(scenario); } + @ParameterizedTest + @MethodSource("providerMaxPrioritizedByType") + void maxPrioritizedByType(final Scenario scenario) { + assertScenario(scenario, BLOB_TX_POOL_CONFIG); + } + private void assertScenario(final Scenario scenario) { + assertScenario(scenario, DEFAULT_TX_POOL_CONFIG); + } + + private void assertScenario( + final Scenario scenario, final TransactionPoolConfiguration poolConfig) { + final TransactionPoolMetrics txPoolMetrics = new TransactionPoolMetrics(metricsSystem); + + final EvictCollectorLayer evictCollector = new EvictCollectorLayer(txPoolMetrics); + final SparseTransactions sparseTransactions = + new SparseTransactions( + poolConfig, + evictCollector, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + new BlobCache()); + + final ReadyTransactions readyTransactions = + new ReadyTransactions( + poolConfig, + sparseTransactions, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + new BlobCache()); + + final BaseFeePrioritizedTransactions prioritizedTransactions = + new BaseFeePrioritizedTransactions( + poolConfig, + LayersTest::mockBlockHeader, + readyTransactions, + txPoolMetrics, + (pt1, pt2) -> transactionReplacementTester(poolConfig, pt1, pt2), + FeeMarket.london(0L), + new BlobCache(), + MiningParameters.newDefault()); + + final LayeredPendingTransactions pendingTransactions = + new LayeredPendingTransactions(poolConfig, prioritizedTransactions); + scenario.execute( pendingTransactions, prioritizedTransactions, @@ -1178,17 +1204,49 @@ static Stream providerPrioritySenders() { .expectedDroppedForSender(S3, 0))); } + static Stream providerMaxPrioritizedByType() { + return Stream.of( + Arguments.of( + new Scenario("first blob tx is prioritized") + .addForSender(S1, BLOB, 0) + .expectedPrioritizedForSender(S1, 0)), + Arguments.of( + new Scenario("multiple senders only first blob tx is prioritized") + .addForSender(S1, BLOB, 0) + .addForSender(S2, BLOB, 0) + .expectedPrioritizedForSender(S1, 0) + .expectedReadyForSender(S2, 0)), + Arguments.of( + new Scenario("same sender following blob txs are moved to ready") + .addForSender(S1, BLOB, 0, 1, 2) + .expectedPrioritizedForSender(S1, 0) + .expectedReadyForSender(S1, 1, 2)), + Arguments.of( + new Scenario("promoting txs respect prioritized count limit") + .addForSender(S1, BLOB, 0, 1, 2) + .expectedPrioritizedForSender(S1, 0) + .expectedReadyForSender(S1, 1, 2) + .confirmedForSenders(S1, 0) + .expectedPrioritizedForSender(S1, 1) + .expectedReadyForSender(S1, 2)), + Arguments.of( + new Scenario("promoting to ready is unbounded") + .addForSender(S1, BLOB, 0, 1, 2, 3, 4, 5, 6) + .expectedPrioritizedForSender(S1, 0) + .expectedReadyForSender(S1, 1, 2, 3) + .expectedSparseForSender(S1, 4, 5, 6) + .confirmedForSenders(S1, 3) + .expectedPrioritizedForSender(S1, 4) + .expectedReadyForSender(S1, 5, 6) + .expectedSparseForSenders())); + } + private static BlockHeader mockBlockHeader() { final BlockHeader blockHeader = mock(BlockHeader.class); when(blockHeader.getBaseFee()).thenReturn(Optional.of(Wei.ONE)); return blockHeader; } - private boolean transactionReplacementTester( - final PendingTransaction pt1, final PendingTransaction pt2) { - return transactionReplacementTester(poolConfig, pt1, pt2); - } - private static boolean transactionReplacementTester( final TransactionPoolConfiguration poolConfig, final PendingTransaction pt1, @@ -1233,10 +1291,14 @@ void accept( } Scenario addForSender(final Sender sender, final long... nonce) { + return addForSender(sender, EIP1559, nonce); + } + + Scenario addForSender(final Sender sender, final TransactionType type, final long... nonce) { Arrays.stream(nonce) .forEach( n -> { - final var pendingTx = getOrCreate(sender, n); + final var pendingTx = getOrCreate(sender, type, n); actions.add( (pending, prio, ready, sparse, dropped) -> { final Account mockSender = mock(Account.class); @@ -1288,22 +1350,49 @@ void execute( assertExpectedDropped(dropped, lastExpectedDropped); } - private PendingTransaction getOrCreate(final Sender sender, final long nonce) { + private PendingTransaction getOrCreate( + final Sender sender, final TransactionType type, final long nonce) { return txsBySender .get(sender) - .computeIfAbsent(nonce, n -> createEIP1559PendingTransactions(sender, n)); + .computeIfAbsent( + nonce, + n -> + switch (type) { + case FRONTIER -> createFrontierPendingTransaction(sender, n); + case ACCESS_LIST -> createAccessListPendingTransaction(sender, n); + case EIP1559 -> createEIP1559PendingTransaction(sender, n); + case BLOB -> createBlobPendingTransaction(sender, n); + }); } private PendingTransaction get(final Sender sender, final long nonce) { return txsBySender.get(sender).get(nonce); } - private PendingTransaction createEIP1559PendingTransactions( + private PendingTransaction createFrontierPendingTransaction( + final Sender sender, final long nonce) { + return createRemotePendingTransaction( + createTransaction(FRONTIER, nonce, Wei.ONE, 0, sender.key), sender.hasPriority); + } + + private PendingTransaction createAccessListPendingTransaction( + final Sender sender, final long nonce) { + return createRemotePendingTransaction( + createTransaction(ACCESS_LIST, nonce, Wei.ONE, 0, sender.key), sender.hasPriority); + } + + private PendingTransaction createEIP1559PendingTransaction( final Sender sender, final long nonce) { return createRemotePendingTransaction( createEIP1559Transaction(nonce, sender.key, sender.gasFeeMultiplier), sender.hasPriority); } + private PendingTransaction createBlobPendingTransaction(final Sender sender, final long nonce) { + return createRemotePendingTransaction( + createEIP4844Transaction(nonce, sender.key, sender.gasFeeMultiplier, 1), + sender.hasPriority); + } + public Scenario expectedPrioritizedForSender(final Sender sender, final long... nonce) { lastExpectedPrioritized = expectedForSender(sender, nonce); final var expectedCopy = List.copyOf(lastExpectedPrioritized); @@ -1467,7 +1556,7 @@ public Scenario removeForSender(final Sender sender, final long... nonce) { Arrays.stream(nonce) .forEach( n -> { - final var pendingTx = getOrCreate(sender, n); + final var pendingTx = getOrCreate(sender, EIP1559, n); actions.add( (pending, prio, ready, sparse, dropped) -> prio.remove(pendingTx, INVALIDATED)); }); From 95167f2dfd8eeb6b3646c1e8e9e916ce68e463c3 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Fri, 12 Apr 2024 18:26:40 +0200 Subject: [PATCH 7/8] Add CHANGELOG entry Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 045968989cc..f97a5a7ca0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ - Expose bad block events via the BesuEvents plugin API [#6848](https://github.com/hyperledger/besu/pull/6848) - Add RPC errors metric [#6919](https://github.com/hyperledger/besu/pull/6919/) - Add `rlp decode` subcommand to decode IBFT/QBFT extraData to validator list [#6895](https://github.com/hyperledger/besu/pull/6895) +- Layered txpool tuning for blob transactions [#6940](https://github.com/hyperledger/besu/pull/6940) ### Bug fixes - Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665) From 670116949f61aee1be6695cd853114e5cd6da1a1 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Fri, 12 Apr 2024 19:56:07 +0200 Subject: [PATCH 8/8] apply the limit when filling nonce gaps Signed-off-by: Fabio Di Fabio --- .../src/test/resources/everything_config.toml | 1 + .../AbstractPrioritizedTransactions.java | 10 ++-- .../layered/AbstractTransactionsLayer.java | 46 ++++++++++++------- .../eth/transactions/layered/EndLayer.java | 5 +- .../layered/ReadyTransactions.java | 10 ++-- .../layered/SparseTransactions.java | 10 ++-- .../layered/TransactionsLayer.java | 4 +- .../eth/transactions/layered/LayersTest.java | 7 +++ 8 files changed, 54 insertions(+), 39 deletions(-) diff --git a/besu/src/test/resources/everything_config.toml b/besu/src/test/resources/everything_config.toml index f2b112eccb1..a92eac34e5c 100644 --- a/besu/src/test/resources/everything_config.toml +++ b/besu/src/test/resources/everything_config.toml @@ -185,6 +185,7 @@ tx-pool-save-file="txpool.dump" ## Layered tx-pool-layer-max-capacity=12345678 tx-pool-max-prioritized=9876 +tx-pool-max-prioritized-by-type=["BLOB=10","FRONTIER=100"] tx-pool-max-future-by-sender=321 ## Legacy/Sequenced tx-pool-retention-hours=999 diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java index 773469e52a1..b728ed08638 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractPrioritizedTransactions.java @@ -134,7 +134,7 @@ public List promote( final Predicate promotionFilter, final long freeSpace, final int freeSlots, - final int[] maxPromotionsPerType) { + final int[] remainingPromotionsPerType) { return List.of(); } @@ -146,17 +146,17 @@ public List promote( * @return an array containing the max amount of txs that can be promoted for each type */ @Override - protected int[] getMaxPromotionsPerType() { + protected int[] getRemainingPromotionsPerType() { final var allTypes = TransactionType.values(); - final var maxPromotionsPerType = new int[allTypes.length]; + final var remainingPromotionsPerType = new int[allTypes.length]; for (int i = 0; i < allTypes.length; i++) { - maxPromotionsPerType[i] = + remainingPromotionsPerType[i] = poolConfig .getMaxPrioritizedTransactionsByType() .getOrDefault(allTypes[i], Integer.MAX_VALUE) - txCountByType[i]; } - return maxPromotionsPerType; + return remainingPromotionsPerType; } @Override diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java index 1d2c0bb192d..2885edc42a3 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/AbstractTransactionsLayer.java @@ -177,7 +177,7 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f if (!maybeFull()) { // if there is space try to see if the added tx filled some gaps - tryFillGap(addStatus, pendingTransaction); + tryFillGap(addStatus, pendingTransaction, getRemainingPromotionsPerType()); } notifyTransactionAdded(pendingTransaction); @@ -214,16 +214,21 @@ private boolean maybeFull() { } private void tryFillGap( - final TransactionAddedResult addStatus, final PendingTransaction pendingTransaction) { + final TransactionAddedResult addStatus, + final PendingTransaction pendingTransaction, + final int[] remainingPromotionsPerType) { // it makes sense to fill gaps only if the add is not a replacement and this layer does not // allow gaps if (!addStatus.isReplacement() && !gapsAllowed()) { final PendingTransaction promotedTx = - nextLayer.promoteFor(pendingTransaction.getSender(), pendingTransaction.getNonce()); + nextLayer.promoteFor( + pendingTransaction.getSender(), + pendingTransaction.getNonce(), + remainingPromotionsPerType); if (promotedTx != null) { processAdded(promotedTx); if (!maybeFull()) { - tryFillGap(ADDED, promotedTx); + tryFillGap(ADDED, promotedTx, remainingPromotionsPerType); } } } @@ -258,22 +263,30 @@ protected abstract void internalNotifyAdded( final PendingTransaction pendingTransaction); @Override - public PendingTransaction promoteFor(final Address sender, final long nonce) { + public PendingTransaction promoteFor( + final Address sender, final long nonce, final int[] remainingPromotionsPerType) { final var senderTxs = txsBySender.get(sender); if (senderTxs != null) { long expectedNonce = nonce + 1; if (senderTxs.firstKey() == expectedNonce) { - final PendingTransaction promotedTx = senderTxs.pollFirstEntry().getValue(); - processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED); - metrics.incrementRemoved(promotedTx, "promoted", name()); - - if (senderTxs.isEmpty()) { - txsBySender.remove(sender); + final var candidateTx = senderTxs.firstEntry().getValue(); + final var txType = candidateTx.getTransaction().getType(); + + if (remainingPromotionsPerType[txType.ordinal()] > 0) { + senderTxs.pollFirstEntry(); + processRemove(senderTxs, candidateTx.getTransaction(), PROMOTED); + metrics.incrementRemoved(candidateTx, "promoted", name()); + + if (senderTxs.isEmpty()) { + txsBySender.remove(sender); + } + --remainingPromotionsPerType[txType.ordinal()]; + return candidateTx; } - return promotedTx; + return null; } } - return nextLayer.promoteFor(sender, nonce); + return nextLayer.promoteFor(sender, nonce, remainingPromotionsPerType); } private TransactionAddedResult addToNextLayer( @@ -432,7 +445,8 @@ final void promoteTransactions() { if (freeSlots > 0 && freeSpace > 0) { nextLayer - .promote(this::promotionFilter, cacheFreeSpace(), freeSlots, getMaxPromotionsPerType()) + .promote( + this::promotionFilter, cacheFreeSpace(), freeSlots, getRemainingPromotionsPerType()) .forEach(this::processAdded); } } @@ -445,8 +459,8 @@ final void promoteTransactions() { * * @return an array containing the max amount of txs that can be promoted for each type */ - protected int[] getMaxPromotionsPerType() { - return UNLIMITED_PROMOTIONS_PER_TYPE; + protected int[] getRemainingPromotionsPerType() { + return Arrays.copyOf(UNLIMITED_PROMOTIONS_PER_TYPE, UNLIMITED_PROMOTIONS_PER_TYPE.length); } private void confirmed(final Address sender, final long maxConfirmedNonce) { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java index 0fdf5663c88..d511a89c557 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/EndLayer.java @@ -123,7 +123,7 @@ public List promote( final Predicate promotionFilter, final long freeSpace, final int freeSlots, - final int[] maxPromotionsPerType) { + final int[] remainingPromotionsPerType) { return List.of(); } @@ -153,7 +153,8 @@ protected void notifyTransactionDropped(final PendingTransaction pendingTransact } @Override - public PendingTransaction promoteFor(final Address sender, final long nonce) { + public PendingTransaction promoteFor( + final Address sender, final long nonce, final int[] remainingPromotionsPerType) { return null; } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java index 72e91e376b0..570099d3392 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/ReadyTransactions.java @@ -17,7 +17,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; import org.hyperledger.besu.datatypes.Address; -import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -148,12 +147,9 @@ public List promote( final Predicate promotionFilter, final long freeSpace, final int freeSlots, - final int[] maxPromotionsPerType) { + final int[] remainingPromotionsPerType) { long accumulatedSpace = 0; final List promotedTxs = new ArrayList<>(); - final int[] promotedCountByType = new int[maxPromotionsPerType.length]; - final Predicate thereIsSpaceForType = - txType -> promotedCountByType[txType.ordinal()] < maxPromotionsPerType[txType.ordinal()]; // first find all txs that can be promoted search: @@ -161,11 +157,11 @@ public List promote( final var senderTxs = txsBySender.get(senderFirstTx.getSender()); for (final var candidateTx : senderTxs.values()) { final var txType = candidateTx.getTransaction().getType(); - if (promotionFilter.test(candidateTx) && thereIsSpaceForType.test(txType)) { + if (promotionFilter.test(candidateTx) && remainingPromotionsPerType[txType.ordinal()] > 0) { accumulatedSpace += candidateTx.memorySize(); if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { promotedTxs.add(candidateTx); - ++promotedCountByType[txType.ordinal()]; + --remainingPromotionsPerType[txType.ordinal()]; } else { // no room for more txs the search is over exit the loops break search; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java index c5f6d089a71..2a545564227 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/SparseTransactions.java @@ -18,7 +18,6 @@ import static org.hyperledger.besu.ethereum.eth.transactions.layered.TransactionsLayer.RemovalReason.PROMOTED; import org.hyperledger.besu.datatypes.Address; -import org.hyperledger.besu.datatypes.TransactionType; import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.Transaction; import org.hyperledger.besu.ethereum.eth.transactions.BlobCache; @@ -147,12 +146,9 @@ public List promote( final Predicate promotionFilter, final long freeSpace, final int freeSlots, - final int[] maxPromotionsPerType) { + final int[] remainingPromotionsPerType) { long accumulatedSpace = 0; final List promotedTxs = new ArrayList<>(); - final int[] promotedCountByType = new int[maxPromotionsPerType.length]; - final Predicate thereIsSpaceForType = - txType -> promotedCountByType[txType.ordinal()] < maxPromotionsPerType[txType.ordinal()]; final var zeroGapSenders = orderByGap.get(0); @@ -162,11 +158,11 @@ public List promote( for (final var candidateTx : senderSeqTxs.values()) { final var txType = candidateTx.getTransaction().getType(); - if (promotionFilter.test(candidateTx) && thereIsSpaceForType.test(txType)) { + if (promotionFilter.test(candidateTx) && remainingPromotionsPerType[txType.ordinal()] > 0) { accumulatedSpace += candidateTx.memorySize(); if (promotedTxs.size() < freeSlots && accumulatedSpace <= freeSpace) { promotedTxs.add(candidateTx); - ++promotedCountByType[txType.ordinal()]; + --remainingPromotionsPerType[txType.ordinal()]; } else { // no room for more txs the search is over exit the loops break search; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java index a23dc437dbc..0630ec1f934 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/transactions/layered/TransactionsLayer.java @@ -73,7 +73,7 @@ List promote( Predicate promotionFilter, final long freeSpace, final int freeSlots, - final int[] maxPromotionsPerType); + final int[] remainingPromotionsPerType); long subscribeToAdded(PendingTransactionAddedListener listener); @@ -83,7 +83,7 @@ List promote( void unsubscribeFromDropped(long id); - PendingTransaction promoteFor(Address sender, long nonce); + PendingTransaction promoteFor(Address sender, long nonce, final int[] remainingPromotionsPerType); void notifyAdded(PendingTransaction pendingTransaction); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java index f512f5bae3f..162b22d3dea 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/transactions/layered/LayersTest.java @@ -1229,6 +1229,13 @@ static Stream providerMaxPrioritizedByType() { .confirmedForSenders(S1, 0) .expectedPrioritizedForSender(S1, 1) .expectedReadyForSender(S1, 2)), + Arguments.of( + new Scenario("filling gaps respect prioritized count limit") + .addForSender(S1, BLOB, 1) + .expectedSparseForSender(S1, 1) + .addForSender(S1, BLOB, 0) + .expectedPrioritizedForSender(S1, 0) + .expectedSparseForSender(S1, 1)), Arguments.of( new Scenario("promoting to ready is unbounded") .addForSender(S1, BLOB, 0, 1, 2, 3, 4, 5, 6)