Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Layered txpool tuning for blob transactions #6940

Merged
merged 15 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
- Expose bad block events via the BesuEvents plugin API [#6848](https://github.com/hyperledger/besu/pull/6848)
- Add RPC errors metric [#6919](https://github.com/hyperledger/besu/pull/6919/)
- Add `rlp decode` subcommand to decode IBFT/QBFT extraData to validator list [#6895](https://github.com/hyperledger/besu/pull/6895)
- Layered txpool tuning for blob transactions [#6940](https://github.com/hyperledger/besu/pull/6940)

### Bug fixes
- Fix txpool dump/restore race condition [#6665](https://github.com/hyperledger/besu/pull/6665)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.hyperledger.besu.cli.util.CommandLineUtils;
import org.hyperledger.besu.config.GenesisConfigOptions;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
Expand All @@ -37,6 +38,7 @@
import java.io.File;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;

import picocli.CommandLine;
Expand Down Expand Up @@ -155,6 +157,8 @@ public class TransactionPoolOptions implements CLIOptions<TransactionPoolConfigu
static class Layered {
private static final String TX_POOL_LAYER_MAX_CAPACITY = "--tx-pool-layer-max-capacity";
private static final String TX_POOL_MAX_PRIORITIZED = "--tx-pool-max-prioritized";
private static final String TX_POOL_MAX_PRIORITIZED_BY_TYPE =
"--tx-pool-max-prioritized-by-type";
private static final String TX_POOL_MAX_FUTURE_BY_SENDER = "--tx-pool-max-future-by-sender";

@CommandLine.Option(
Expand All @@ -175,6 +179,16 @@ static class Layered {
Integer txPoolMaxPrioritized =
TransactionPoolConfiguration.DEFAULT_MAX_PRIORITIZED_TRANSACTIONS;

@CommandLine.Option(
names = {TX_POOL_MAX_PRIORITIZED_BY_TYPE},
paramLabel = "MAP<TYPE,INTEGER>",
split = ",",
description =
"Max number of pending transactions, of a specific type, that are prioritized and thus kept sorted (default: ${DEFAULT-VALUE})",
arity = "1")
Map<TransactionType, Integer> txPoolMaxPrioritizedByType =
TransactionPoolConfiguration.DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE;
macfarla marked this conversation as resolved.
Show resolved Hide resolved

@CommandLine.Option(
names = {TX_POOL_MAX_FUTURE_BY_SENDER},
paramLabel = MANDATORY_INTEGER_FORMAT_HELP,
Expand Down Expand Up @@ -297,6 +311,8 @@ public static TransactionPoolOptions fromConfig(final TransactionPoolConfigurati
options.layeredOptions.txPoolLayerMaxCapacity =
config.getPendingTransactionsLayerMaxCapacityBytes();
options.layeredOptions.txPoolMaxPrioritized = config.getMaxPrioritizedTransactions();
options.layeredOptions.txPoolMaxPrioritizedByType =
config.getMaxPrioritizedTransactionsByType();
options.layeredOptions.txPoolMaxFutureBySender = config.getMaxFutureBySender();
options.sequencedOptions.txPoolLimitByAccountPercentage =
config.getTxPoolLimitByAccountPercentage();
Expand Down Expand Up @@ -354,6 +370,7 @@ public TransactionPoolConfiguration toDomainObject() {
.minGasPrice(minGasPrice)
.pendingTransactionsLayerMaxCapacityBytes(layeredOptions.txPoolLayerMaxCapacity)
.maxPrioritizedTransactions(layeredOptions.txPoolMaxPrioritized)
.maxPrioritizedTransactionsByType(layeredOptions.txPoolMaxPrioritizedByType)
.maxFutureBySender(layeredOptions.txPoolMaxFutureBySender)
.txPoolLimitByAccountPercentage(sequencedOptions.txPoolLimitByAccountPercentage)
.txPoolMaxSize(sequencedOptions.txPoolMaxSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@

import org.hyperledger.besu.cli.converter.DurationMillisConverter;
import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.eth.transactions.ImmutableTransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.util.number.Percentage;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -369,6 +372,44 @@ public void eth65TrxAnnouncedBufferingPeriodWithInvalidInputShouldFail2() {
"-1");
}

@Test
public void maxPrioritizedTxsPerType() {
final int maxBlobs = 2;
final int maxFrontier = 200;
internalTestSuccess(
config -> {
assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.BLOB))
.isEqualTo(maxBlobs);
assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.FRONTIER))
.isEqualTo(maxFrontier);
},
"--tx-pool-max-prioritized-by-type",
"BLOB=" + maxBlobs + ",FRONTIER=" + maxFrontier);
}

@Test
public void maxPrioritizedTxsPerTypeConfigFile() throws IOException {
final int maxBlobs = 2;
final int maxFrontier = 200;
final Path tempConfigFilePath =
createTempFile(
"config",
String.format(
"""
tx-pool-max-prioritized-by-type=["BLOB=%s","FRONTIER=%s"]
""",
maxBlobs, maxFrontier));
internalTestSuccess(
config -> {
assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.BLOB))
.isEqualTo(maxBlobs);
assertThat(config.getMaxPrioritizedTransactionsByType().get(TransactionType.FRONTIER))
.isEqualTo(maxFrontier);
},
"--config-file",
tempConfigFilePath.toString());
}

@Override
protected TransactionPoolConfiguration createDefaultDomainObject() {
return TransactionPoolConfiguration.DEFAULT;
Expand Down
1 change: 1 addition & 0 deletions besu/src/test/resources/everything_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ tx-pool-save-file="txpool.dump"
## Layered
tx-pool-layer-max-capacity=12345678
tx-pool-max-prioritized=9876
tx-pool-max-prioritized-by-type=["BLOB=10","FRONTIER=100"]
tx-pool-max-future-by-sender=321
## Legacy/Sequenced
tx-pool-retention-hours=999
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ private void initLogForReplay() {
.map(Address::toHexString)
.collect(Collectors.joining(",")))
.log();
// log the max prioritized txs by type
LOG_FOR_REPLAY
.atTrace()
.setMessage("{}")
.addArgument(
() ->
configuration.getMaxPrioritizedTransactionsByType().entrySet().stream()
.map(e -> e.getKey().name() + "=" + e.getValue())
.collect(Collectors.joining(",")))
.log();
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.plugin.services.TransactionPoolValidatorService;
import org.hyperledger.besu.plugin.services.txvalidator.PluginTransactionPoolValidator;
Expand All @@ -24,6 +25,8 @@

import java.io.File;
import java.time.Duration;
import java.util.EnumMap;
import java.util.Map;
import java.util.Set;

import org.immutables.value.Value;
Expand Down Expand Up @@ -71,6 +74,8 @@ enum Implementation {
File DEFAULT_SAVE_FILE = new File(DEFAULT_SAVE_FILE_NAME);
long DEFAULT_PENDING_TRANSACTIONS_LAYER_MAX_CAPACITY_BYTES = 12_500_000L;
int DEFAULT_MAX_PRIORITIZED_TRANSACTIONS = 2000;
EnumMap<TransactionType, Integer> DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE =
new EnumMap<>(Map.of(TransactionType.BLOB, 6));
int DEFAULT_MAX_FUTURE_BY_SENDER = 200;
Implementation DEFAULT_TX_POOL_IMPLEMENTATION = Implementation.LAYERED;
Set<Address> DEFAULT_PRIORITY_SENDERS = Set.of();
Expand Down Expand Up @@ -148,6 +153,11 @@ default int getMaxPrioritizedTransactions() {
return DEFAULT_MAX_PRIORITIZED_TRANSACTIONS;
}

@Value.Default
default Map<TransactionType, Integer> getMaxPrioritizedTransactionsByType() {
return DEFAULT_MAX_PRIORITIZED_TRANSACTIONS_BY_TYPE;
}

@Value.Default
default int getMaxFutureBySender() {
return DEFAULT_MAX_FUTURE_BY_SENDER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.TransactionType;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
Expand Down Expand Up @@ -87,6 +88,15 @@ protected void internalReplaced(final PendingTransaction replacedTx) {
}

private boolean hasPriority(final PendingTransaction pendingTransaction) {
// check if there is space for that tx type
final var txType = pendingTransaction.getTransaction().getType();
if (txCountByType[txType.ordinal()]
>= poolConfig
.getMaxPrioritizedTransactionsByType()
.getOrDefault(txType, Integer.MAX_VALUE)) {
return false;
}

// if it does not pass the promotion filter, then has not priority
if (!promotionFilter(pendingTransaction)) {
return false;
Expand Down Expand Up @@ -123,10 +133,32 @@ protected void internalRemove(
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
final int freeSlots,
final int[] remainingPromotionsPerType) {
return List.of();
}

/**
* Here the max number of txs of a specific type that can be promoted, is defined by the
* configuration, so we return the difference between the configured max and the current count of
* txs for each type
*
* @return an array containing the max amount of txs that can be promoted for each type
*/
@Override
protected int[] getRemainingPromotionsPerType() {
final var allTypes = TransactionType.values();
final var remainingPromotionsPerType = new int[allTypes.length];
for (int i = 0; i < allTypes.length; i++) {
remainingPromotionsPerType[i] =
poolConfig
.getMaxPrioritizedTransactionsByType()
.getOrDefault(allTypes[i], Integer.MAX_VALUE)
- txCountByType[i];
}
return remainingPromotionsPerType;
}

@Override
public Stream<PendingTransaction> stream() {
return orderByFee.descendingSet().stream();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,13 @@
public abstract class AbstractTransactionsLayer implements TransactionsLayer {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTransactionsLayer.class);
private static final NavigableMap<Long, PendingTransaction> EMPTY_SENDER_TXS = new TreeMap<>();
private static final int[] UNLIMITED_PROMOTIONS_PER_TYPE =
new int[TransactionType.values().length];

static {
Arrays.fill(UNLIMITED_PROMOTIONS_PER_TYPE, Integer.MAX_VALUE);
}

protected final TransactionPoolConfiguration poolConfig;
protected final TransactionsLayer nextLayer;
protected final BiFunction<PendingTransaction, PendingTransaction, Boolean>
Expand Down Expand Up @@ -170,7 +177,7 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f

if (!maybeFull()) {
// if there is space try to see if the added tx filled some gaps
tryFillGap(addStatus, pendingTransaction);
tryFillGap(addStatus, pendingTransaction, getRemainingPromotionsPerType());
}

notifyTransactionAdded(pendingTransaction);
Expand Down Expand Up @@ -207,16 +214,21 @@ private boolean maybeFull() {
}

private void tryFillGap(
final TransactionAddedResult addStatus, final PendingTransaction pendingTransaction) {
final TransactionAddedResult addStatus,
final PendingTransaction pendingTransaction,
final int[] remainingPromotionsPerType) {
// it makes sense to fill gaps only if the add is not a replacement and this layer does not
// allow gaps
if (!addStatus.isReplacement() && !gapsAllowed()) {
final PendingTransaction promotedTx =
nextLayer.promoteFor(pendingTransaction.getSender(), pendingTransaction.getNonce());
nextLayer.promoteFor(
pendingTransaction.getSender(),
pendingTransaction.getNonce(),
remainingPromotionsPerType);
if (promotedTx != null) {
processAdded(promotedTx);
if (!maybeFull()) {
tryFillGap(ADDED, promotedTx);
tryFillGap(ADDED, promotedTx, remainingPromotionsPerType);
}
}
}
Expand Down Expand Up @@ -251,22 +263,30 @@ protected abstract void internalNotifyAdded(
final PendingTransaction pendingTransaction);

@Override
public PendingTransaction promoteFor(final Address sender, final long nonce) {
public PendingTransaction promoteFor(
final Address sender, final long nonce, final int[] remainingPromotionsPerType) {
final var senderTxs = txsBySender.get(sender);
if (senderTxs != null) {
long expectedNonce = nonce + 1;
if (senderTxs.firstKey() == expectedNonce) {
final PendingTransaction promotedTx = senderTxs.pollFirstEntry().getValue();
processRemove(senderTxs, promotedTx.getTransaction(), PROMOTED);
metrics.incrementRemoved(promotedTx, "promoted", name());

if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
final var candidateTx = senderTxs.firstEntry().getValue();
final var txType = candidateTx.getTransaction().getType();

if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(senderTxs, candidateTx.getTransaction(), PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());

if (senderTxs.isEmpty()) {
txsBySender.remove(sender);
}
--remainingPromotionsPerType[txType.ordinal()];
return candidateTx;
}
return promotedTx;
return null;
}
}
return nextLayer.promoteFor(sender, nonce);
return nextLayer.promoteFor(sender, nonce, remainingPromotionsPerType);
}

private TransactionAddedResult addToNextLayer(
Expand Down Expand Up @@ -425,11 +445,24 @@ final void promoteTransactions() {

if (freeSlots > 0 && freeSpace > 0) {
nextLayer
.promote(this::promotionFilter, cacheFreeSpace(), freeSlots)
.promote(
this::promotionFilter, cacheFreeSpace(), freeSlots, getRemainingPromotionsPerType())
.forEach(this::processAdded);
}
}

/**
* How many txs of a specified type can be promoted? This make sense when a max number of txs of a
* type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs
* than the useful ones. By default, there are no limits, but each layer can define its own
* policy.
*
* @return an array containing the max amount of txs that can be promoted for each type
*/
protected int[] getRemainingPromotionsPerType() {
return Arrays.copyOf(UNLIMITED_PROMOTIONS_PER_TYPE, UNLIMITED_PROMOTIONS_PER_TYPE.length);
}

private void confirmed(final Address sender, final long maxConfirmedNonce) {
final var senderTxs = txsBySender.get(sender);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ private Wei calculateNextBlockBaseFee(final FeeMarket feeMarket, final BlockHead

@Override
protected boolean promotionFilter(final PendingTransaction pendingTransaction) {

// check if the tx is willing to pay at least the base fee
if (nextBlockBaseFee
.map(pendingTransaction.getTransaction().getMaxGasPrice()::lessThan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ public OptionalLong getCurrentNonceFor(final Address sender) {
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
final long freeSpace,
final int freeSlots) {
final int freeSlots,
final int[] remainingPromotionsPerType) {
return List.of();
}

Expand Down Expand Up @@ -152,7 +153,8 @@ protected void notifyTransactionDropped(final PendingTransaction pendingTransact
}

@Override
public PendingTransaction promoteFor(final Address sender, final long nonce) {
public PendingTransaction promoteFor(
final Address sender, final long nonce, final int[] remainingPromotionsPerType) {
return null;
}

Expand Down
Loading
Loading