Skip to content

Commit

Permalink
refactoring addPendingTransactions to small code pieces
Browse files Browse the repository at this point in the history
  • Loading branch information
AionJayT committed Feb 28, 2020
1 parent 1e2741d commit 32215da
Showing 1 changed file with 85 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -286,95 +286,45 @@ private List<TxResponse> addPendingTransactions(
List<AionTransaction> newLargeNonceTx = new ArrayList<>();
List<TxResponse> txResponses = new ArrayList<>();

int fetchLimit = calculateTxFetchNumberLimit();
for (AionTransaction tx : transactions) {
BigInteger bestPendingStateNonce = bestPendingStateNonce(tx.getSenderAddress());
int cmp = tx.getNonceBI().compareTo(bestPendingStateNonce);

TxResponse response;
if (pendingTxCache.isInCache(tx.getSenderAddress(), tx.getNonceBI())) {
txResponses.add(TxResponse.ALREADY_CACHED);
response = TxResponse.ALREADY_CACHED;
} else {
// This case happens when we have already received a tx with a larger nonce
// from the address txFrom
if (cmp > 0) {
TxResponse response = addTransactionToCache(tx, false);
txResponses.add(response);

if (response.equals(TxResponse.CACHED_NONCE)) {
newLargeNonceTx.add(tx);
}
}
// This case happens when this transaction has been received before, but was
// cached due to the non large nonce or the transaction is temporary full.
else if (cmp == 0) {
if (txPool.isFull()) {
TxResponse response = addTransactionToCache(tx, true);
txResponses.add(response);
BigInteger bestPendingStateNonce = bestPendingStateNonce(tx.getSenderAddress());
int cmp = tx.getNonceBI().compareTo(bestPendingStateNonce);

boolean isFutureNonce = (cmp > 0);
boolean isBestPendingNonce = (cmp == 0);
boolean isSealed = (tx.getNonceBI().compareTo(bestRepoNonce(tx.getSenderAddress())) < 0);

if (response.equals(TxResponse.CACHED_POOLMAX)) {
newLargeNonceTx.add(tx);
}
if (isFutureNonce) {
response = processTxToCachePool(tx, false, newLargeNonceTx);
} else if (isBestPendingNonce) {
if (txPool.isFull()) {
response = processTxToCachePool(tx, true, newLargeNonceTx);
} else {
Map<BigInteger, AionTransaction> cachedTxWithSender = pendingTxCache.getCacheTxBySender(tx.getSenderAddress());

if (cachedTxWithSender == null) {
TxResponse response = addPendingTransactionInner(tx);
txResponses.add(response);

if (response.equals(TxResponse.SUCCESS)) {
newPending.add(tx);
addPendingTxToBackupDatabase(tx);
}
} else {
LOGGER_TX.debug(
"add Transaction from cache, sender: {}, size: {}",
tx.getSenderAddress(),
cachedTxWithSender.size());

TxResponse response = addPendingTransactionInner(tx);
txResponses.add(response);

if (response.equals(TxResponse.SUCCESS)) {
newPending.add(tx);
addPendingTxToBackupDatabase(tx);

int limit = calculateTxFetchNumberLimit();
BigInteger newCachedTxNonce = tx.getNonceBI().add(BigInteger.ONE);
AionTransaction newCachedTx = cachedTxWithSender.get(newCachedTxNonce);
while (response.equals(TxResponse.SUCCESS)
&& newCachedTx != null
&& limit-- > 0
&& !txPool.isFull()) {

LOGGER_TX.debug("add Transaction from cache, sender: {}, nonce: {}", tx.getSenderAddress(), newCachedTxNonce);

response = addPendingTransactionInner(tx);
if (response.equals(TxResponse.SUCCESS)) {
newPending.add(tx);
addPendingTxToBackupDatabase(tx);
newCachedTxNonce = newCachedTxNonce.add(BigInteger.ONE);
newCachedTx = cachedTxWithSender.get(newCachedTxNonce);
}
}
}
}
response = processTxToTxPool(tx, fetchLimit, newPending);
}
}
else if (tx.getNonceBI().compareTo(bestRepoNonce(tx.getSenderAddress())) < 0) {
// This should mean that the transaction has already been sealed in the repo
txResponses.add(TxResponse.ALREADY_SEALED);
} else if (isSealed) {
response = TxResponse.ALREADY_SEALED;
} else {
// This case happens when this tx was received before, but never sealed,
// typically because of low energy
TxResponse implResponse = addPendingTransactionInner(tx);
if (implResponse.equals(TxResponse.SUCCESS)) {
newPending.add(tx);
txResponses.add(TxResponse.REPAID);
response = TxResponse.REPAID;
addPendingTxToBackupDatabase(tx);
} else {
txResponses.add(implResponse);
response = implResponse;
}
}
}

txResponses.add(response);
LOGGER_TX.debug("add tx [{}], result:[{}]", tx, response.getMessage());
}

LOGGER_TX.info(
Expand All @@ -384,18 +334,72 @@ else if (tx.getNonceBI().compareTo(bestRepoNonce(tx.getSenderAddress())) < 0) {
newLargeNonceTx.size(),
txPool.size());

executeCallback(newPending, newLargeNonceTx);

return txResponses;
}

private void executeCallback(List<AionTransaction> newPending, List<AionTransaction> newLargeNonceTx) {
if (!newPending.isEmpty()) {
pendingTxCallback.pendingTxReceivedCallback(newPending);
pendingTxReceivedforMining.set(true);
}

if (!testingMode && (!newPending.isEmpty() || !newLargeNonceTx.isEmpty())) {
transactionBroadcastCallback.broadcastTransactions(
Stream.concat(newPending.stream(), newLargeNonceTx.stream())
.collect(Collectors.toList()));
Stream.concat(newPending.stream(), newLargeNonceTx.stream())
.collect(Collectors.toList()));
}
}

return txResponses;
private TxResponse processTxToTxPool(AionTransaction tx, int fetchLimit, List<AionTransaction> newPending) {

TxResponse response = addPendingTransactionInner(tx);

if (response.equals(TxResponse.SUCCESS)) {
newPending.add(tx);
addPendingTxToBackupDatabase(tx);

Map<BigInteger, AionTransaction> cachedTxWithSender = pendingTxCache.getCacheTxBySender(tx.getSenderAddress());
if (cachedTxWithSender != null) {
LOGGER_TX.debug(
"add Transaction from cache, sender: {}, size: {}",
tx.getSenderAddress(),
cachedTxWithSender.size());

BigInteger newCachedTxNonce = tx.getNonceBI().add(BigInteger.ONE);
AionTransaction newCachedTx = cachedTxWithSender.get(newCachedTxNonce);
while (response.equals(TxResponse.SUCCESS)
&& newCachedTx != null
&& fetchLimit-- > 0
&& !txPool.isFull()) {

LOGGER_TX.debug("add Transaction from cache, sender: {}, nonce: {}", newCachedTx.getSenderAddress(), newCachedTxNonce);

response = addPendingTransactionInner(newCachedTx);
if (response.equals(TxResponse.SUCCESS)) {
newPending.add(newCachedTx);
addPendingTxToBackupDatabase(newCachedTx);
pendingTxCache.removeTransaction(newCachedTx.getSenderAddress(), newCachedTxNonce);

newCachedTxNonce = newCachedTxNonce.add(BigInteger.ONE);
newCachedTx = cachedTxWithSender.get(newCachedTxNonce);
}
}
}
}

return response;
}

private TxResponse processTxToCachePool(AionTransaction tx, boolean isPoolFull, List<AionTransaction> newLargeNonceTx) {
TxResponse response = addTransactionToCache(tx, isPoolFull);

if (response.equals(TxResponse.CACHED_NONCE) || response.equals(TxResponse.CACHED_POOLMAX)) {
newLargeNonceTx.add(tx);
}

return response;
}

private void addPendingTxToBackupDatabase(AionTransaction tx) {
Expand All @@ -410,6 +414,12 @@ private void removeBackupDBPendingTx(byte[] txHash) {
}
}

/**
* For fetch the cached transaction into the txPool, we want to fetch the transactions from each
* cached account equally, therefore, we use this formula to limit how many transactions can be fetched
* to the txPool in each transaction execution.
* @return the max transaction number of each account should fetch from the cachePool.
*/
private int calculateTxFetchNumberLimit() {
int cachedAccount = pendingTxCache.getCacheTxAccount().size();
return cachedAccount == 0 ? 1 : Math.max((txPool.maxPoolSize / 4) / cachedAccount, 1);
Expand All @@ -418,32 +428,15 @@ private int calculateTxFetchNumberLimit() {
private TxResponse addTransactionToCache(AionTransaction tx, boolean transactionPoolIsFull) {

if (pendingTxCache.addCacheTx(tx) == null) {
LOGGER_TX.debug(
"addPendingTransactions failed due to the account cache is full: from: {}, nonce: {}",
tx.getSenderAddress(),
tx.getNonceBI());

return TxResponse.CACHED_ACCOUNTMAX;
} else {
if (poolBackUpEnable) {
backupPendingCacheAdd.put(tx.getTransactionHash(), tx.getEncoded());
}

if (transactionPoolIsFull) {
LOGGER_TX.debug(
"addPendingTransactions addToCache due to poolMax: from: {}, nonce: {}",
tx.getSenderAddress(),
tx.getNonceBI());

// Transaction cached because the pool is full
return TxResponse.CACHED_POOLMAX;
} else {
LOGGER_TX.debug(
"addPendingTransactions addToCache due to largeNonce: from: {}, nonce: {}",
tx.getSenderAddress(),
tx.getNonceBI());

// Transaction cached due to large nonce
return TxResponse.CACHED_NONCE;
}
}
Expand Down

0 comments on commit 32215da

Please sign in to comment.