Skip to content

Commit

Permalink
Merge pull request #4816 from chimp1984/reduce-persistence-interval
Browse files Browse the repository at this point in the history
Fix issues with missing persistence for trade state
  • Loading branch information
ripcurlx authored Nov 19, 2020
2 parents 247c82b + a4db09f commit ce265e4
Show file tree
Hide file tree
Showing 67 changed files with 221 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,25 +143,25 @@ private static void onWriteCompleted(ResultHandler completeHandler,

public enum Source {
// For data stores we received from the network and which could be rebuilt. We store only for avoiding too much network traffic.
NETWORK(1, TimeUnit.HOURS.toSeconds(1), false),
NETWORK(1, TimeUnit.MINUTES.toMillis(5), false),

// For data stores which are created from private local data. This data could only be rebuilt from backup files.
PRIVATE(10, TimeUnit.SECONDS.toSeconds(30), true),
PRIVATE(10, 200, true),

// For data stores which are created from private local data. Loss of that data would not have any critical consequences.
PRIVATE_LOW_PRIO(4, TimeUnit.HOURS.toSeconds(2), false);
// For data stores which are created from private local data. Loss of that data would not have critical consequences.
PRIVATE_LOW_PRIO(4, TimeUnit.MINUTES.toMillis(1), false);


@Getter
private final int numMaxBackupFiles;
@Getter
private final long delayInSec;
private final long delay;
@Getter
private final boolean flushAtShutDown;

Source(int numMaxBackupFiles, long delayInSec, boolean flushAtShutDown) {
Source(int numMaxBackupFiles, long delay, boolean flushAtShutDown) {
this.numMaxBackupFiles = numMaxBackupFiles;
this.delayInSec = delayInSec;
this.delay = delay;
this.flushAtShutDown = flushAtShutDown;
}
}
Expand Down Expand Up @@ -352,7 +352,7 @@ public void requestPersistence() {
timer = UserThread.runAfter(() -> {
persistNow(null);
UserThread.execute(() -> timer = null);
}, source.delayInSec, TimeUnit.SECONDS);
}, source.delay, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -454,7 +454,7 @@ public String toString() {
",\n dir=" + dir +
",\n storageFile=" + storageFile +
",\n persistable=" + persistable +
",\n priority=" + source +
",\n source=" + source +
",\n usedTempFilePath=" + usedTempFilePath +
",\n persistenceRequested=" + persistenceRequested +
"\n}";
Expand Down
1 change: 1 addition & 0 deletions core/src/main/java/bisq/core/app/WalletAppSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ void setRejectedTxErrorMessageHandler(Consumer<String> rejectedTxErrorMessageHan
String finalDetails = details;
UserThread.runAfter(() -> {
trade.setErrorMessage(newValue.getMessage());
tradeManager.requestPersistence();
if (rejectedTxErrorMessageHandler != null) {
rejectedTxErrorMessageHandler.accept(Res.get("popup.warning.trade.txRejected",
finalDetails, trade.getShortId(), txId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,7 @@ protected void onPeerOpenedDisputeMessage(PeerOpenedDisputeMessage peerOpenedDis
if (!storedDisputeOptional.isPresent()) {
disputeList.add(dispute);
trade.setDisputeState(getDisputeStateStartedByPeer());
tradeManager.requestPersistence();
errorMessage = null;
} else {
// valid case if both have opened a dispute and agent was not online.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,10 @@ public void onDisputeResultMessage(DisputeResultMessage disputeResultMessage) {
trade.getDisputeState() == Trade.DisputeState.MEDIATION_STARTED_BY_PEER) {
trade.getProcessModel().setBuyerPayoutAmountFromMediation(disputeResult.getBuyerPayoutAmount().value);
trade.getProcessModel().setSellerPayoutAmountFromMediation(disputeResult.getSellerPayoutAmount().value);
tradeManager.requestPersistence();

trade.setDisputeState(Trade.DisputeState.MEDIATION_CLOSED);

tradeManager.requestPersistence();
}
} else {
Optional<OpenOffer> openOfferOptional = openOfferManager.getOpenOfferById(tradeId);
Expand Down Expand Up @@ -243,6 +244,7 @@ public void onAcceptMediationResult(Trade trade,
DisputeProtocol tradeProtocol = (DisputeProtocol) tradeManager.getTradeProtocol(trade);

trade.setMediationResultState(MediationResultState.MEDIATION_RESULT_ACCEPTED);
tradeManager.requestPersistence();

// If we have not got yet the peers signature we sign and send to the peer our signature.
// Otherwise we sign and complete with the peers signature the payout tx.
Expand All @@ -265,5 +267,6 @@ public void onAcceptMediationResult(Trade trade,

public void rejectMediationResult(Trade trade) {
trade.setMediationResultState(MediationResultState.MEDIATION_RESULT_REJECTED);
tradeManager.requestPersistence();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ public void onDisputeResultMessage(DisputeResultMessage disputeResultMessage) {
if (trade.getDisputeState() == Trade.DisputeState.REFUND_REQUESTED ||
trade.getDisputeState() == Trade.DisputeState.REFUND_REQUEST_STARTED_BY_PEER) {
trade.setDisputeState(Trade.DisputeState.REFUND_REQUEST_CLOSED);
tradeManager.requestPersistence();
}
} else {
Optional<OpenOffer> openOfferOptional = openOfferManager.getOpenOfferById(tradeId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,7 @@ public void addSystemMsg(Trade trade) {
trade.getDate().getTime());
chatMessage.setSystemMessage(true);
trade.getChatMessages().add(chatMessage);

requestPersistence();
}
}
7 changes: 3 additions & 4 deletions core/src/main/java/bisq/core/trade/Trade.java
Original file line number Diff line number Diff line change
Expand Up @@ -700,10 +700,6 @@ public void addAndPersistChatMessage(ChatMessage chatMessage) {
}
}

public void appendErrorMessage(String msg) {
errorMessage = errorMessage == null ? msg : errorMessage + "\n" + msg;
}

public boolean mediationResultAppliedPenaltyToSeller() {
// If mediated payout is same or more then normal payout we enable otherwise a penalty was applied
// by mediators and we keep the confirm disabled to avoid that the seller can complete the trade
Expand Down Expand Up @@ -1099,6 +1095,9 @@ public void onFailure(@NotNull Throwable t) {
private void setConfirmedState() {
// we only apply the state if we are not already further in the process
if (!isDepositConfirmed()) {
// As setState is called here from the trade itself we cannot trigger a requestPersistence call.
// But as we get setupConfidenceListener called at startup anyway there is no issue if it would not be
// persisted in case the shutdown routine did not persist the trade.
setState(State.DEPOSIT_CONFIRMED_IN_BLOCK_CHAIN);
}
}
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/bisq/core/trade/TradeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,11 +342,13 @@ private void initPersistedTrades() {
private void initPersistedTrade(Trade trade) {
initTradeAndProtocol(trade, getTradeProtocol(trade));
trade.updateDepositTxFromWallet();
requestPersistence();
}

private void initTradeAndProtocol(Trade trade, TradeProtocol tradeProtocol) {
tradeProtocol.initialize(processModelServiceProvider, this, trade.getOffer());
trade.initialize(processModelServiceProvider);
requestPersistence();
}

public void requestPersistence() {
Expand Down Expand Up @@ -431,6 +433,7 @@ public void onTakeOffer(Coin amount,

((TakerProtocol) tradeProtocol).onTakeOffer();
tradeResultHandler.handleResult(trade);
requestPersistence();
}
},
errorMessageHandler);
Expand Down Expand Up @@ -544,10 +547,13 @@ private void updateTradePeriodState() {
Date halfTradePeriodDate = trade.getHalfTradePeriodDate();
if (maxTradePeriodDate != null && halfTradePeriodDate != null) {
Date now = new Date();
if (now.after(maxTradePeriodDate))
if (now.after(maxTradePeriodDate)) {
trade.setTradePeriodState(Trade.TradePeriodState.TRADE_PERIOD_OVER);
else if (now.after(halfTradePeriodDate))
requestPersistence();
} else if (now.after(halfTradePeriodDate)) {
trade.setTradePeriodState(Trade.TradePeriodState.SECOND_HALF);
requestPersistence();
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ public void onAllServicesInitialized() {

public void add(Tradable tradable) {
if (closedTradables.add(tradable)) {
persistenceManager.requestPersistence();
requestPersistence();
}
}

public void remove(Tradable tradable) {
if (closedTradables.remove(tradable)) {
persistenceManager.requestPersistence();
requestPersistence();
}
}

Expand All @@ -117,4 +117,8 @@ public Stream<Trade> getTradesStreamWithFundsLockedIn() {
return getClosedTrades().stream()
.filter(Trade::isFundsLockedIn);
}

private void requestPersistence() {
persistenceManager.requestPersistence();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ public void onAllServicesInitialized() {

public void add(Trade trade) {
if (failedTrades.add(trade)) {
persistenceManager.requestPersistence();
requestPersistence();
}
}

public void removeTrade(Trade trade) {
if (failedTrades.remove(trade)) {
persistenceManager.requestPersistence();
requestPersistence();
}
}

Expand Down Expand Up @@ -129,7 +129,7 @@ public void unFailTrade(Trade trade) {
if (unFailTradeCallback.apply(trade)) {
log.info("Unfailing trade {}", trade.getId());
if (failedTrades.remove(trade)) {
persistenceManager.requestPersistence();
requestPersistence();
}
}
}
Expand All @@ -151,4 +151,8 @@ public String checkUnFail(Trade trade) {
}
return blockingTrades.toString();
}

private void requestPersistence() {
persistenceManager.requestPersistence();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ public void onTakeOffer() {
BuyerAsTakerCreatesDepositTxInputs.class,
TakerSendInputsForDepositTxRequest.class)
.withTimeout(30))
.run(() -> processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress()))
.run(() -> {
processModel.setTempTradingPeerNodeAddress(trade.getTradingPeerNodeAddress());
processModel.getTradeManager().requestPersistence();
})
.executeTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void onPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler er
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED))
.run(() -> {
trade.setState(Trade.State.BUYER_CONFIRMED_IN_UI_FIAT_PAYMENT_INITIATED);
processModel.getTradeManager().requestPersistence();
})
.executeTasks();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ public FluentProtocol executeTasks() {
NodeAddress peer = condition.getPeer();
if (peer != null) {
tradeProtocol.processModel.setTempTradingPeerNodeAddress(peer);
tradeProtocol.processModel.getTradeManager().requestPersistence();
}

TradeMessage message = condition.getMessage();
if (message != null) {
tradeProtocol.processModel.setTradeMessage(message);
tradeProtocol.processModel.getTradeManager().requestPersistence();
}

TradeTaskRunner taskRunner = setup.getTaskRunner(message, condition.getEvent());
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/bisq/core/trade/protocol/ProcessModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ void setPaymentStartedAckMessage(AckMessage ackMessage) {

public void setPaymentStartedMessageState(MessageState paymentStartedMessageStateProperty) {
this.paymentStartedMessageStateProperty.set(paymentStartedMessageStateProperty);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}

void setDepositTxSentAckMessage(AckMessage ackMessage) {
Expand All @@ -305,6 +308,9 @@ void setDepositTxSentAckMessage(AckMessage ackMessage) {

public void setDepositTxMessageState(MessageState messageState) {
this.depositTxMessageStateProperty.set(messageState);
if (tradeManager != null) {
tradeManager.requestPersistence();
}
}

void witnessDebugLog(Trade trade) {
Expand Down
12 changes: 10 additions & 2 deletions core/src/main/java/bisq/core/trade/protocol/SellerProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ protected void handle(DelayedPayoutTxSignatureResponse message, NodeAddress peer
///////////////////////////////////////////////////////////////////////////////////////////

protected void handle(CounterCurrencyTransferStartedMessage message, NodeAddress peer) {
expect(phase(Trade.Phase.DEPOSIT_CONFIRMED)
// We are more tolerant with expected phase and allow also DEPOSIT_PUBLISHED as it can be the case
// that the wallet is still syncing and so the DEPOSIT_CONFIRMED state to yet triggered when we received
// a mailbox message with CounterCurrencyTransferStartedMessage.
// TODO A better fix would be to add a listener for the wallet sync state and process
// the mailbox msg once wallet is ready and trade state set.
expect(anyPhase(Trade.Phase.DEPOSIT_CONFIRMED, Trade.Phase.DEPOSIT_PUBLISHED)
.with(message)
.from(peer)
.preCondition(trade.getPayoutTx() == null,
Expand Down Expand Up @@ -141,7 +146,10 @@ public void onPaymentReceived(ResultHandler resultHandler, ErrorMessageHandler e
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(event, errorMessage);
})))
.run(() -> trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT))
.run(() -> {
trade.setState(Trade.State.SELLER_CONFIRMED_IN_UI_FIAT_PAYMENT_RECEIPT);
processModel.getTradeManager().requestPersistence();
})
.executeTasks();
}

Expand Down
18 changes: 15 additions & 3 deletions core/src/main/java/bisq/core/trade/protocol/TradeProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

import java.security.PublicKey;

import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -76,9 +78,17 @@ protected void onInitialized() {
if (!trade.isWithdrawn()) {
processModel.getP2PService().addDecryptedDirectMessageListener(this);
}
processModel.getP2PService().addDecryptedMailboxListener(this);
processModel.getP2PService().getMailBoxMessages()
.forEach(this::handleDecryptedMessageWithPubKey);

// We delay a bit here as the trade gets updated from the wallet to update the trade
// state (deposit confirmed) and that happens after our method is called.
// TODO To fix that in a better way we would need to change the order of some routines
// from the TradeManager, but as we are close to a release I dont want to risk a bigger
// change and leave that for a later PR
UserThread.runAfter(() -> {
processModel.getP2PService().addDecryptedMailboxListener(this);
processModel.getP2PService().getMailBoxMessages()
.forEach(this::handleDecryptedMessageWithPubKey);
}, 100, TimeUnit.MILLISECONDS);
}

public void onWithdrawCompleted() {
Expand Down Expand Up @@ -297,6 +307,8 @@ protected void startTimeout(long timeoutSec) {
log.error("Timeout reached. TradeID={}, state={}, timeoutSec={}",
trade.getId(), trade.stateProperty().get(), timeoutSec);
trade.setErrorMessage("Timeout reached. Protocol did not complete in " + timeoutSec + " sec.");

processModel.getTradeManager().requestPersistence();
cleanup();
}, timeoutSec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ protected void run() {
Transaction delayedPayoutTx = checkNotNull(trade.getDelayedPayoutTx());
WalletService.maybeAddSelfTxToWallet(delayedPayoutTx, processModel.getBtcWalletService().getWallet());

processModel.getTradeManager().requestPersistence();

complete();
} catch (Throwable t) {
failed(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ private void applyConfidence(TransactionConfidence confidence) {
if (trade.getPayoutTx() == null) {
Transaction walletTx = processModel.getTradeWalletService().getWalletTx(confidence.getTransactionHash());
trade.setPayoutTx(walletTx);
processModel.getTradeManager().requestPersistence();
BtcWalletService.printTx("payoutTx received from network", walletTx);
setState();
} else {
Expand Down
Loading

0 comments on commit ce265e4

Please sign in to comment.