diff --git a/cmd/kaspawallet/daemon/server/balance.go b/cmd/kaspawallet/daemon/server/balance.go index 429764d37f..de1b040066 100644 --- a/cmd/kaspawallet/daemon/server/balance.go +++ b/cmd/kaspawallet/daemon/server/balance.go @@ -2,6 +2,7 @@ package server import ( "context" + "github.com/pkg/errors" "github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb" "github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet" @@ -14,13 +15,15 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get s.lock.RLock() defer s.lock.RUnlock() + if !s.isSynced() { + return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport()) + } + dagInfo, err := s.rpcClient.GetBlockDAGInfo() if err != nil { return nil, err } daaScore := dagInfo.VirtualDAAScore - maturity := s.params.BlockCoinbaseMaturity - balancesMap := make(balancesMapType, 0) for _, entry := range s.utxosSortedByAmount { amount := entry.UTXOEntry.Amount() @@ -30,7 +33,7 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get balances = new(balancesType) balancesMap[address] = balances } - if isUTXOSpendable(entry, daaScore, maturity) { + if s.isUTXOSpendable(entry, daaScore) { balances.available += amount } else { balances.pending += amount @@ -64,9 +67,9 @@ func (s *server) GetBalance(_ context.Context, _ *pb.GetBalanceRequest) (*pb.Get }, nil } -func isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64, coinbaseMaturity uint64) bool { +func (s *server) isUTXOSpendable(entry *walletUTXO, virtualDAAScore uint64) bool { if !entry.UTXOEntry.IsCoinbase() { return true } - return entry.UTXOEntry.BlockDAAScore()+coinbaseMaturity < virtualDAAScore + return entry.UTXOEntry.BlockDAAScore()+s.coinbaseMaturity < virtualDAAScore } diff --git a/cmd/kaspawallet/daemon/server/broadcast.go b/cmd/kaspawallet/daemon/server/broadcast.go index 2b93a6d7f3..027ff41aca 100644 --- a/cmd/kaspawallet/daemon/server/broadcast.go +++ b/cmd/kaspawallet/daemon/server/broadcast.go @@ -54,11 +54,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro } } - err = s.refreshUTXOs() - if err != nil { - return nil, err - } - + s.forceSync() return txIDs, nil } diff --git a/cmd/kaspawallet/daemon/server/create_unsigned_transaction.go b/cmd/kaspawallet/daemon/server/create_unsigned_transaction.go index fe75e2d28c..950b1d1e61 100644 --- a/cmd/kaspawallet/daemon/server/create_unsigned_transaction.go +++ b/cmd/kaspawallet/daemon/server/create_unsigned_transaction.go @@ -3,14 +3,12 @@ package server import ( "context" "fmt" - "time" "github.com/kaspanet/kaspad/cmd/kaspawallet/daemon/pb" "github.com/kaspanet/kaspad/cmd/kaspawallet/libkaspawallet" "github.com/kaspanet/kaspad/domain/consensus/utils/constants" "github.com/kaspanet/kaspad/util" "github.com/pkg/errors" - "golang.org/x/exp/slices" ) // TODO: Implement a better fee estimation mechanism @@ -35,7 +33,6 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen if !s.isSynced() { return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport()) } - // make sure address string is correct before proceeding to a // potentially long UTXO refreshment operation toAddress, err := util.DecodeAddress(address, s.params.Prefix) @@ -43,16 +40,11 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen return nil, err } - err = s.refreshUTXOs() - if err != nil { - return nil, err - } - var fromAddresses []*walletAddress for _, from := range fromAddressesString { fromAddress, exists := s.addressSet[from] if !exists { - return nil, fmt.Errorf("Specified from address %s does not exists", from) + return nil, fmt.Errorf("specified from address %s does not exists", from) } fromAddresses = append(fromAddresses, fromAddress) } @@ -106,19 +98,14 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin return nil, 0, 0, err } - coinbaseMaturity := s.params.BlockCoinbaseMaturity - if dagInfo.NetworkName == "kaspa-testnet-11" { - coinbaseMaturity = 1000 - } - for _, utxo := range s.utxosSortedByAmount { - if (fromAddresses != nil && !slices.Contains(fromAddresses, utxo.address)) || - !isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, coinbaseMaturity) { + if (fromAddresses != nil && !walletAddressesContain(fromAddresses, utxo.address)) || + !s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) { continue } if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok { - if time.Since(broadcastTime) > time.Minute { + if s.usedOutpointHasExpired(broadcastTime) { delete(s.usedOutpoints, *utxo.Outpoint) } else { continue @@ -160,3 +147,13 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin return selectedUTXOs, totalReceived, totalValue - totalSpend, nil } + +func walletAddressesContain(addresses []*walletAddress, contain *walletAddress) bool { + for _, address := range addresses { + if *address == *contain { + return true + } + } + + return false +} diff --git a/cmd/kaspawallet/daemon/server/server.go b/cmd/kaspawallet/daemon/server/server.go index 6afc311834..1b43dac8cf 100644 --- a/cmd/kaspawallet/daemon/server/server.go +++ b/cmd/kaspawallet/daemon/server/server.go @@ -2,12 +2,14 @@ package server import ( "fmt" - "github.com/kaspanet/kaspad/version" "net" "os" "sync" + "sync/atomic" "time" + "github.com/kaspanet/kaspad/version" + "github.com/kaspanet/kaspad/domain/consensus/model/externalapi" "github.com/kaspanet/kaspad/util/txmass" @@ -28,17 +30,22 @@ import ( type server struct { pb.UnimplementedKaspawalletdServer - rpcClient *rpcclient.RPCClient - params *dagconfig.Params - - lock sync.RWMutex - utxosSortedByAmount []*walletUTXO - nextSyncStartIndex uint32 - keysFile *keys.File - shutdown chan struct{} - addressSet walletAddressSet - txMassCalculator *txmass.Calculator - usedOutpoints map[externalapi.DomainOutpoint]time.Time + rpcClient *rpcclient.RPCClient // RPC client for ongoing user requests + backgroundRPCClient *rpcclient.RPCClient // RPC client dedicated for address and UTXO background fetching + params *dagconfig.Params + coinbaseMaturity uint64 // Is different from default if we use testnet-11 + + lock sync.RWMutex + utxosSortedByAmount []*walletUTXO + nextSyncStartIndex uint32 + keysFile *keys.File + shutdown chan struct{} + forceSyncChan chan struct{} + startTimeOfLastCompletedRefresh time.Time + addressSet walletAddressSet + txMassCalculator *txmass.Calculator + usedOutpoints map[externalapi.DomainOutpoint]time.Time + firstSyncDone atomic.Bool isLogFinalProgressLineShown bool maxUsedAddressesForLog uint32 @@ -72,6 +79,10 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri if err != nil { return (errors.Wrapf(err, "Error connecting to RPC server %s", rpcServer)) } + backgroundRPCClient, err := connectToRPC(params, rpcServer, timeout) + if err != nil { + return (errors.Wrapf(err, "Error making a second connection to RPC server %s", rpcServer)) + } log.Infof("Connected, reading keys file %s...", keysFilePath) keysFile, err := keys.ReadKeysFile(params, keysFilePath) @@ -84,13 +95,26 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri return err } + dagInfo, err := rpcClient.GetBlockDAGInfo() + if err != nil { + return nil + } + + coinbaseMaturity := params.BlockCoinbaseMaturity + if dagInfo.NetworkName == "kaspa-testnet-11" { + coinbaseMaturity = 1000 + } + serverInstance := &server{ rpcClient: rpcClient, + backgroundRPCClient: backgroundRPCClient, params: params, + coinbaseMaturity: coinbaseMaturity, utxosSortedByAmount: []*walletUTXO{}, nextSyncStartIndex: 0, keysFile: keysFile, shutdown: make(chan struct{}), + forceSyncChan: make(chan struct{}), addressSet: make(walletAddressSet), txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp), usedOutpoints: map[externalapi.DomainOutpoint]time.Time{}, @@ -100,8 +124,8 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri } log.Infof("Read, syncing the wallet...") - spawn("serverInstance.sync", func() { - err := serverInstance.sync() + spawn("serverInstance.syncLoop", func() { + err := serverInstance.syncLoop() if err != nil { printErrorAndExit(errors.Wrap(err, "error syncing the wallet")) } diff --git a/cmd/kaspawallet/daemon/server/split_transaction.go b/cmd/kaspawallet/daemon/server/split_transaction.go index b2154c9007..2f71fb9c2a 100644 --- a/cmd/kaspawallet/daemon/server/split_transaction.go +++ b/cmd/kaspawallet/daemon/server/split_transaction.go @@ -264,7 +264,7 @@ func (s *server) moreUTXOsForMergeTransaction(alreadySelectedUTXOs []*libkaspawa if _, ok := alreadySelectedUTXOsMap[*utxo.Outpoint]; ok { continue } - if !isUTXOSpendable(utxo, dagInfo.VirtualDAAScore, s.params.BlockCoinbaseMaturity) { + if !s.isUTXOSpendable(utxo, dagInfo.VirtualDAAScore) { continue } additionalUTXOs = append(additionalUTXOs, &libkaspawallet.UTXO{ diff --git a/cmd/kaspawallet/daemon/server/sync.go b/cmd/kaspawallet/daemon/server/sync.go index 0bfa622297..6aa7bd2b4e 100644 --- a/cmd/kaspawallet/daemon/server/sync.go +++ b/cmd/kaspawallet/daemon/server/sync.go @@ -23,7 +23,7 @@ func (was walletAddressSet) strings() []string { return addresses } -func (s *server) sync() error { +func (s *server) syncLoop() error { ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -32,29 +32,39 @@ func (s *server) sync() error { return err } - err = s.refreshExistingUTXOsWithLock() + err = s.refreshUTXOs() if err != nil { return err } - for range ticker.C { - err = s.collectFarAddresses() - if err != nil { - return err - } + s.firstSyncDone.Store(true) + log.Infof("Wallet is synced and ready for operation") - err = s.collectRecentAddresses() - if err != nil { - return err + for { + select { + case <-ticker.C: + case <-s.forceSyncChan: } - err = s.refreshExistingUTXOsWithLock() + err := s.sync() if err != nil { return err } } +} - return nil +func (s *server) sync() error { + err := s.collectFarAddresses() + if err != nil { + return err + } + + err = s.collectRecentAddresses() + if err != nil { + return err + } + + return s.refreshUTXOs() } const ( @@ -158,7 +168,7 @@ func (s *server) collectAddresses(start, end uint32) error { return err } - getBalancesByAddressesResponse, err := s.rpcClient.GetBalancesByAddresses(addressSet.strings()) + getBalancesByAddressesResponse, err := s.backgroundRPCClient.GetBalancesByAddresses(addressSet.strings()) if err != nil { return err } @@ -208,15 +218,17 @@ func (s *server) updateAddressesAndLastUsedIndexes(requestedAddressSet walletAdd return s.keysFile.SetLastUsedInternalIndex(lastUsedInternalIndex) } -func (s *server) refreshExistingUTXOsWithLock() error { - s.lock.Lock() - defer s.lock.Unlock() - - return s.refreshUTXOs() +func (s *server) usedOutpointHasExpired(outpointBroadcastTime time.Time) bool { + // If the node returns a UTXO we previously attempted to spend and enough time has passed, we assume + // that the network rejected or lost the previous transaction and allow a reuse. We set this time + // interval to a minute. + // We also verify that a full refresh UTXO operation started after this time point and has already + // completed, in order to make sure that indeed this state reflects a state obtained following the required wait time. + return s.startTimeOfLastCompletedRefresh.After(outpointBroadcastTime.Add(time.Minute)) } // updateUTXOSet clears the current UTXO set, and re-fills it with the given entries -func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress) error { +func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress, refreshStart time.Time) error { utxos := make([]*walletUTXO, 0, len(entries)) exclude := make(map[appmessage.RPCOutpoint]struct{}) @@ -243,6 +255,7 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp return err } + // No need to lock for reading since the only writer of this set is on `syncLoop` on the same goroutine. address, ok := s.addressSet[entry.Address] if !ok { return errors.Errorf("Got result from address %s even though it wasn't requested", entry.Address) @@ -256,32 +269,56 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() }) + s.lock.Lock() + s.startTimeOfLastCompletedRefresh = refreshStart s.utxosSortedByAmount = utxos + // Cleanup expired used outpoints to avoid a memory leak + for outpoint, broadcastTime := range s.usedOutpoints { + if s.usedOutpointHasExpired(broadcastTime) { + delete(s.usedOutpoints, outpoint) + } + } + s.lock.Unlock() + return nil } func (s *server) refreshUTXOs() error { + refreshStart := time.Now() + + // No need to lock for reading since the only writer of this set is on `syncLoop` on the same goroutine. + addresses := s.addressSet.strings() // It's important to check the mempool before calling `GetUTXOsByAddresses`: // If we would do it the other way around an output can be spent in the mempool // and not in consensus, and between the calls its spending transaction will be // added to consensus and removed from the mempool, so `getUTXOsByAddressesResponse` // will include an obsolete output. - mempoolEntriesByAddresses, err := s.rpcClient.GetMempoolEntriesByAddresses(s.addressSet.strings(), true, true) + mempoolEntriesByAddresses, err := s.backgroundRPCClient.GetMempoolEntriesByAddresses(addresses, true, true) if err != nil { return err } - getUTXOsByAddressesResponse, err := s.rpcClient.GetUTXOsByAddresses(s.addressSet.strings()) + getUTXOsByAddressesResponse, err := s.backgroundRPCClient.GetUTXOsByAddresses(addresses) if err != nil { return err } - return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries) + return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries, refreshStart) +} + +func (s *server) forceSync() { + // Technically if two callers check the `if` simultaneously they will both spawn a + // goroutine, but we don't care about the small redundancy in such a rare case. + if len(s.forceSyncChan) == 0 { + go func() { + s.forceSyncChan <- struct{}{} + }() + } } func (s *server) isSynced() bool { - return s.nextSyncStartIndex > s.maxUsedIndex() + return s.nextSyncStartIndex > s.maxUsedIndex() && s.firstSyncDone.Load() } func (s *server) formatSyncStateReport() string { @@ -291,8 +328,11 @@ func (s *server) formatSyncStateReport() string { maxUsedIndex = s.nextSyncStartIndex } - return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)", - s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex)) + if s.nextSyncStartIndex < s.maxUsedIndex() { + return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)", + s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex)) + } + return "loading the wallet UTXO set" } func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) { @@ -311,7 +351,7 @@ func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAdd if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog { if !s.isLogFinalProgressLineShown { - log.Infof("Wallet is synced, ready for queries") + log.Infof("Finished scanning recent addresses") s.isLogFinalProgressLineShown = true } } else {