Skip to content

Commit

Permalink
Add threadPool for submitting staking block on-time
Browse files Browse the repository at this point in the history
  • Loading branch information
AionJayT committed Mar 20, 2020
1 parent 016118b commit b1063dd
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@
* to generate new blocks to mine. As for receiving blocks, this class interacts with {@link
* SyncMgr} to manage the importing of blocks from network.
*/
public class AionBlockchainImpl implements IAionBlockchain {
public class AionBlockchainImpl implements IAionBlockchain {

private static final Logger LOG = LoggerFactory.getLogger(LogEnum.CONS.name());
private static final Logger GEN_LOG = LoggerFactory.getLogger(LogEnum.GEN.name());
Expand Down Expand Up @@ -1474,7 +1474,7 @@ private StakingBlock createNewStakingBlock(
blockPreSeal(parentHdr, block);

if (signingPublicKey != null) {
stakingBlockTemplate.put(
stakingBlockTemplate.putIfAbsent(
ByteArrayWrapper.wrap(block.getHeader().getMineHash()), block);
}

Expand Down
7 changes: 7 additions & 0 deletions modAionImpl/src/org/aion/zero/impl/types/StakingBlock.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.aion.zero.impl.types;

import static org.aion.zero.impl.types.StakingBlockHeader.DEFAULT_SIGNATURE;

import com.google.common.annotations.VisibleForTesting;
import java.math.BigInteger;
import java.util.Arrays;
Expand Down Expand Up @@ -442,4 +444,9 @@ public void updateHeaderDifficulty(byte[] diff) {
.withDifficulty(diff)
.build();
}

public boolean isSealed() {
return !Arrays.equals(this.header.getSigningPublicKey(), ByteUtil.EMPTY_WORD)
&& !Arrays.equals(this.header.getSignature(), DEFAULT_SIGNATURE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public class StakingBlockHeader implements BlockHeader {
public static final int PUBKEY_LENGTH = 32;

public static final byte[] GENESIS_SEED = new byte[SEED_LENGTH];

public static final byte[] DEFAULT_SIGNATURE = new byte[SIG_LENGTH];
/**
* private constructor. use builder to construct the header class.
*/
Expand Down Expand Up @@ -659,7 +659,7 @@ public Builder withDefaultExtraData() {
}

public Builder withDefaultSignature() {
signature = new byte[SIG_LENGTH];
signature = DEFAULT_SIGNATURE;
return this;
}

Expand Down
7 changes: 5 additions & 2 deletions modApiServer/src/org/aion/api/server/http/RpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public abstract class RpcServer {
protected boolean stuckThreadDetectorEnabled;

private AccountManager accountManager;
private ChainHolder chainHolder;

/**
* to explicitly force any subclasses to check for null values, access to the following
Expand Down Expand Up @@ -61,7 +62,7 @@ protected RpcServer(RpcServerBuilder<?> builder) {
Collections.unmodifiableList(Objects.requireNonNull(builder.disabledMethods));

accountManager = builder.accountManager;
ChainHolder chainHolder = new AionChainHolder(AionImpl.inst(), accountManager);
chainHolder = new AionChainHolder(AionImpl.inst(), accountManager);

rpcProcessor =
new RpcProcessor(enabledEndpoints,
Expand Down Expand Up @@ -108,5 +109,7 @@ protected Optional<Integer> getRequestQueueSize() {

public abstract void start();

public abstract void stop();
public void stop() {
chainHolder.shutDown();
}
}
76 changes: 66 additions & 10 deletions modApiServer/src/org/aion/api/server/rpc3/AionChainHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand All @@ -16,6 +19,7 @@
import org.aion.mcf.blockchain.BlockHeader.BlockSealType;
import org.aion.mcf.db.Repository;
import org.aion.types.AionAddress;
import org.aion.util.bytes.ByteUtil;
import org.aion.zero.impl.blockchain.AionBlockchainImpl;
import org.aion.zero.impl.blockchain.AionImpl;
import org.aion.zero.impl.blockchain.IAionChain;
Expand All @@ -35,6 +39,7 @@ public class AionChainHolder implements ChainHolder {
private final AccountManagerInterface accountManager;
private final FutureBlockRule futureBlockRule;
private final Logger logger;
private final ExecutorService blockSubmitThread;

public AionChainHolder(IAionChain chain,
AccountManagerInterface accountManager) {
Expand All @@ -50,7 +55,8 @@ public AionChainHolder(IAionChain chain,
currentTemplate = new AtomicReference<>(null);
this.accountManager = accountManager;
this.futureBlockRule = new FutureBlockRule();
logger = AionLoggerFactory.getLogger(LogEnum.API.name());
logger = AionLoggerFactory.getLogger(LogEnum.CONS.name());
blockSubmitThread = Executors.newFixedThreadPool(2);
}

@Override
Expand Down Expand Up @@ -96,19 +102,63 @@ public boolean submitSignature(byte[] signature, byte[] sealHash) {
if (!isUnityForkEnabled()) throw new UnsupportedOperationException();
else {
StakingBlock stakingBlock = chain.getBlockchain().getCachingStakingBlockTemplate(sealHash);
stakingBlock.seal(signature, stakingBlock.getHeader().getSigningPublicKey());

//AKI-648 reject the block add into the kernel if the timestamp of the block is in the future.
boolean isValidTimestamp = futureBlockRule.validate(stakingBlock.getHeader(), new ArrayList<>());
logger.debug(
"submitSignature: sig[{}], sealHash[{}], block[{}]",
ByteUtil.toHexString(signature),
ByteUtil.toHexString(sealHash),
stakingBlock);

final boolean sealed = isValidTimestamp && addNewBlock(stakingBlock);
if (!stakingBlock.isSealed()
&& Arrays.equals(sealHash, stakingBlock.getHeader().getMineHash())) {
stakingBlock.seal(signature, stakingBlock.getHeader().getSigningPublicKey());

if (sealed) {
addSealedBlockToPool(stakingBlock);
logSealedBlock(stakingBlock);
}else {
return true;
} else {
logFailedSealedBlock(stakingBlock);
return false;
}
return sealed;
}
}

private void addSealedBlockToPool(StakingBlock stakingBlock) {
try {
blockSubmitThread.submit(
() -> {
long currentTimeMillis = System.currentTimeMillis();
long blockTimeMillis =
TimeUnit.SECONDS.toMillis(stakingBlock.getTimestamp());
long diff = blockTimeMillis - currentTimeMillis;

if (diff > TimeUnit.MINUTES.toMillis(10)) {
logger.info(
"discard the sealed block due to long timestamp(ms): blockTimeStamp[{}], exceed the current time[{}]",
blockTimeMillis,
diff);
return;
} else if (diff > 0) {
logger.info("the block {} runnable sleep(ms): {}", stakingBlock.getNumber(), diff);
try {
Thread.sleep(diff);
} catch (InterruptedException e) {
logger.error(e.toString());
}
}

boolean success = addNewBlock(stakingBlock);

logger.info(
"staking block {} to the blockchain DB <num={}, hash={}, diff={}, tx={}>",
success ? "sealed" : "cannot seal",
stakingBlock.getNumber(),
stakingBlock.getShortHash(),
stakingBlock.getDifficultyBI(),
stakingBlock.getTransactionsList().size());
});
} catch (Exception e) {
logger.error(e.toString());
}
}

Expand Down Expand Up @@ -232,6 +282,12 @@ public List<AionAddress> listAccounts() {
.map(AionAddress::new).collect(Collectors.toUnmodifiableList());
}

@Override
public void shutDown() {
logger.info("rpcChainHolder shutting down.");
blockSubmitThread.shutdownNow();
}

@Override
public AionBlock getBestPOWBlock() {
return this.chain.getBlockchain().getBestMiningBlock();
Expand All @@ -249,7 +305,7 @@ public boolean addressExists(AionAddress address) {

private void logSealedBlock(Block block){
//log that the block was sealed
AionLoggerFactory.getLogger(LogEnum.CONS.toString()).info(
logger.info(
"{} block submitted via api <num={}, hash={}, diff={}, tx={}>",
block.getHeader().getSealType().equals(BlockSealType.SEAL_POW_BLOCK) ? "Mining": "Staking",
block.getNumber(),
Expand All @@ -260,7 +316,7 @@ private void logSealedBlock(Block block){

private void logFailedSealedBlock(Block block){
//log that the block could not be sealed
AionLoggerFactory.getLogger(LogEnum.CONS.toString()).info(
logger.debug(
"Unable to submit {} block via api <num={}, hash={}, diff={}, tx={}>",
block.getHeader().getSealType().equals(BlockSealType.SEAL_POW_BLOCK) ? "mining": "staking",
block.getNumber(),
Expand Down
2 changes: 2 additions & 0 deletions modApiServer/src/org/aion/api/server/rpc3/ChainHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,6 @@ public interface ChainHolder {
* @return the list of accounts in the kernel's keystore
*/
List<AionAddress> listAccounts();

void shutDown();
}

0 comments on commit b1063dd

Please sign in to comment.