diff --git a/client/asset/eth/eth.go b/client/asset/eth/eth.go index 431f6ae4e4..d3cc2dc1be 100644 --- a/client/asset/eth/eth.go +++ b/client/asset/eth/eth.go @@ -7,7 +7,6 @@ import ( "bytes" "context" "crypto/sha256" - "encoding/binary" "encoding/hex" "encoding/json" "errors" @@ -26,7 +25,6 @@ import ( "time" "decred.org/dcrdex/client/asset" - "decred.org/dcrdex/client/asset/kvdb" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/config" "decred.org/dcrdex/dex/encode" @@ -349,93 +347,6 @@ type txPoolFetcher interface { pendingTransactions() ([]*types.Transaction, error) } -// monitoredTx is used to keep track of redemption transactions that have not -// yet been confirmed. If a transaction has to be replaced due to the fee -// being too low or another transaction being mined with the same nonce, -// the replacement transaction's ID is recorded in the replacementTx field. -// replacedTx is used to maintain a doubly linked list, which allows deletion -// of transactions that were replaced after a transaction is confirmed. -type monitoredTx struct { - tx *types.Transaction - blockSubmitted uint64 - - // This mutex must be held during the entire process of confirming - // a transaction. This is to avoid confirmations of the same - // transactions happening concurrently resulting in more than one - // replacement for the same transaction. - mtx sync.Mutex - replacementTx *common.Hash - // replacedTx could be set when the tx is created, be immutable, and not - // need the mutex, but since Redeem doesn't know if the transaction is a - // replacement or a new one, this variable is set in recordReplacementTx. - replacedTx *common.Hash - errorsBroadcasted uint16 -} - -// MarshalBinary marshals a monitoredTx into a byte array. -// It satisfies the encoding.BinaryMarshaler interface for monitoredTx. -func (m *monitoredTx) MarshalBinary() (data []byte, err error) { - b := encode.BuildyBytes{0} - txB, err := m.tx.MarshalBinary() - if err != nil { - return nil, fmt.Errorf("error marshaling tx: %v", err) - } - b = b.AddData(txB) - - blockB := make([]byte, 8) - binary.BigEndian.PutUint64(blockB, m.blockSubmitted) - b = b.AddData(blockB) - - if m.replacementTx != nil { - replacementTxHash := m.replacementTx[:] - b = b.AddData(replacementTxHash) - } - - return b, nil -} - -// UnmarshalBinary loads a data from a marshalled byte array into a -// monitoredTx. -func (m *monitoredTx) UnmarshalBinary(data []byte) error { - ver, pushes, err := encode.DecodeBlob(data) - if err != nil { - return err - } - if ver != 0 { - return fmt.Errorf("unknown version %d", ver) - } - if len(pushes) != 2 && len(pushes) != 3 { - return fmt.Errorf("wrong number of pushes %d", len(pushes)) - } - m.tx = &types.Transaction{} - if err := m.tx.UnmarshalBinary(pushes[0]); err != nil { - return fmt.Errorf("error reading tx: %w", err) - } - - m.blockSubmitted = binary.BigEndian.Uint64(pushes[1]) - - if len(pushes) == 3 { - var replacementTxHash common.Hash - copy(replacementTxHash[:], pushes[2]) - m.replacementTx = &replacementTxHash - } - - return nil -} - -// pendingTx is used to track unconfirmed transactions that should be considered -// for balance calculations, for node types that don't support viewing txpool -// transactions directly. -type pendingTx struct { - assetID uint32 - out uint64 // eth or token - in uint64 // eth or token - maxFees uint64 // eth - nonce uint64 - stamp time.Time - lastCheck uint64 // block height -} - type pendingApproval struct { txHash common.Hash onConfirm func() @@ -462,6 +373,8 @@ var _ asset.DynamicSwapper = (*ETHWallet)(nil) var _ asset.DynamicSwapper = (*TokenWallet)(nil) var _ asset.Authenticator = (*ETHWallet)(nil) var _ asset.TokenApprover = (*TokenWallet)(nil) +var _ asset.WalletHistorian = (*ETHWallet)(nil) +var _ asset.WalletHistorian = (*TokenWallet)(nil) type baseWallet struct { // The asset subsystem starts with Connect(ctx). This ctx will be initialized @@ -496,13 +409,9 @@ type baseWallet struct { monitoredTxsMtx sync.RWMutex monitoredTxs map[common.Hash]*monitoredTx - monitoredTxDB kvdb.KeyValueDB - pendingTxMtx sync.RWMutex - pendingTxs map[common.Hash]*pendingTx - // We could store pending txs to a database too, so that we can track these - // through restarts, but these are only used for balance calcs and are not - // as critical as monitoredTxs. + pendingTxsMtx sync.RWMutex + pendingTxs map[uint64]*extendedWalletTx // nonce -> tx // nonceSendMtx should be locked for the node.txOpts -> tx send sequence // for all txs, to ensure nonce correctness. @@ -512,6 +421,11 @@ type baseWallet struct { sync.Mutex m map[uint32]*cachedBalance } + + txDB txDB + // All processes which may write to txDB must add an entry to this + // WaitGroup, and they must all complete before the db is closed. + txDbWg sync.WaitGroup } // assetWallet is a wallet backend for Ethereum and Eth tokens. The backend is @@ -554,7 +468,7 @@ type assetWallet struct { evmify func(uint64) *big.Int atomize func(*big.Int) uint64 - // pendingTxCheckBal is protected by the pendingTxMtx. We use this field + // pendingTxCheckBal is protected by the pendingTxsMtx. We use this field // as a secondary check to see if we need to request confirmations for // pending txs, since tips are cached for up to 10 seconds. We check the // status of pending txs if the tip has changed OR if the balance has @@ -769,7 +683,6 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { if gasFeeLimit == 0 { gasFeeLimit = defaultGasFeeLimit } - eth := &baseWallet{ net: cfg.Net, baseChainID: cfg.BaseChainID, @@ -784,7 +697,7 @@ func NewEVMWallet(cfg *EVMWalletConfig) (w *ETHWallet, err error) { gasFeeLimitV: gasFeeLimit, wallets: make(map[uint32]*assetWallet), monitoredTxs: make(map[common.Hash]*monitoredTx), - pendingTxs: make(map[common.Hash]*pendingTx), + pendingTxs: make(map[uint64]*extendedWalletTx), // Can be empty multiBalanceAddress: cfg.MultiBalAddress, } @@ -849,32 +762,10 @@ func getWalletDir(dataDir string, network dex.Network) string { func (w *ETHWallet) shutdown() { w.node.shutdown() - if err := w.monitoredTxDB.Close(); err != nil { - w.log.Errorf("error closing tx db: %v", err) - } -} - -// loadMonitoredTxs takes all of the monitored tx from the db and puts them -// into an in memory map. -func loadMonitoredTxs(db kvdb.KeyValueDB) (map[common.Hash]*monitoredTx, error) { - monitoredTxs := make(map[common.Hash]*monitoredTx) - - if err := db.ForEach(func(k, v []byte) error { - var h common.Hash - copy(h[:], k) - - txRec := &monitoredTx{} - if err := txRec.UnmarshalBinary(v); err != nil { - return err - } - - monitoredTxs[h] = txRec - return nil - }); err != nil { - return nil, fmt.Errorf("failed to load txs to monitor: %w", err) + w.txDbWg.Wait() + if err := w.txDB.close(); err != nil { + w.log.Errorf("error closing tx history db: %v", err) } - - return monitoredTxs, nil } // Connect connects to the node RPC server. Satisfies dex.Connector. @@ -933,13 +824,18 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) } } - db, err := kvdb.NewFileDB(filepath.Join(w.dir, "tx.db"), w.log.SubLogger("TXDB")) + w.txDB, err = newBadgerTxDB(filepath.Join(w.dir, "tx.db"), w.log.SubLogger("TXDB")) + if err != nil { + return nil, err + } + w.txDbWg = sync.WaitGroup{} + + w.monitoredTxs, err = w.txDB.getMonitoredTxs() if err != nil { return nil, err } - w.monitoredTxDB = db - w.monitoredTxs, err = loadMonitoredTxs(w.monitoredTxDB) + w.pendingTxs, err = w.txDB.getPendingTxs() if err != nil { return nil, err } @@ -960,12 +856,22 @@ func (w *ETHWallet) Connect(ctx context.Context) (_ *sync.WaitGroup, err error) w.connected.Store(true) var wg sync.WaitGroup + + wg.Add(1) + w.txDbWg.Add(1) + go func() { + defer wg.Done() + defer w.txDbWg.Done() + w.txDB.run(ctx) + }() + wg.Add(1) go func() { defer wg.Done() w.monitorBlocks(ctx) w.shutdown() }() + wg.Add(1) go func() { defer wg.Done() @@ -1004,6 +910,7 @@ func (w *TokenWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) { w.connected.Store(false) } }() + return &wg, nil } @@ -2049,7 +1956,7 @@ func (w *ETHWallet) Swap(swaps *asset.Swaps) ([]asset.Receipt, asset.Coin, uint6 } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), swapVal, 0, fees) + w.addToTxHistory(tx.Nonce(), -int64(swapVal), swaps.FeeRate*gasLimit, 0, w.assetID, txHash[:], asset.Swap) receipts := make([]asset.Receipt, 0, n) for _, swap := range swaps.Contracts { @@ -2140,7 +2047,7 @@ func (w *TokenWallet) Swap(swaps *asset.Swaps) ([]asset.Receipt, asset.Coin, uin contractAddr := w.netToken.SwapContracts[swaps.Version].Address.String() txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), swapVal, 0, fees) + w.addToTxHistory(tx.Nonce(), -int64(swapVal), swaps.FeeRate*gasLimit, 0, w.assetID, txHash[:], asset.Swap) receipts := make([]asset.Receipt, 0, n) for _, swap := range swaps.Contracts { @@ -2313,7 +2220,7 @@ func (w *assetWallet) Redeem(form *asset.RedeemForm, feeWallet *assetWallet, non } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), 0, redeemedValue, gasFeeCap*gasLimit) + w.addToTxHistory(tx.Nonce(), int64(redeemedValue), gasFeeCap*gasLimit, 0, w.assetID, txHash[:], asset.Redeem) txs := make([]dex.Bytes, len(form.Redemptions)) for i := range txs { @@ -2389,7 +2296,8 @@ func (w *assetWallet) approveToken(amount *big.Int, maxFeeRate, gasLimit uint64, w.log.Infof("Approval sent for %s at token address %s, nonce = %s, txID = %s", dex.BipIDSymbol(w.assetID), c.tokenAddress(), txOpts.Nonce, tx.Hash().Hex()) - w.addPendingTx(w.assetID, tx.Hash(), txOpts.Nonce.Uint64(), 0, 0, maxFeeRate*gasLimit) + txHash := tx.Hash() + w.addToTxHistory(tx.Nonce(), 0, maxFeeRate*gasLimit, 0, w.assetID, txHash[:], asset.ApproveToken) return nil }) @@ -2805,27 +2713,6 @@ func (w *assetWallet) ContractLockTimeExpired(ctx context.Context, contract dex. return expired, swap.LockTime, nil } -func (eth *baseWallet) addPendingTx(assetID uint32, txHash common.Hash, nonce, out, in, fees uint64) { - // We don't track pending txs locally if we have access to txpool. - if _, is := eth.node.(txPoolFetcher); is { - return - } - eth.tipMtx.RLock() - tip := eth.currentTip.Number.Uint64() - eth.tipMtx.RUnlock() - eth.pendingTxMtx.Lock() - eth.pendingTxs[txHash] = &pendingTx{ - assetID: assetID, - out: out, - in: in, - maxFees: fees, - nonce: nonce, - stamp: time.Now(), - lastCheck: tip, - } - eth.pendingTxMtx.Unlock() -} - // findRedemptionResult is used internally for queued findRedemptionRequests. type findRedemptionResult struct { err error @@ -2992,7 +2879,8 @@ func (w *assetWallet) Refund(_, contract dex.Bytes, feeRate uint64) (dex.Bytes, } txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), 0, dexeth.WeiToGwei(swap.Value), fees) + refundValue := dexeth.WeiToGwei(swap.Value) + w.addToTxHistory(tx.Nonce(), int64(refundValue), fees, 0, w.assetID, txHash[:], asset.Refund) return txHash[:], nil } @@ -3266,8 +3154,10 @@ func (w *ETHWallet) Send(addr string, value, _ uint64) (asset.Coin, error) { if err != nil { return nil, err } + txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), value, 0, maxFee) + w.addToTxHistory(tx.Nonce(), -int64(value), maxFee, 0, w.assetID, txHash[:], asset.Send) + return &coin{id: txHash, value: value}, nil } @@ -3288,8 +3178,10 @@ func (w *TokenWallet) Send(addr string, value, _ uint64) (asset.Coin, error) { if err != nil { return nil, err } + txHash := tx.Hash() - w.addPendingTx(w.assetID, txHash, tx.Nonce(), value, 0, maxFee) + w.addToTxHistory(tx.Nonce(), -int64(value), maxFee, 0, w.assetID, txHash[:], asset.Send) + return &coin{id: txHash, value: value}, nil } @@ -3578,7 +3470,10 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) { w.emit.TipChange(bestHdr.Number.Uint64()) } }() + + eth.txDbWg.Add(1) go func() { + defer eth.txDbWg.Done() for _, w := range connectedWallets { w.checkFindRedemptions() } @@ -3588,6 +3483,12 @@ func (eth *ETHWallet) checkForNewBlocks(ctx context.Context) { w.checkPendingApprovals() } }() + + eth.txDbWg.Add(1) + go func() { + defer eth.txDbWg.Done() + eth.checkPendingTxs() + }() } // getLatestMonitoredTx looks up a txHash in the monitoredTxs map. If the @@ -3624,7 +3525,7 @@ func (w *assetWallet) getLatestMonitoredTx(txHash common.Hash) (*monitoredTx, er func (w *assetWallet) recordReplacementTx(originalTx *monitoredTx, replacementHash common.Hash) error { originalTx.replacementTx = &replacementHash originalHash := originalTx.tx.Hash() - if err := w.monitoredTxDB.Store(originalHash[:], originalTx); err != nil { + if err := w.txDB.storeMonitoredTx(originalHash, originalTx); err != nil { return fmt.Errorf("error recording replacement tx: %v", err) } @@ -3638,7 +3539,7 @@ func (w *assetWallet) recordReplacementTx(originalTx *monitoredTx, replacementHa replacementTx.mtx.Lock() defer replacementTx.mtx.Unlock() replacementTx.replacedTx = &originalHash - if err := w.monitoredTxDB.Store(replacementHash[:], replacementTx); err != nil { + if err := w.txDB.storeMonitoredTx(replacementHash, replacementTx); err != nil { return fmt.Errorf("error recording replaced tx: %v", err) } @@ -3679,10 +3580,12 @@ func (w *assetWallet) clearMonitoredTx(tx *monitoredTx) { defer w.monitoredTxsMtx.Unlock() txsToDelete := w.txsToDelete(tx) - for _, hash := range txsToDelete { - if err := w.monitoredTxDB.Delete(hash[:]); err != nil { - w.log.Errorf("failed to delete monitored tx: %v", err) - } + err := w.txDB.removeMonitoredTxs(txsToDelete) + if err != nil { + w.log.Errorf("Error removing monitored txs: %v", err) + // Don't remove these txs from the memory map, so that the removal + // from the db can be attempted again. + return } // Delete from the database immediately, but keep in the memory map a bit @@ -3711,7 +3614,7 @@ func (w *assetWallet) monitorTx(tx *types.Transaction, blockSubmitted uint64) { blockSubmitted: blockSubmitted, } h := tx.Hash() - if err := w.monitoredTxDB.Store(h[:], monitoredTx); err != nil { + if err := w.txDB.storeMonitoredTx(h, monitoredTx); err != nil { w.log.Errorf("error storing monitored tx: %v", err) } @@ -4115,69 +4018,76 @@ func (w *assetWallet) checkPendingApprovals() { } } -// sumPendingTxs sums the expected incoming and outgoing values in unconfirmed -// transactions stored in pendingTxs. Not used if the node is a txPoolFetcher. +// sumPendingTxs sums the expected incoming and outgoing values in pending +// transactions stored in pendingTxs. Not used if the node is a +// txPoolFetcher. func (w *assetWallet) sumPendingTxs(bal *big.Int) (out, in uint64) { + isToken := w.assetID != w.baseChainID + + pendingTxsCopy := make(map[uint64]*extendedWalletTx, len(w.pendingTxs)) + w.pendingTxsMtx.Lock() + for nonce, tx := range w.pendingTxs { + pendingTxsCopy[nonce] = tx + } + balanceHasChanged := w.pendingTxCheckBal == nil || bal.Cmp(w.pendingTxCheckBal) != 0 + w.pendingTxCheckBal = bal + w.pendingTxsMtx.Unlock() + w.tipMtx.RLock() tip := w.currentTip.Number.Uint64() w.tipMtx.RUnlock() - isToken := w.assetID != w.baseChainID - - addPendingTx := func(pt *pendingTx) { - in += pt.in - if !isToken { - if pt.assetID != w.baseChainID { - out += pt.maxFees + addPendingTx := func(txAssetID uint32, pt *extendedWalletTx) { + if txAssetID == w.assetID { + if pt.BalanceDelta > 0 { + in += uint64(pt.BalanceDelta) } else { - out += pt.out + pt.maxFees + out += uint64(-1 * pt.BalanceDelta) } - return } - // token - out += pt.out + if !isToken { + out += pt.Fees + } } - w.pendingTxMtx.Lock() - defer w.pendingTxMtx.Unlock() - balanceHasChanged := w.pendingTxCheckBal == nil || bal.Cmp(w.pendingTxCheckBal) != 0 - w.pendingTxCheckBal = bal - for txHash, pt := range w.pendingTxs { - if isToken && pt.assetID != w.assetID { - continue + processPendingTx := func(nonce uint64, wt *extendedWalletTx) { + wt.mtx.Lock() + defer wt.mtx.Unlock() + + // Already confirmed, but still in the unconfirmed txs map waiting for + // txConfsNeededToConfirm confirmations. + if wt.BlockNumber != 0 { + return + } + + txAssetID := w.baseChainID + if wt.TokenID != nil { + txAssetID = *wt.TokenID + } + if isToken && w.assetID != txAssetID { + return } - if pt.lastCheck == tip && !balanceHasChanged { + + if tip == wt.lastCheck || !balanceHasChanged { // Expect nothing has changed since our last check. - addPendingTx(pt) - continue + addPendingTx(txAssetID, wt) + return } - confs, err := w.node.transactionConfirmations(w.ctx, txHash) - if err != nil { - if !errors.Is(err, asset.CoinNotFoundError) { - w.log.Errorf("Error getting confirmations for pending tx %s: %v", txHash, err) - } - if time.Since(pt.stamp) > time.Minute*5 { - currentNonce, err := w.node.getConfirmedNonce(w.ctx) - if err != nil { - w.log.Errorf("Error getting account nonce for stale pending tx check: %v", err) - continue - } - if currentNonce >= pt.nonce { - w.log.Errorf("pending tx not confirmed but nonce has been confirmed") - delete(w.pendingTxs, txHash) - } - } - if !errors.Is(err, asset.CoinNotFoundError) { - continue - } + + givenUp := w.checkPendingTx(nonce, wt) + if givenUp { + return } - if confs > 0 { - delete(w.pendingTxs, txHash) - continue + + if wt.BlockNumber == 0 { + addPendingTx(txAssetID, wt) } - pt.lastCheck = tip // Avoid multiple checks on the same block. - addPendingTx(pt) } + + for nonce, wt := range pendingTxsCopy { + processPendingTx(nonce, wt) + } + return } @@ -4636,6 +4546,201 @@ func checkTxStatus(receipt *types.Receipt, gasLimit uint64) error { return nil } +// checkPendingTx checks the confirmation status of a transaction. The BlockNumber +// and Fees fields of the extendedWalletTx are updated if the transaction is confirmed, +// and if the transaction has reached the required number of confirmations, it is removed +// from w.pendingTxs. +// True is returned from this function if we have given up on the transaction, and it +// should not be considered in the pending tx calculation. +// +// extendedWalletTx.mtx MUST be held while calling this function, but the +// w.pendingTxsMtx MUST NOT be held. +func (w *baseWallet) checkPendingTx(nonce uint64, pendingTx *extendedWalletTx) (givenUp bool) { + w.tipMtx.RLock() + tip := w.currentTip.Number.Uint64() + w.tipMtx.RUnlock() + + var updated bool + defer func() { + if givenUp { + err := w.txDB.removeTx(pendingTx.ID) + if err != nil { + w.log.Errorf("failed to remove tx from db: %v", err) + } else { + w.pendingTxsMtx.Lock() + delete(w.pendingTxs, nonce) + w.pendingTxsMtx.Unlock() + } + return + } + + if updated || !pendingTx.savedToDB { + err := w.txDB.storeTx(nonce, pendingTx) + if err != nil { + w.log.Errorf("error updating tx in db: %w", err) + pendingTx.savedToDB = false + return + } + + pendingTx.savedToDB = true + if pendingTx.Confirmed { + w.pendingTxsMtx.Lock() + delete(w.pendingTxs, nonce) + w.pendingTxsMtx.Unlock() + } + } + }() + + if pendingTx.lastCheck == tip { + return false + } + pendingTx.lastCheck = tip + + var txHash common.Hash + copy(txHash[:], pendingTx.ID) + receipt, tx, err := w.node.transactionReceipt(w.ctx, txHash) + if err != nil { + if errors.Is(err, asset.CoinNotFoundError) && pendingTx.BlockNumber > 0 { + w.log.Warnf("TxID %v was previously confirmed but now cannot be found", pendingTx.ID) + pendingTx.BlockNumber = 0 + updated = true + } + if !errors.Is(err, asset.CoinNotFoundError) { + w.log.Errorf("Error getting confirmations for pending tx %s: %v", txHash, err) + } + if time.Since(time.Unix(int64(pendingTx.TimeStamp), 0)) > time.Minute*6 { + givenUp = true + } + + return + } + + if receipt.BlockNumber == nil || receipt.BlockNumber.Cmp(new(big.Int)) == 0 { + if pendingTx.BlockNumber > 0 { + w.log.Warnf("TxID %v was previously confirmed but now is not confirmed", pendingTx.ID) + pendingTx.BlockNumber = 0 + updated = true + } + return + } + hdr, err := w.node.headerByHash(w.ctx, receipt.BlockHash) + if err != nil { + w.log.Errorf("Error getting header for hash %v: %v", receipt.BlockHash, err) + return + } + if hdr == nil { + w.log.Errorf("Header for hash %v is nil", receipt.BlockHash) + return + } + + effectiveGasPrice := new(big.Int).Add(hdr.BaseFee, tx.EffectiveGasTipValue(hdr.BaseFee)) + bigFees := new(big.Int).Mul(effectiveGasPrice, big.NewInt(int64(receipt.GasUsed))) + + fees := dexeth.WeiToGwei(bigFees) + blockNumber := receipt.BlockNumber.Uint64() + if pendingTx.BlockNumber != blockNumber || pendingTx.Fees != fees { + pendingTx.Fees = dexeth.WeiToGwei(bigFees) + pendingTx.BlockNumber = blockNumber + updated = true + } + + var confs uint64 + if blockNumber > 0 && tip >= blockNumber { + confs = tip - blockNumber + 1 + } + if confs >= txConfsNeededToConfirm { + if !pendingTx.Confirmed { + updated = true + } + pendingTx.Confirmed = true + } + + return +} + +// checkPendingTxs checks the confirmation status of all pending transactions. +func (w *baseWallet) checkPendingTxs() { + pendingTxsCopy := make(map[uint64]*extendedWalletTx, len(w.pendingTxs)) + w.pendingTxsMtx.Lock() + for nonce, tx := range w.pendingTxs { + pendingTxsCopy[nonce] = tx + } + w.pendingTxsMtx.Unlock() + + for nonce, pendingTx := range pendingTxsCopy { + pendingTx.mtx.Lock() + w.checkPendingTx(nonce, pendingTx) + pendingTx.mtx.Unlock() + } +} + +const txHistoryNonceKey = "Nonce" + +func (w *baseWallet) addToTxHistory(nonce uint64, balanceDelta int64, fees, blockNumber uint64, + assetID uint32, id dex.Bytes, typ asset.TransactionType) { + w.tipMtx.RLock() + tip := w.currentTip.Number.Uint64() + w.tipMtx.RUnlock() + var confs uint64 + if blockNumber > 0 && tip >= blockNumber { + confs = tip - blockNumber + 1 + } + + var tokenAssetID *uint32 + if assetID != w.baseChainID { + tokenAssetID = &assetID + } + + wt := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: typ, + ID: id, + BalanceDelta: balanceDelta, + Fees: fees, + BlockNumber: blockNumber, + TokenID: tokenAssetID, + AdditionalData: map[string]string{ + txHistoryNonceKey: strconv.FormatUint(nonce, 10), + }, + }, + TimeStamp: uint64(time.Now().Unix()), + Confirmed: confs >= txConfsNeededToConfirm, + } + + if !wt.Confirmed { + w.pendingTxsMtx.Lock() + w.pendingTxs[nonce] = wt + w.pendingTxsMtx.Unlock() + } + + err := w.txDB.storeTx(nonce, wt) + if err != nil { + if wt.Confirmed { + // If it's confirmed but we failed to store it in the db, add + // it to the map so we can retry. + w.pendingTxsMtx.Lock() + w.pendingTxs[nonce] = wt + w.pendingTxsMtx.Unlock() + } + w.log.Errorf("error storing tx in db: %v", err) + } +} + +// TxHistory returns all the transactions the wallet has made. This +// includes the ETH wallet and all token wallets. If refID is nil, then +// transactions starting from the most recent are returned (past is ignored). +// If past is true, the transactions prior to the refID are returned, otherwise +// the transactions after the refID are returned. n is the number of +// transactions to return. If n is <= 0, all the transactions will be returned. +func (w *baseWallet) TxHistory(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + baseChainWallet := w.wallet(w.baseChainID) + if baseChainWallet == nil || !baseChainWallet.connected.Load() { + return nil, fmt.Errorf("wallet not connected") + } + + return w.txDB.getTxs(n, refID, past) +} + // providersFile reads a file located at ~/dextest/credentials.json. // The file contains seed and provider information for wallets used for // getgas, deploy, and nodeclient testing. If simnet providers are not @@ -4645,6 +4750,12 @@ type providersFile struct { Providers map[string] /* symbol */ map[string] /* network */ []string `json:"providers"` } +// fileCredentials contain the seed and providers to use for GetGasEstimates. +type fileCredentials struct { + Seed dex.Bytes `json:"seed"` + Providers map[string]string `json:"providers"` +} + // getFileCredentials reads the file at path and extracts the seed and the // provider for the network. func getFileCredentials(chain, path string, net dex.Network) (seed []byte, providers []string, err error) { diff --git a/client/asset/eth/eth_test.go b/client/asset/eth/eth_test.go index 5f466cc5e6..d86eedf9f9 100644 --- a/client/asset/eth/eth_test.go +++ b/client/asset/eth/eth_test.go @@ -14,6 +14,7 @@ import ( "fmt" "math/big" "math/rand" + "path/filepath" "sort" "strings" "sync" @@ -21,7 +22,6 @@ import ( "time" "decred.org/dcrdex/client/asset" - "decred.org/dcrdex/client/asset/kvdb" "decred.org/dcrdex/dex" "decred.org/dcrdex/dex/config" "decred.org/dcrdex/dex/encode" @@ -117,8 +117,10 @@ type testNode struct { receipt *types.Receipt receiptTx *types.Transaction receiptErr error + receipts map[common.Hash]*types.Receipt + receiptTxs map[common.Hash]*types.Transaction + receiptErrs map[common.Hash]error hdrByHash *types.Header - txReceipt *types.Receipt lastSignedTx *types.Transaction sendTxTx *types.Transaction sendTxErr error @@ -180,7 +182,9 @@ func (n *testNode) txOpts(ctx context.Context, val, maxGas uint64, maxFeeRate, n if maxFeeRate == nil { maxFeeRate = n.maxFeeRate } - return newTxOpts(ctx, n.addr, val, maxGas, maxFeeRate, dexeth.GweiToWei(2)), nil + txOpts := newTxOpts(ctx, n.addr, val, maxGas, maxFeeRate, dexeth.GweiToWei(2)) + txOpts.Nonce = big.NewInt(1) + return txOpts, nil } func (n *testNode) currentFees(ctx context.Context) (baseFees, tipCap *big.Int, err error) { @@ -257,7 +261,14 @@ func (n *testNode) headerByHash(_ context.Context, txHash common.Hash) (*types.H } func (n *testNode) transactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, *types.Transaction, error) { - return n.receipt, n.receiptTx, n.receiptErr + if n.receiptErr != nil { + return nil, nil, n.receiptErr + } + if n.receipt != nil { + return n.receipt, n.receiptTx, nil + } + + return n.receipts[txHash], n.receiptTxs[txHash], n.receiptErrs[txHash] } func (n *testNode) setBalanceError(w *assetWallet, err error) { @@ -407,6 +418,265 @@ func (c *tTokenContractor) estimateTransferGas(context.Context, *big.Int) (uint6 return c.transferEstimate, c.transferEstimateErr } +type tTxDB struct { + storeTxCalled bool + storeTxErr error + removeTxCalled bool + removeTxErr error +} + +var _ txDB = (*tTxDB)(nil) + +func (db *tTxDB) storeTx(nonce uint64, wt *extendedWalletTx) error { + db.storeTxCalled = true + return db.storeTxErr +} +func (db *tTxDB) removeTx(id dex.Bytes) error { + db.removeTxCalled = true + return db.removeTxErr +} +func (db *tTxDB) getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + return nil, nil +} +func (db *tTxDB) getPendingTxs() (map[uint64]*extendedWalletTx, error) { + return nil, nil +} +func (db *tTxDB) getMonitoredTxs() (map[common.Hash]*monitoredTx, error) { + return nil, nil +} +func (db *tTxDB) storeMonitoredTx(txHash common.Hash, monitoredTx *monitoredTx) error { + return nil +} +func (db *tTxDB) removeMonitoredTxs(txHash []common.Hash) error { + return nil +} +func (db *tTxDB) close() error { + return nil +} +func (db *tTxDB) run(context.Context) {} + +func TestCheckUnconfirmedTxs(t *testing.T) { + const tipHeight = 50 + const baseFeeGwei = 100 + const gasTipCapGwei = 2 + + type tExtendedWalletTx struct { + wt *extendedWalletTx + confs uint32 + gasUsed uint64 + txReceiptErr error + } + + newExtendedWalletTx := func(assetID uint32, maxFees uint64, currBlockNumber uint64, txReceiptConfs uint32, + txReceiptGasUsed uint64, txReceiptErr error, timeStamp int64, savedToDB bool) *tExtendedWalletTx { + var tokenID *uint32 + if assetID != BipID { + tokenID = &assetID + } + + return &tExtendedWalletTx{ + wt: &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + BlockNumber: currBlockNumber, + TokenID: tokenID, + Fees: maxFees, + }, + TimeStamp: uint64(timeStamp), + savedToDB: savedToDB, + }, + confs: txReceiptConfs, + gasUsed: txReceiptGasUsed, + txReceiptErr: txReceiptErr, + } + } + + gasFee := func(gasUsed uint64) uint64 { + return gasUsed * (baseFeeGwei + gasTipCapGwei) + } + + now := time.Now().Unix() + + tests := []struct { + name string + assetID uint32 + unconfirmedTxs map[uint64]*tExtendedWalletTx + confirmedNonce uint64 + + expTxsAfter map[uint64]*extendedWalletTx + expStoreTxCalled bool + expRemoveTxCalled bool + storeTxErr error + removeTxErr error + }{ + { + name: "coin not found", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true).wt, + }, + }, + { + name: "tx was nonce replaced", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true), + }, + confirmedNonce: 1, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expRemoveTxCalled: true, + }, + { + name: "leave in unconfirmed txs if txDB.removeTx fails", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true), + }, + confirmedNonce: 1, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(5*60+1), true).wt, + }, + removeTxErr: errors.New(""), + expRemoveTxCalled: true, + }, + { + name: "not nonce replaced, but still cannot find after 10 mins", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now-(10*60+1), true), + }, + confirmedNonce: 0, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expRemoveTxCalled: true, + }, + { + name: "still in mempool", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, nil, now, true).wt, + }, + }, + { + name: "1 confirmation", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 1, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight, 0, 0, nil, now, true).wt, + }, + expStoreTxCalled: true, + }, + { + name: "3 confirmations", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight, 3, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{}, + expStoreTxCalled: true, + }, + { + name: "3 confirmations, leave in unconfirmed txs if txDB.storeTx fails", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight-2, 3, 6e5, nil, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, gasFee(6e5), tipHeight-2, 3, 6e5, nil, now, true).wt, + }, + expStoreTxCalled: true, + storeTxErr: errors.New(""), + }, + { + name: "was confirmed but now not found", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, tipHeight-1, 0, 0, asset.CoinNotFoundError, now, true), + }, + expTxsAfter: map[uint64]*extendedWalletTx{ + 1: newExtendedWalletTx(BipID, 1e7, 0, 0, 0, asset.CoinNotFoundError, now, true).wt, + }, + expStoreTxCalled: true, + }, + } + + for _, tt := range tests { + if tt.name != "1 confirmation" { + continue + } + t.Run(tt.name, func(t *testing.T) { + _, eth, node, shutdown := tassetWallet(tt.assetID) + defer shutdown() + + node.tokenContractor.bal = unlimitedAllowance + node.receipts = make(map[common.Hash]*types.Receipt) + node.receiptTxs = make(map[common.Hash]*types.Transaction) + node.receiptErrs = make(map[common.Hash]error) + node.hdrByHash = &types.Header{ + BaseFee: dexeth.GweiToWei(baseFeeGwei), + } + node.confNonce = tt.confirmedNonce + eth.connected.Store(true) + eth.tipMtx.Lock() + eth.currentTip = &types.Header{Number: new(big.Int).SetUint64(tipHeight)} + eth.tipMtx.Unlock() + + txDB := &tTxDB{ + storeTxErr: tt.storeTxErr, + removeTxErr: tt.removeTxErr, + } + eth.txDB = txDB + + for nonce, pt := range tt.unconfirmedTxs { + txHash := common.BytesToHash(encode.RandomBytes(32)) + pt.wt.ID = txHash[:] + eth.pendingTxs[nonce] = pt.wt + var blockNumber *big.Int + if pt.confs > 0 { + blockNumber = big.NewInt(int64(tipHeight - pt.confs + 1)) + } + node.receipts[txHash] = &types.Receipt{BlockNumber: blockNumber, GasUsed: pt.gasUsed} + node.receiptTxs[txHash] = types.NewTx(&types.DynamicFeeTx{ + GasTipCap: dexeth.GweiToWei(gasTipCapGwei), + GasFeeCap: dexeth.GweiToWei(2 * baseFeeGwei), + }) + node.receiptErrs[txHash] = pt.txReceiptErr + } + + eth.checkPendingTxs() + + if len(eth.pendingTxs) != len(tt.expTxsAfter) { + t.Fatalf("expected %d unconfirmed txs, got %d", len(tt.expTxsAfter), len(eth.pendingTxs)) + } + for nonce, expTx := range tt.expTxsAfter { + if tx, ok := eth.pendingTxs[nonce]; !ok { + t.Fatalf("expected unconfirmed tx with nonce %d", nonce) + } else { + if tx.Fees != expTx.Fees { + t.Fatalf("expected fees %d, got %d", expTx.Fees, tx.Fees) + } + if tx.BlockNumber != expTx.BlockNumber { + t.Fatalf("expected block number %d, got %d", expTx.BlockNumber, tx.BlockNumber) + } + } + } + + if txDB.storeTxCalled != tt.expStoreTxCalled { + t.Fatalf("expected storeTx called %v, got %v", tt.expStoreTxCalled, txDB.storeTxCalled) + } + if txDB.removeTxCalled != tt.expRemoveTxCalled { + t.Fatalf("expected removeTx called %v, got %v", tt.expRemoveTxCalled, txDB.removeTxCalled) + } + }) + } +} + func TestCheckForNewBlocks(t *testing.T) { header0 := &types.Header{Number: new(big.Int)} header1 := &types.Header{Number: big.NewInt(1)} @@ -440,6 +710,7 @@ func TestCheckForNewBlocks(t *testing.T) { ctx: ctx, log: tLogger, currentTip: header0, + txDB: &tTxDB{}, }, log: tLogger.SubLogger("ETH"), emit: emit, @@ -612,19 +883,19 @@ func tassetWallet(assetID uint32) (asset.Wallet, *assetWallet, *tMempoolNode, co aw := &assetWallet{ baseWallet: &baseWallet{ - baseChainID: BipID, - chainID: dexeth.ChainIDs[dex.Simnet], - tokens: dexeth.Tokens, - addr: node.addr, - net: dex.Simnet, - node: node, - ctx: ctx, - log: tLogger, - gasFeeLimitV: defaultGasFeeLimit, - monitoredTxs: make(map[common.Hash]*monitoredTx), - monitoredTxDB: kvdb.NewMemoryDB(), - pendingTxs: make(map[common.Hash]*pendingTx), - currentTip: &types.Header{Number: new(big.Int)}, + baseChainID: BipID, + chainID: dexeth.ChainIDs[dex.Simnet], + tokens: dexeth.Tokens, + addr: node.addr, + net: dex.Simnet, + node: node, + ctx: ctx, + log: tLogger, + gasFeeLimitV: defaultGasFeeLimit, + monitoredTxs: make(map[common.Hash]*monitoredTx), + pendingTxs: make(map[uint64]*extendedWalletTx), + txDB: &tTxDB{}, + currentTip: &types.Header{Number: new(big.Int)}, }, versionedGases: versionedGases, maxSwapGas: versionedGases[0].Swap, @@ -830,42 +1101,47 @@ func TestBalanceWithMempool(t *testing.T) { } func TestBalanceNoMempool(t *testing.T) { - const tipHeight = 50 const lastCheck = tipHeight - 1 - type tPendingTx struct { - *pendingTx + type tExtendedWalletTx struct { + wt *extendedWalletTx confs uint32 } - newPendingTx := func(assetID uint32, out, in, maxFees uint64, confs uint32) *tPendingTx { - return &tPendingTx{ - pendingTx: &pendingTx{ - assetID: assetID, - out: out, - in: in, - maxFees: maxFees, - stamp: time.Now(), + newExtendedWalletTx := func(assetID uint32, out, in, maxFees uint64, currBlockNumber uint64, txReceiptConfs uint32) *tExtendedWalletTx { + var tokenID *uint32 + if assetID != BipID { + tokenID = &assetID + } + + return &tExtendedWalletTx{ + wt: &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + BalanceDelta: int64(in) - int64(out), + BlockNumber: 0, + TokenID: tokenID, + Fees: maxFees, + }, lastCheck: lastCheck, }, - confs: confs, + confs: txReceiptConfs, } } tests := []struct { - name string - assetID uint32 - pendingTxs []*tPendingTx - expPendingIn uint64 - expPendingOut uint64 - expCountAfter int + name string + assetID uint32 + unconfirmedTxs map[uint64]*tExtendedWalletTx + expPendingIn uint64 + expPendingOut uint64 + expCountAfter int }{ { name: "single eth tx", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 2, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 2, 0, 0), }, expPendingOut: 3, expCountAfter: 1, @@ -873,15 +1149,23 @@ func TestBalanceNoMempool(t *testing.T) { { name: "single tx expired", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 1, 1), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 1, 0, 1), + }, + expCountAfter: 1, + }, + { + name: "single tx expired, txConfsNeededToConfirm confs", + assetID: BipID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 1, 0, txConfsNeededToConfirm), }, }, { name: "eth with token fees", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 4, 0, 5, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 4, 0, 5, 0, 0), }, expPendingOut: 5, expCountAfter: 1, @@ -889,9 +1173,9 @@ func TestBalanceNoMempool(t *testing.T) { { name: "token with 1 tx and other ignored assets", assetID: simnetTokenID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 4, 0, 5, 0), - newPendingTx(simnetTokenID+1, 8, 0, 9, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 4, 0, 5, 0, 0), + 1: newExtendedWalletTx(simnetTokenID+1, 8, 0, 9, 0, 0), }, expPendingOut: 4, expCountAfter: 2, @@ -899,8 +1183,8 @@ func TestBalanceNoMempool(t *testing.T) { { name: "token with 1 tx incoming", assetID: simnetTokenID, - pendingTxs: []*tPendingTx{ - newPendingTx(simnetTokenID, 0, 15, 5, 0), + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 0, 15, 5, 0, 0), }, expPendingIn: 15, expCountAfter: 1, @@ -908,16 +1192,24 @@ func TestBalanceNoMempool(t *testing.T) { { name: "eth mixed txs", assetID: BipID, - pendingTxs: []*tPendingTx{ - newPendingTx(BipID, 1, 0, 2, 0), // 3 eth out - newPendingTx(simnetTokenID, 3, 0, 4, 1), // confirmed - newPendingTx(simnetTokenID, 5, 0, 6, 0), // 6 eth out - newPendingTx(BipID, 0, 7, 1, 0), // 1 eth out, 7 eth in + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(BipID, 1, 0, 2, 0, 0), // 3 eth out + 1: newExtendedWalletTx(simnetTokenID, 3, 0, 4, 0, txConfsNeededToConfirm), // confirmed + 2: newExtendedWalletTx(simnetTokenID, 5, 0, 6, 0, 0), // 6 eth out + 3: newExtendedWalletTx(BipID, 0, 7, 1, 0, 0), // 1 eth out, 7 eth in }, expPendingOut: 10, expPendingIn: 7, expCountAfter: 3, }, + { + name: "already confirmed, but still waiting for txConfsNeededToConfirm", + assetID: simnetTokenID, + unconfirmedTxs: map[uint64]*tExtendedWalletTx{ + 0: newExtendedWalletTx(simnetTokenID, 0, 15, 5, tipHeight, 1), + }, + expCountAfter: 1, + }, } for _, tt := range tests { @@ -925,23 +1217,30 @@ func TestBalanceNoMempool(t *testing.T) { _, eth, tNode, shutdown := tassetWallet(tt.assetID) defer shutdown() eth.node = tNode.testNode // no mempool - tNode.txConfirmations = make(map[common.Hash]uint32) - tNode.txConfsErr = make(map[common.Hash]error) tNode.bal = unlimitedAllowance tNode.tokenContractor.bal = unlimitedAllowance - + tNode.receipts = make(map[common.Hash]*types.Receipt) + tNode.receiptTxs = make(map[common.Hash]*types.Transaction) + tNode.hdrByHash = &types.Header{ + BaseFee: big.NewInt(100e9), + } + eth.connected.Store(true) eth.tipMtx.Lock() eth.currentTip = &types.Header{Number: new(big.Int).SetUint64(tipHeight)} eth.tipMtx.Unlock() - for _, pt := range tt.pendingTxs { + for nonce, pt := range tt.unconfirmedTxs { txHash := common.BytesToHash(encode.RandomBytes(32)) - eth.pendingTxs[txHash] = pt.pendingTx - if pt.confs == 0 { - tNode.txConfsErr[txHash] = asset.CoinNotFoundError - } else { - tNode.txConfirmations[txHash] = pt.confs + pt.wt.ID = txHash[:] + eth.pendingTxs[nonce] = pt.wt + var blockNumber *big.Int + if pt.confs > 0 { + blockNumber = big.NewInt(int64(tipHeight - pt.confs + 1)) } + tNode.receipts[txHash] = &types.Receipt{BlockNumber: blockNumber} + tNode.receiptTxs[txHash] = types.NewTx(&types.DynamicFeeTx{ + GasTipCap: big.NewInt(2e9), + }) } bal, err := eth.balanceWithTxPool() @@ -4135,7 +4434,14 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { } } - tests := []struct { + tempDir := t.TempDir() + txDB, err := newBadgerTxDB(filepath.Join(tempDir, "tx.db"), tLogger) + if err != nil { + t.Fatalf("error creating tx db: %v", err) + } + defer eth.txDB.close() + + type test struct { name string redemption *asset.Redemption @@ -4166,7 +4472,9 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { receipt *types.Receipt receiptErr error - }{ + } + + tests := []*test{ { name: "in monitored txs, found by geth, not yet confirmed", coinID: toEthTxCoinID(3, 200, redeem0Data), @@ -4698,7 +5006,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { }, } - for _, test := range tests { + runTest := func(test *test) { fmt.Printf("###### %s ###### \n", test.name) node.getTxResMap = make(map[common.Hash]*tGetTxRes) for hash, txData := range test.getTxResMap { @@ -4728,14 +5036,31 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { node.receipt = test.receipt node.receiptErr = test.receiptErr - eth.monitoredTxDB = kvdb.NewMemoryDB() + eth.txDB = txDB eth.monitoredTxs = test.monitoredTxs + for h, tx := range test.monitoredTxs { - if err := eth.monitoredTxDB.Store(h[:], tx); err != nil { + if err := eth.txDB.storeMonitoredTx(h, tx); err != nil { t.Fatalf("%s: error storing monitored tx: %v", test.name, err) } } + // clear the monitored txs after each test + defer func() { + storedTxs, err := eth.txDB.getMonitoredTxs() + if err != nil { + t.Fatalf("%s: failed to load stored txs", test.name) + } + storedTxHashes := make([]common.Hash, 0, len(storedTxs)) + for h := range storedTxs { + storedTxHashes = append(storedTxHashes, h) + } + err = eth.txDB.removeMonitoredTxs(storedTxHashes) + if err != nil { + t.Fatalf("%s: failed to remove stored txs", test.name) + } + }() + result, err := wi.ConfirmRedemption(test.coinID, test.redemption, 0) if test.expectErr { if err == nil { @@ -4744,7 +5069,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { if test.expectSwapRefundedErr && !errors.Is(asset.ErrSwapRefunded, err) { t.Fatalf("%s: expected swap refunded error but got %v", test.name, err) } - continue + return } if err != nil { t.Fatalf("%s: unexpected error %v", test.name, err) @@ -4803,7 +5128,7 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { } return true } - storedTxs, err := loadMonitoredTxs(eth.monitoredTxDB) + storedTxs, err := eth.txDB.getMonitoredTxs() if err != nil { t.Fatalf("%s: failed to load stored txs", test.name) } @@ -4838,6 +5163,10 @@ func testConfirmRedemption(t *testing.T, assetID uint32) { t.Fatalf("%s: expected result %+v != result %+v", test.name, test.expectedResult, result) } } + + for _, test := range tests { + runTest(test) + } } func TestMarshalMonitoredTx(t *testing.T) { @@ -5245,8 +5574,10 @@ func TestSwapOrRedemptionFeesPaid(t *testing.T) { wantErr: true, }} for _, test := range tests { - node.receipt = test.receipt + var txHash common.Hash + copy(txHash[:], test.coinID) node.receiptTx = test.receiptTx + node.receipt = test.receipt node.receiptErr = test.receiptErr node.hdrByHash = test.hdrByHash node.bestHdr = test.bestHdr diff --git a/client/asset/eth/txdb.go b/client/asset/eth/txdb.go new file mode 100644 index 0000000000..50856f4292 --- /dev/null +++ b/client/asset/eth/txdb.go @@ -0,0 +1,572 @@ +// This code is available on the terms of the project LICENSE.md file, +// also available online at https://blueoakcouncil.org/license/1.0.0. + +package eth + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "math" + "sync" + "time" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/dgraph-io/badger" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// extendedWalletTx is an asset.WalletTransaction extended with additional +// fields used for tracking transactions. +type extendedWalletTx struct { + mtx sync.RWMutex + *asset.WalletTransaction + // Confirmed will be set to true once the transaction has 3 confirmations. + Confirmed bool `json:"confirmed"` + BlockSubmitted uint64 `json:"blockSubmitted"` + TimeStamp uint64 `json:"timeStamp"` + + lastCheck uint64 + savedToDB bool +} + +// monitoredTx is used to keep track of redemption transactions that have not +// yet been confirmed. If a transaction has to be replaced due to the fee +// being too low or another transaction being mined with the same nonce, +// the replacement transaction's ID is recorded in the replacementTx field. +// replacedTx is used to maintain a doubly linked list, which allows deletion +// of transactions that were replaced after a transaction is confirmed. +type monitoredTx struct { + tx *types.Transaction + blockSubmitted uint64 + + // This mutex must be held during the entire process of confirming + // a transaction. This is to avoid confirmations of the same + // transactions happening concurrently resulting in more than one + // replacement for the same transaction. + mtx sync.Mutex + replacementTx *common.Hash + // replacedTx could be set when the tx is created, be immutable, and not + // need the mutex, but since Redeem doesn't know if the transaction is a + // replacement or a new one, this variable is set in recordReplacementTx. + replacedTx *common.Hash + errorsBroadcasted uint16 +} + +// MarshalBinary marshals a monitoredTx into a byte array. +// It satisfies the encoding.BinaryMarshaler interface for monitoredTx. +func (m *monitoredTx) MarshalBinary() (data []byte, err error) { + b := encode.BuildyBytes{0} + txB, err := m.tx.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("error marshaling tx: %v", err) + } + b = b.AddData(txB) + + blockB := make([]byte, 8) + binary.BigEndian.PutUint64(blockB, m.blockSubmitted) + b = b.AddData(blockB) + + if m.replacementTx != nil { + replacementTxHash := m.replacementTx[:] + b = b.AddData(replacementTxHash) + } + + return b, nil +} + +// UnmarshalBinary loads a data from a marshalled byte array into a +// monitoredTx. +func (m *monitoredTx) UnmarshalBinary(data []byte) error { + ver, pushes, err := encode.DecodeBlob(data) + if err != nil { + return err + } + if ver != 0 { + return fmt.Errorf("unknown version %d", ver) + } + if len(pushes) != 2 && len(pushes) != 3 { + return fmt.Errorf("wrong number of pushes %d", len(pushes)) + } + m.tx = &types.Transaction{} + if err := m.tx.UnmarshalBinary(pushes[0]); err != nil { + return fmt.Errorf("error reading tx: %w", err) + } + + m.blockSubmitted = binary.BigEndian.Uint64(pushes[1]) + + if len(pushes) == 3 { + var replacementTxHash common.Hash + copy(replacementTxHash[:], pushes[2]) + m.replacementTx = &replacementTxHash + } + + return nil +} + +var ( + // noncePrefix is the prefix for the key used to map a nonce to an + // extendedWalletTx. + noncePrefix = []byte("nonce-") + // txHashPrefix is the prefix for the key used to map a transaction hash + // to a nonce key. + txHashPrefix = []byte("txHash-") + // monitoredTxPrefix is the prefix for the key used to map a transaction + // hash to a monitoredTx. + monitoredTxPrefix = []byte("monitoredTx-") + // dbVersionKey is the key used to store the database version. + dbVersionKey = []byte("dbVersion") +) + +func nonceKey(nonce uint64) []byte { + key := make([]byte, len(noncePrefix)+8) + copy(key, noncePrefix) + binary.BigEndian.PutUint64(key[len(noncePrefix):], nonce) + return key +} + +func nonceFromKey(nk []byte) (uint64, error) { + if !bytes.HasPrefix(nk, noncePrefix) { + return 0, fmt.Errorf("nonce key %x does not have nonce prefix %x", nk, noncePrefix) + } + return binary.BigEndian.Uint64(nk[len(noncePrefix):]), nil +} + +func txHashKey(txHash dex.Bytes) []byte { + key := make([]byte, len(txHashPrefix)+len(txHash)) + copy(key, txHashPrefix) + copy(key[len(txHashPrefix):], txHash) + return key +} + +func monitoredTxKey(txHash dex.Bytes) []byte { + key := make([]byte, len(monitoredTxPrefix)+len(txHash)) + copy(key, monitoredTxPrefix) + copy(key[len(monitoredTxPrefix):], txHash) + return key +} + +func monitoredTxHashFromKey(mtk []byte) (common.Hash, error) { + if !bytes.HasPrefix(mtk, monitoredTxPrefix) { + return common.Hash{}, fmt.Errorf("monitored tx key %x does not have monitored tx prefix %x", mtk, monitoredTxPrefix) + } + var txHash common.Hash + copy(txHash[:], mtk[len(monitoredTxPrefix):]) + return txHash, nil +} + +var maxNonceKey = nonceKey(math.MaxUint64) + +// initialDBVersion only contained mappings from txHash -> monitoredTx. +const initialDBVersion = 0 + +// prefixDBVersion contains three mappings each marked with a prefix: +// +// nonceKey -> extendedWalletTx (noncePrefix) +// txHash -> nonceKey (txHashPrefix) +// txHash -> monitoredTx (monitoredTxPrefix) +const prefixDBVersion = 1 +const txDBVersion = prefixDBVersion + +type txDB interface { + storeTx(nonce uint64, wt *extendedWalletTx) error + removeTx(id dex.Bytes) error + getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) + getPendingTxs() (map[uint64]*extendedWalletTx, error) + storeMonitoredTx(txHash common.Hash, tx *monitoredTx) error + getMonitoredTxs() (map[common.Hash]*monitoredTx, error) + removeMonitoredTxs([]common.Hash) error + close() error + run(context.Context) +} + +type badgerTxDB struct { + *badger.DB + log dex.Logger +} + +var _ txDB = (*badgerTxDB)(nil) + +// badgerLoggerWrapper wraps dex.Logger and translates Warnf to Warningf to +// satisfy badger.Logger. +type badgerLoggerWrapper struct { + dex.Logger +} + +var _ badger.Logger = (*badgerLoggerWrapper)(nil) + +// Warningf -> dex.Logger.Warnf +func (log *badgerLoggerWrapper) Warningf(s string, a ...interface{}) { + log.Warnf(s, a...) +} + +func newBadgerTxDB(filePath string, log dex.Logger) (*badgerTxDB, error) { + // If memory use is a concern, could try + // .WithValueLogLoadingMode(options.FileIO) // default options.MemoryMap + // .WithMaxTableSize(sz int64); // bytes, default 6MB + // .WithValueLogFileSize(sz int64), bytes, default 1 GB, must be 1MB <= sz <= 1GB + opts := badger.DefaultOptions(filePath).WithLogger(&badgerLoggerWrapper{log}) + db, err := badger.Open(opts) + if err == badger.ErrTruncateNeeded { + // Probably a Windows thing. + // https://github.com/dgraph-io/badger/issues/744 + log.Warnf("error opening badger db: %v", err) + // Try again with value log truncation enabled. + opts.Truncate = true + log.Warnf("Attempting to reopen badger DB with the Truncate option set...") + db, err = badger.Open(opts) + } + if err != nil { + return nil, err + } + + txDB := &badgerTxDB{ + DB: db, + log: log, + } + + err = txDB.updateVersion() + if err != nil { + return nil, fmt.Errorf("failed to update db: %w", err) + } + + return txDB, nil +} + +// updateVersion updates the DB to the latest version. In version 0, +// only a mapping from txHash to monitoredTx was stored, with no +// prefixes. +func (s *badgerTxDB) updateVersion() error { + // Check if the database version is stored. If not, the db + // is version 0. + var version int + err := s.View(func(txn *badger.Txn) error { + item, err := txn.Get(dbVersionKey) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil + } + return err + } + return item.Value(func(versionB []byte) error { + version = int(binary.BigEndian.Uint64(versionB)) + return nil + }) + }) + if err != nil { + s.log.Errorf("error retrieving database version: %v", err) + } + + if version == initialDBVersion { + err = s.Update(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + newKey := monitoredTxKey(key) + monitoredTxB, err := item.ValueCopy(nil) + if err != nil { + return err + } + + err = txn.Set(newKey, monitoredTxB) + if err != nil { + return err + } + err = txn.Delete(key) + if err != nil { + return err + } + } + + versionB := make([]byte, 8) + binary.BigEndian.PutUint64(versionB, 1) + return txn.Set(dbVersionKey, versionB) + }) + if err != nil { + return err + } + s.log.Infof("Updated database to version %d", prefixDBVersion) + } else if version > txDBVersion { + return fmt.Errorf("database version %d is not supported", version) + } + + return nil +} + +func (s *badgerTxDB) close() error { + return s.Close() +} + +func (s *badgerTxDB) run(ctx context.Context) { + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + for { + select { + case <-ticker.C: + err := s.RunValueLogGC(0.5) + if err != nil && !errors.Is(err, badger.ErrNoRewrite) { + s.log.Errorf("garbage collection error: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +// storeTx stores a mapping from nonce to extendedWalletTx and a mapping from +// transaction hash to nonce so transactions can be looked up by hash. If a +// nonce already exists, the extendedWalletTx is overwritten. +func (s *badgerTxDB) storeTx(nonce uint64, wt *extendedWalletTx) error { + wtB, err := json.Marshal(wt) + if err != nil { + return err + } + nk := nonceKey(nonce) + tk := txHashKey(wt.ID) + + return s.Update(func(txn *badger.Txn) error { + oldWtItem, err := txn.Get(nk) + if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { + return err + } + + // If there is an existing transaction with this nonce, delete the + // mapping from tx hash to nonce. + if err == nil { + oldWt := new(extendedWalletTx) + err = oldWtItem.Value(func(oldWtB []byte) error { + err := json.Unmarshal(oldWtB, oldWt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(oldWtB), err) + } + return err + }) + if err == nil && !bytes.Equal(oldWt.ID, wt.ID) { + err = txn.Delete(txHashKey(oldWt.ID)) + if err != nil { + s.log.Errorf("failed to delete old tx id: %s: %v", oldWt.ID.String(), err) + } + } + } + + // Store nonce key -> wallet transaction + if err := txn.Set(nk, wtB); err != nil { + return err + } + + // Store tx hash -> nonce key + return txn.Set(tk, nk) + }) +} + +// removeTx removes a tx from the db. +func (s *badgerTxDB) removeTx(id dex.Bytes) error { + tk := txHashKey(id) + + return s.Update(func(txn *badger.Txn) error { + txIDEntry, err := txn.Get(tk) + if err != nil { + return err + } + err = txn.Delete(tk) + if err != nil { + return err + } + + nk, err := txIDEntry.ValueCopy(nil) + if err != nil { + return err + } + + return txn.Delete(nk) + }) +} + +// getTxs returns the n more recent transaction if refID is nil, or the +// n transactions before/after refID depending on the value of past. The +// transactions are returned in chronological order. +func (s *badgerTxDB) getTxs(n int, refID *dex.Bytes, past bool) ([]*asset.WalletTransaction, error) { + var txs []*asset.WalletTransaction + + err := s.View(func(txn *badger.Txn) error { + var startNonceKey []byte + if refID != nil { + // Get the nonce for the provided tx hash. + tk := txHashKey(*refID) + item, err := txn.Get(tk) + if err != nil { + return err + } + err = item.Value(func(nonceB []byte) error { + startNonceKey = nonceB + return nil + }) + if err != nil { + return err + } + } else { + past = true + } + if startNonceKey == nil { + startNonceKey = maxNonceKey + } + + opts := badger.DefaultIteratorOptions + opts.Reverse = past + opts.Prefix = noncePrefix + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek(startNonceKey); it.Valid() && n <= 0 || len(txs) < n; it.Next() { + item := it.Item() + err := item.Value(func(wtB []byte) error { + wt := new(asset.WalletTransaction) + err := json.Unmarshal(wtB, wt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(wtB), err) + return err + } + if refID != nil && bytes.Equal(wt.ID, *refID) { + return nil + } + if past { + txs = append([]*asset.WalletTransaction{wt}, txs...) + } else { + txs = append(txs, wt) + } + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return txs, err +} + +// getPendingTxs returns a map of nonce to extendedWalletTx for all +// pending transactions. +func (s *badgerTxDB) getPendingTxs() (map[uint64]*extendedWalletTx, error) { + // We will be iterating backwards from the most recent nonce. + // If we find numConfirmedTxsToCheck consecutive confirmed transactions, + // we can stop iterating. + const numConfirmedTxsToCheck = 20 + + txs := make(map[uint64]*extendedWalletTx, 4) + + err := s.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Reverse = true + opts.Prefix = noncePrefix + it := txn.NewIterator(opts) + defer it.Close() + + var numConfirmedTxs int + for it.Seek(maxNonceKey); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(wtB []byte) error { + wt := new(extendedWalletTx) + err := json.Unmarshal(wtB, wt) + if err != nil { + s.log.Errorf("unable to unmarhsal wallet transaction: %s: %v", string(wtB), err) + return err + } + if !wt.Confirmed { + numConfirmedTxs = 0 + nonce, err := nonceFromKey(item.Key()) + if err != nil { + return err + } + txs[nonce] = wt + } else { + numConfirmedTxs++ + if numConfirmedTxs >= numConfirmedTxsToCheck { + return nil + } + } + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return txs, err +} + +// storeMonitoredTx stores a monitoredTx in the database. +func (s *badgerTxDB) storeMonitoredTx(txHash common.Hash, tx *monitoredTx) error { + txKey := monitoredTxKey(txHash.Bytes()) + txBytes, err := tx.MarshalBinary() + if err != nil { + return err + } + return s.Update(func(txn *badger.Txn) error { + return txn.Set(txKey, txBytes) + }) +} + +// getMonitoredTxs returns a map of transaction hash to monitoredTx for all +// monitored transactions. +func (s *badgerTxDB) getMonitoredTxs() (map[common.Hash]*monitoredTx, error) { + monitoredTxs := make(map[common.Hash]*monitoredTx) + + err := s.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.Prefix = monitoredTxPrefix + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek(monitoredTxPrefix); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(txBytes []byte) error { + tx := new(monitoredTx) + err := tx.UnmarshalBinary(txBytes) + if err != nil { + return err + } + txHash, err := monitoredTxHashFromKey(item.Key()) + if err != nil { + return err + } + monitoredTxs[txHash] = tx + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return monitoredTxs, err +} + +// removeMonitoredTxs removes the monitored transactions with the provided +// hashes from the database. +func (s *badgerTxDB) removeMonitoredTxs(txHashes []common.Hash) error { + return s.Update(func(txn *badger.Txn) error { + for _, txHash := range txHashes { + txKey := monitoredTxKey(txHash.Bytes()) + err := txn.Delete(txKey) + if err != nil { + return err + } + } + return nil + }) +} diff --git a/client/asset/eth/txdb_test.go b/client/asset/eth/txdb_test.go new file mode 100644 index 0000000000..0c6c451cea --- /dev/null +++ b/client/asset/eth/txdb_test.go @@ -0,0 +1,424 @@ +package eth + +import ( + "reflect" + "testing" + + "decred.org/dcrdex/client/asset" + "decred.org/dcrdex/dex" + "decred.org/dcrdex/dex/encode" + "github.com/dgraph-io/badger" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +func TestTxDB(t *testing.T) { + tempDir := t.TempDir() + tLogger := dex.StdOutLogger("TXDB", dex.LevelTrace) + + txHistoryStore, err := newBadgerTxDB(tempDir, tLogger) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + + txs, err := txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + if len(txs) != 0 { + t.Fatalf("expected 0 txs but got %d", len(txs)) + } + + wt1 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Send, + ID: encode.RandomBytes(32), + BalanceDelta: -100, + Fees: 300, + BlockNumber: 123, + AdditionalData: map[string]string{ + "Nonce": "1", + }, + TokenID: &simnetTokenID, + }, + Confirmed: true, + } + + wt2 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Swap, + ID: encode.RandomBytes(32), + BalanceDelta: -200, + Fees: 100, + BlockNumber: 124, + AdditionalData: map[string]string{ + "Nonce": "2", + }, + }, + } + + wt3 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Redeem, + ID: encode.RandomBytes(32), + BalanceDelta: 200, + Fees: 200, + BlockNumber: 125, + AdditionalData: map[string]string{ + "Nonce": "3", + }, + }, + } + + wt4 := &extendedWalletTx{ + WalletTransaction: &asset.WalletTransaction{ + Type: asset.Redeem, + ID: encode.RandomBytes(32), + BalanceDelta: 200, + Fees: 300, + BlockNumber: 125, + AdditionalData: map[string]string{ + "Nonce": "3", + }, + }, + } + + err = txHistoryStore.storeTx(1, wt1) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs := []*asset.WalletTransaction{wt1.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + err = txHistoryStore.storeTx(2, wt2) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + err = txHistoryStore.storeTx(3, wt3) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(2, nil, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt2.WalletTransaction, wt3.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txs, err = txHistoryStore.getTxs(0, &wt2.ID, true) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txs, err = txHistoryStore.getTxs(0, &wt2.ID, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt3.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + // Update nonce with different tx + err = txHistoryStore.storeTx(3, wt4) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + if len(txs) != 3 { + t.Fatalf("expected 3 txs but got %d", len(txs)) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + // Update same tx with new fee + wt4.Fees = 300 + err = txHistoryStore.storeTx(3, wt4) + if err != nil { + t.Fatalf("error storing tx: %v", err) + } + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + txHistoryStore.close() + + txHistoryStore, err = newBadgerTxDB(tempDir, dex.StdOutLogger("TXDB", dex.LevelTrace)) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + defer txHistoryStore.close() + + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt2.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + + unconfirmedTxs, err := txHistoryStore.getPendingTxs() + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedUnconfirmedTxs := map[uint64]*extendedWalletTx{ + 3: wt4, + 2: wt2, + } + if !reflect.DeepEqual(expectedUnconfirmedTxs, unconfirmedTxs) { + t.Fatalf("expected txs %+v but got %+v", expectedUnconfirmedTxs, unconfirmedTxs) + } + + err = txHistoryStore.removeTx(wt2.ID) + if err != nil { + t.Fatalf("error removing tx: %v", err) + } + + txs, err = txHistoryStore.getTxs(0, nil, false) + if err != nil { + t.Fatalf("error retrieving txs: %v", err) + } + expectedTxs = []*asset.WalletTransaction{wt1.WalletTransaction, wt4.WalletTransaction} + if !reflect.DeepEqual(expectedTxs, txs) { + t.Fatalf("expected txs %+v but got %+v", expectedTxs, txs) + } + txHashes := make([]common.Hash, 3) + for i := range txHashes { + txHashes[i] = common.BytesToHash(encode.RandomBytes(32)) + } + + monitoredTx1 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{1}}), + replacementTx: &txHashes[1], + blockSubmitted: 1, + } + monitoredTx2 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{2}}), + replacementTx: &txHashes[2], + replacedTx: &txHashes[0], + blockSubmitted: 2, + } + monitoredTx3 := &monitoredTx{ + tx: types.NewTx(&types.LegacyTx{Data: []byte{3}}), + replacedTx: &txHashes[1], + blockSubmitted: 3, + } + + txHistoryStore.storeMonitoredTx(txHashes[0], monitoredTx1) + txHistoryStore.storeMonitoredTx(txHashes[1], monitoredTx2) + txHistoryStore.storeMonitoredTx(txHashes[2], monitoredTx3) + monitoredTxs, err := txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + expectedMonitoredTxs := map[common.Hash]*monitoredTx{ + txHashes[0]: monitoredTx1, + txHashes[1]: monitoredTx2, + txHashes[2]: monitoredTx3, + } + + if len(monitoredTxs) != len(expectedMonitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(expectedMonitoredTxs), len(monitoredTxs)) + } + + monitoredTxsEqual := func(a, b *monitoredTx) bool { + if a.tx.Hash() != b.tx.Hash() { + return false + } + if a.replacementTx != nil && b.replacementTx != nil && *a.replacementTx != *b.replacementTx { + return false + } + if a.replacedTx != nil && b.replacedTx != nil && *a.replacedTx != *b.replacedTx { + return false + } + if a.blockSubmitted != b.blockSubmitted { + return false + } + return true + } + + for txHash, monitoredTx := range monitoredTxs { + expectedMonitoredTx := expectedMonitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTxs[txHash]) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } + + err = txHistoryStore.removeMonitoredTxs([]common.Hash{txHashes[0]}) + if err != nil { + t.Fatalf("error removing monitored tx: %v", err) + } + + monitoredTxs, err = txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + expectedMonitoredTxs = map[common.Hash]*monitoredTx{ + txHashes[1]: monitoredTx2, + txHashes[2]: monitoredTx3, + } + + if len(monitoredTxs) != len(expectedMonitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(expectedMonitoredTxs), len(monitoredTxs)) + } + + for txHash, monitoredTx := range monitoredTxs { + expectedMonitoredTx := expectedMonitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTxs[txHash]) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } + + err = txHistoryStore.removeMonitoredTxs([]common.Hash{txHashes[1], txHashes[2]}) + if err != nil { + t.Fatalf("error removing monitored tx: %v", err) + } + + monitoredTxs, err = txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + if len(monitoredTxs) != 0 { + t.Fatalf("expected 0 monitored txs but got %d", len(monitoredTxs)) + } +} + +func TestTxDBUpgrade(t *testing.T) { + dir := t.TempDir() + tLogger := dex.StdOutLogger("TXDB", dex.LevelTrace) + + opts := badger.DefaultOptions(dir).WithLogger(&badgerLoggerWrapper{tLogger}) + db, err := badger.Open(opts) + if err == badger.ErrTruncateNeeded { + // Probably a Windows thing. + // https://github.com/dgraph-io/badger/issues/744 + tLogger.Warnf("newTxHistoryStore badger db: %v", err) + // Try again with value log truncation enabled. + opts.Truncate = true + tLogger.Warnf("Attempting to reopen badger DB with the Truncate option set...") + db, err = badger.Open(opts) + } + if err != nil { + t.Fatalf("error opening badger db: %v", err) + } + + txHashes := make([]common.Hash, 3) + for i := range txHashes { + txHashes[i] = common.BytesToHash(encode.RandomBytes(32)) + } + + monitoredTxs := map[common.Hash]*monitoredTx{ + txHashes[0]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{1}}), + replacementTx: &txHashes[1], + blockSubmitted: 1, + }, + txHashes[1]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{2}}), + replacementTx: &txHashes[2], + replacedTx: &txHashes[0], + blockSubmitted: 2, + }, + txHashes[2]: { + tx: types.NewTx(&types.LegacyTx{Data: []byte{3}}), + replacedTx: &txHashes[1], + blockSubmitted: 3, + }, + } + + err = db.Update(func(txn *badger.Txn) error { + for txHash, monitoredTx := range monitoredTxs { + monitoredTxB, err := monitoredTx.MarshalBinary() + if err != nil { + return err + } + + th := txHash + err = txn.Set(th[:], monitoredTxB) + if err != nil { + return err + } + } + return nil + }) + if err != nil { + t.Fatalf("error storing monitored txs: %v", err) + } + + err = db.Close() + if err != nil { + t.Fatalf("error closing badger db: %v", err) + } + + txHistoryStore, err := newBadgerTxDB(dir, tLogger) + if err != nil { + t.Fatalf("error creating tx history store: %v", err) + } + + retrievedMonitoredTxs, err := txHistoryStore.getMonitoredTxs() + if err != nil { + t.Fatalf("error retrieving monitored txs: %v", err) + } + + if len(retrievedMonitoredTxs) != len(monitoredTxs) { + t.Fatalf("expected %d monitored txs but got %d", len(monitoredTxs), len(retrievedMonitoredTxs)) + } + + monitoredTxsEqual := func(a, b *monitoredTx) bool { + if a.tx.Hash() != b.tx.Hash() { + return false + } + if a.replacementTx != nil && b.replacementTx != nil && *a.replacementTx != *b.replacementTx { + return false + } + if a.replacedTx != nil && b.replacedTx != nil && *a.replacedTx != *b.replacedTx { + return false + } + if a.blockSubmitted != b.blockSubmitted { + return false + } + return true + } + + for txHash, monitoredTx := range retrievedMonitoredTxs { + expectedMonitoredTx := monitoredTxs[txHash] + if !monitoredTxsEqual(monitoredTx, expectedMonitoredTx) { + t.Fatalf("expected monitored tx %+v but got %+v", expectedMonitoredTx, monitoredTx) + } + } +} diff --git a/client/asset/interface.go b/client/asset/interface.go index 32b3b8d6bb..a91e59d915 100644 --- a/client/asset/interface.go +++ b/client/asset/interface.go @@ -1059,6 +1059,9 @@ type WalletTransaction struct { Fees uint64 `json:"fees"` // BlockNumber is 0 for txs in the mempool. BlockNumber uint64 `json:"blockNumber"` + // TokenID will be non-nil if the BalanceDelta applies to the balance + // of a token. + TokenID *uint32 `json:"tokenID,omitempty"` // AdditionalData contains asset specific information, i.e. nonce // for ETH. AdditionalData map[string]string `json:"additionalData"`