Skip to content

Commit

Permalink
refined pendingTxCacheV1 for adjusting the object initialization posi…
Browse files Browse the repository at this point in the history
…tion and map get/put
  • Loading branch information
AionJayT committed Feb 25, 2020
1 parent a7ef164 commit 5e3dcd5
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void backupPendingTx() {
blockchain.getRepository().removeTxBatch(backupPendingPoolRemove, true);
}

for (AionTransaction tx : pendingTxCache.getRemovedTransactionForPoolBackup()) {
for (AionTransaction tx : pendingTxCache.pollRemovedTransactionForPoolBackup()) {
backupPendingCacheRemove.add(tx.getTransactionHash());
}

Expand Down Expand Up @@ -752,7 +752,7 @@ private void clearOutdated(final long blockNumber) {
currentBestBlock.get());
}

List<AionTransaction> clearedTxFromCache = pendingTxCache.getRemovedTransactionForPoolBackup();
List<AionTransaction> clearedTxFromCache = pendingTxCache.pollRemovedTransactionForPoolBackup();
for (AionTransaction tx : clearedTxFromCache) {
removeBackupDBCachedTx(tx.getTransactionHash());
fireTxUpdate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -32,29 +33,28 @@ public final class PendingTxCacheV1 {
public static final int ACCOUNT_CACHE_MAX = 2_000;
public static final int TX_PER_ACCOUNT_MAX = 500;
public static final int CACHE_TIMEOUT = 3_600;
protected static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.TX.name());
private static final Logger LOG = AionLoggerFactory.getLogger(LogEnum.TX.name());
private final LRUMap<AionAddress, SortedMap<BigInteger, AionTransaction>> cacheTxMap;
private final SortedMap<Long, Set<AionTransaction>> timeOutMap;
private final Lock lock = new ReentrantLock();
private List<AionTransaction> removedTransactionForPoolBackup;
private final List<AionTransaction> removedTransactionForPoolBackup;

/** @implNote the default constructor */
PendingTxCacheV1() {
public PendingTxCacheV1() {
cacheTxMap = new LRUMap<>(ACCOUNT_CACHE_MAX);
timeOutMap = new TreeMap<>();
removedTransactionForPoolBackup = null;
}

/**
* @implNote the constructor with poolBackup option
* @implNote the constructor with the backupTransactions option
*
* @param poolBackup the flag to enable/disable the removedTxHash set
* @param backupTransactions the flag to enable/disable the removedTxHash set
*/
public PendingTxCacheV1(boolean poolBackup) {
public PendingTxCacheV1(boolean backupTransactions) {
cacheTxMap = new LRUMap<>(ACCOUNT_CACHE_MAX);
timeOutMap = new TreeMap<>();
if (poolBackup) {
removedTransactionForPoolBackup = new ArrayList<>();
}
removedTransactionForPoolBackup = backupTransactions ? new ArrayList<>() : null;
}

private static long getExpiredTime(long longValue) {
Expand All @@ -69,72 +69,60 @@ private static long getExpiredTime(long longValue) {
public AionTransaction addCacheTx(AionTransaction tx) {
Objects.requireNonNull(tx);

long time = getExpiredTime(tx.getTimeStampBI().longValue());

lock.lock();
try {
long time = getExpiredTime(tx.getTimeStampBI().longValue());
AionAddress sender = tx.getSenderAddress();
if (cacheTxMap.isFull() && !cacheTxMap.containsKey(sender)) {
AionAddress removeAddress = cacheTxMap.firstKey();
Map<BigInteger, AionTransaction> removedTxMap = cacheTxMap.remove(removeAddress);
for (AionTransaction removedTx : removedTxMap.values()) {
removeTxInTimeoutMap(removedTx);
if (removedTransactionForPoolBackup != null) {
removedTransactionForPoolBackup.add(removedTx);
}
addTransactionToRemovedTransactionForPoolBackup(removedTx);
}
}

SortedMap<BigInteger, AionTransaction> cachedTxBySender = cacheTxMap.get(sender);
if (cachedTxBySender == null) {
TreeMap<BigInteger, AionTransaction> newMap = new TreeMap<>();
newMap.put(tx.getNonceBI(), tx);
cacheTxMap.put(sender, newMap);
} else {
if (cachedTxBySender.size() < TX_PER_ACCOUNT_MAX) {
cachedTxBySender.put(tx.getNonceBI(), tx);
cacheTxMap.put(sender, cachedTxBySender);

if (LOG.isTraceEnabled()) {
LOG.trace(
"PendingTx added {}, cachedTxSize:{} by the sender:{}",
tx,
cachedTxBySender.size(),
sender);
}
} else {
LOG.info(
"Cannot add tx:{} into the cache, reached the account cached limit.",
tx);
return null;
}
}
SortedMap<BigInteger, AionTransaction> cachedTxBySender = cacheTxMap.getOrDefault(sender, new TreeMap<>());
if (cachedTxBySender.size() < TX_PER_ACCOUNT_MAX) {
cachedTxBySender.put(tx.getNonceBI(), tx);
cacheTxMap.putIfAbsent(sender, cachedTxBySender);

Set<AionTransaction> txSet = timeOutMap.get(time);
if (txSet == null) {
Set<AionTransaction> newSet = new HashSet<>();
newSet.add(tx);
timeOutMap.put(time, newSet);
LOG.trace(
"PendingTx added {}, cachedTxSize:{} by the sender:{}",
tx,
cachedTxBySender.size(),
sender);
} else {
txSet.add(tx);
LOG.info(
"Cannot add tx:{} into the cache, reached the account cached limit.",
tx);
return null;
}

Set<AionTransaction> txSet = timeOutMap.getOrDefault(time, new HashSet<>());
txSet.add(tx);
timeOutMap.putIfAbsent(time, txSet);

return tx;
} finally {
lock.unlock();
}
}

private void addTransactionToRemovedTransactionForPoolBackup(AionTransaction removedTx) {
if (removedTransactionForPoolBackup == null) {
return;
}

removedTransactionForPoolBackup.add(removedTx);
}

private void removeTxInTimeoutMap(AionTransaction tx) {
long expiredTime = getExpiredTime(tx.getTimeStampBI().longValue());
Set<AionTransaction> set = timeOutMap.get(expiredTime);
if (set != null) {
set.remove(tx);
if (set.isEmpty()) {
timeOutMap.remove(expiredTime);
} else {
timeOutMap.put(expiredTime, set);
}
set.remove(tx);
if (set.isEmpty()) {
timeOutMap.remove(expiredTime);
}
}

Expand All @@ -145,25 +133,21 @@ private void removeTxInTimeoutMap(AionTransaction tx) {
*/
public List<AionTransaction> flush(Map<AionAddress, BigInteger> nonceMap) {
Objects.requireNonNull(nonceMap);
Objects.requireNonNull(nonceMap.entrySet());

List<AionTransaction> txList = new ArrayList<>();
lock.lock();
try {
List<AionTransaction> txList = new ArrayList<>();
for (Entry<AionAddress, BigInteger> e : nonceMap.entrySet()) {
AionAddress address = e.getKey();
SortedMap<BigInteger, AionTransaction> accountCachedTx = cacheTxMap.get(address);
if (accountCachedTx != null) {
BigInteger nonce = e.getValue();
Objects.requireNonNull(nonce);

Map<BigInteger, AionTransaction> flushedTxMap = accountCachedTx.headMap(nonce);
if (!flushedTxMap.isEmpty()) {
for (AionTransaction t : flushedTxMap.values()) {
removeTxInTimeoutMap(t);

if (removedTransactionForPoolBackup != null) {
removedTransactionForPoolBackup.add(t);
}
for (AionTransaction tx : flushedTxMap.values()) {
removeTxInTimeoutMap(tx);
addTransactionToRemovedTransactionForPoolBackup(tx);
}

txList.addAll(flushedTxMap.values());
Expand All @@ -187,7 +171,7 @@ public List<AionTransaction> flush(Map<AionAddress, BigInteger> nonceMap) {
}

private List<AionTransaction> flushTimeoutTx() {
List<AionTransaction> txList = new ArrayList<>();
List<AionTransaction> timeoutTransactions = new ArrayList<>();
long current = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
Map<Long, Set<AionTransaction>> timeoutTxs = timeOutMap.headMap(current);
if (!timeoutTxs.isEmpty()) {
Expand All @@ -198,25 +182,24 @@ private List<AionTransaction> flushTimeoutTx() {
SortedMap<BigInteger, AionTransaction> map = cacheTxMap.get(sender);
BigInteger nonce = tx.getNonceBI();
if (map.containsKey(nonce)) {
txList.add(map.remove(nonce));
cacheTxMap.put(sender, map);
timeoutTransactions.add(map.remove(nonce));
}
}
}
}
}

return txList;
return timeoutTransactions;
}

/**
* @implNote get total transactions have been cached.
* @return the total transaction numbers.
*/
public int cacheTxSize() {
int size = 0;
lock.lock();
try {
int size = 0;
for (Map<BigInteger, AionTransaction> accountMap : cacheTxMap.values()) {
size += accountMap.values().size();
}
Expand Down Expand Up @@ -279,22 +262,22 @@ public Map<BigInteger, AionTransaction> getCacheTxBySender(AionAddress sender) {
}

/**
* @implNote get the list of the transactions have been removed in the cache instance and clear the
* @implNote poll the list of the transactions have been removed in the cache instance and clear the
* removedList
* @return the list of the transaction
*/
public List<AionTransaction> getRemovedTransactionForPoolBackup() {
if (removedTransactionForPoolBackup != null) {
lock.lock();
try {
public List<AionTransaction> pollRemovedTransactionForPoolBackup() {
lock.lock();
try {
if (removedTransactionForPoolBackup == null) {
return Collections.emptyList();
} else {
List<AionTransaction> removedTx = new ArrayList<>(removedTransactionForPoolBackup);
removedTransactionForPoolBackup.clear();
return removedTx;
} finally {
lock.unlock();
}
} else {
return new ArrayList<>();
} finally {
lock.unlock();
}
}

Expand All @@ -305,16 +288,17 @@ public List<AionTransaction> getRemovedTransactionForPoolBackup() {
*/
public List<AionTransaction> getNewPendingTransactions(Map<AionAddress, BigInteger> nonceMap) {
Objects.requireNonNull(nonceMap);
Objects.requireNonNull(nonceMap.entrySet());

List<AionTransaction> txList = new ArrayList<>();
lock.lock();

try {
List<AionTransaction> txList = new ArrayList<>();
for (Entry<AionAddress, BigInteger> e : nonceMap.entrySet()) {
AionAddress address = e.getKey();
SortedMap<BigInteger, AionTransaction> accountCachedTx = cacheTxMap.get(address);
if (accountCachedTx != null) {
BigInteger nonce = e.getValue();
Objects.requireNonNull(nonce);

while (accountCachedTx.containsKey(nonce)) {
txList.add(accountCachedTx.get(nonce));
Expand Down Expand Up @@ -344,8 +328,8 @@ public void removeTransaction(AionAddress sender, BigInteger nonce) {
LOG.debug("remove cachedTransaction: sender:{}, nonce:{}", sender, nonce);
Map<BigInteger, AionTransaction> accountInfo = cacheTxMap.get(sender);
AionTransaction removedTx = accountInfo.remove(nonce);
if (removedTx != null && removedTransactionForPoolBackup != null) {
removedTransactionForPoolBackup.add(removedTx);
if (removedTx != null) {
addTransactionToRemovedTransactionForPoolBackup(removedTx);
}

if (accountInfo.isEmpty()) {
Expand All @@ -355,6 +339,5 @@ public void removeTransaction(AionAddress sender, BigInteger nonce) {
} finally{
lock.unlock();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public void fullFlush2SendersUnderFullCachedInstanceTest() {
@Test
public void getRemovedTxHashWithoutPoolBackupTest() {
PendingTxCacheV1 cache = new PendingTxCacheV1();
assertNotNull(cache.getRemovedTransactionForPoolBackup());
assertNotNull(cache.pollRemovedTransactionForPoolBackup());
}

@Test
Expand All @@ -288,7 +288,7 @@ public void getRemovedTxHashWithPoolBackupTest() {
new ArrayList<>(cache.getCacheTxBySender(new AionAddress(key.get(0).getAddress())).values());
assertEquals(8, cachedTxs.size());

List<AionTransaction> removedTxHash = cache.getRemovedTransactionForPoolBackup();
List<AionTransaction> removedTxHash = cache.pollRemovedTransactionForPoolBackup();
assertEquals(2, removedTxHash.size());
for (int i = 0; i < removedTxHash.size(); i++) {
assertEquals(newCache.get(i), removedTxHash.get(i));
Expand Down Expand Up @@ -317,13 +317,13 @@ public void clearRemovedTxHashForPoolBackupTest() {
new ArrayList<>(cache.getCacheTxBySender(new AionAddress(key.get(0).getAddress())).values());
assertEquals(8, cachedTxs.size());

List<AionTransaction> removedTxHash = cache.getRemovedTransactionForPoolBackup();
List<AionTransaction> removedTxHash = cache.pollRemovedTransactionForPoolBackup();
assertEquals(2, removedTxHash.size());
for (int i = 0; i < removedTxHash.size(); i++) {
assertEquals(newCache.get(i), removedTxHash.get(i));
}

assertEquals(0, cache.getRemovedTransactionForPoolBackup().size());
assertEquals(0, cache.pollRemovedTransactionForPoolBackup().size());
}

@Test
Expand Down

0 comments on commit 5e3dcd5

Please sign in to comment.