Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy wallet utxo sync after broadcasting a tx #2258

Merged
merged 13 commits into from
Dec 27, 2023
6 changes: 1 addition & 5 deletions cmd/kaspawallet/daemon/server/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,7 @@ func (s *server) broadcast(transactions [][]byte, isDomain bool) ([]string, erro
}
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

s.forceSync()
return txIDs, nil
}

Expand Down
10 changes: 3 additions & 7 deletions cmd/kaspawallet/daemon/server/create_unsigned_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,13 @@ func (s *server) createUnsignedTransactions(address string, amount uint64, isSen
if !s.isSynced() {
return nil, errors.Errorf("wallet daemon is not synced yet, %s", s.formatSyncStateReport())
}

// make sure address string is correct before proceeding to a
// potentially long UTXO refreshment operation
toAddress, err := util.DecodeAddress(address, s.params.Prefix)
if err != nil {
return nil, err
}

err = s.refreshUTXOs()
if err != nil {
return nil, err
}

var fromAddresses []*walletAddress
for _, from := range fromAddressesString {
fromAddress, exists := s.addressSet[from]
Expand Down Expand Up @@ -118,7 +112,9 @@ func (s *server) selectUTXOs(spendAmount uint64, isSendAll bool, feePerInput uin
}

if broadcastTime, ok := s.usedOutpoints[*utxo.Outpoint]; ok {
if time.Since(broadcastTime) > time.Minute {
// We want to free an outpoint from the used outpoints set if we refresh the UTXOs since it was
// marked as used, and it least one minute has passed.
if time.Since(broadcastTime) > time.Minute && s.startTimeOfLastCompletedRefresh.After(broadcastTime) {
michaelsutton marked this conversation as resolved.
Show resolved Hide resolved
delete(s.usedOutpoints, *utxo.Outpoint)
} else {
continue
Expand Down
25 changes: 15 additions & 10 deletions cmd/kaspawallet/daemon/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"os"
"sync"
"sync/atomic"
"time"

"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
Expand All @@ -31,14 +32,17 @@ type server struct {
rpcClient *rpcclient.RPCClient
params *dagconfig.Params

lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
lock sync.RWMutex
utxosSortedByAmount []*walletUTXO
nextSyncStartIndex uint32
keysFile *keys.File
shutdown chan struct{}
forceSyncChan chan struct{}
startTimeOfLastCompletedRefresh time.Time
addressSet walletAddressSet
txMassCalculator *txmass.Calculator
usedOutpoints map[externalapi.DomainOutpoint]time.Time
firstSyncDone atomic.Bool

isLogFinalProgressLineShown bool
maxUsedAddressesForLog uint32
Expand Down Expand Up @@ -91,6 +95,7 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
nextSyncStartIndex: 0,
keysFile: keysFile,
shutdown: make(chan struct{}),
forceSyncChan: make(chan struct{}),
addressSet: make(walletAddressSet),
txMassCalculator: txmass.NewCalculator(params.MassPerTxByte, params.MassPerScriptPubKeyByte, params.MassPerSigOp),
usedOutpoints: map[externalapi.DomainOutpoint]time.Time{},
Expand All @@ -100,8 +105,8 @@ func Start(params *dagconfig.Params, listen, rpcServer string, keysFilePath stri
}

log.Infof("Read, syncing the wallet...")
spawn("serverInstance.sync", func() {
err := serverInstance.sync()
spawn("serverInstance.syncLoop", func() {
err := serverInstance.syncLoop()
if err != nil {
printErrorAndExit(errors.Wrap(err, "error syncing the wallet"))
}
Expand Down
70 changes: 45 additions & 25 deletions cmd/kaspawallet/daemon/server/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (was walletAddressSet) strings() []string {
return addresses
}

func (s *server) sync() error {
func (s *server) syncLoop() error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

Expand All @@ -32,29 +32,39 @@ func (s *server) sync() error {
return err
}

err = s.refreshExistingUTXOsWithLock()
err = s.refreshUTXOs()
if err != nil {
return err
}

for range ticker.C {
err = s.collectFarAddresses()
if err != nil {
return err
}
s.firstSyncDone.Store(true)
log.Infof("Wallet is synced and ready for operation")

err = s.collectRecentAddresses()
if err != nil {
return err
for {
select {
case <-ticker.C:
case <-s.forceSyncChan:
}

err = s.refreshExistingUTXOsWithLock()
err := s.sync()
if err != nil {
return err
}
}
}

return nil
func (s *server) sync() error {
err := s.collectFarAddresses()
if err != nil {
return err
}

err = s.collectRecentAddresses()
if err != nil {
return err
}

return s.refreshUTXOs()
}

const (
Expand Down Expand Up @@ -208,15 +218,8 @@ func (s *server) updateAddressesAndLastUsedIndexes(requestedAddressSet walletAdd
return s.keysFile.SetLastUsedInternalIndex(lastUsedInternalIndex)
}

func (s *server) refreshExistingUTXOsWithLock() error {
s.lock.Lock()
defer s.lock.Unlock()

return s.refreshUTXOs()
}

// updateUTXOSet clears the current UTXO set, and re-fills it with the given entries
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress) error {
func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, mempoolEntries []*appmessage.MempoolEntryByAddress, refreshStart time.Time) error {
utxos := make([]*walletUTXO, 0, len(entries))

exclude := make(map[appmessage.RPCOutpoint]struct{})
Expand Down Expand Up @@ -256,12 +259,16 @@ func (s *server) updateUTXOSet(entries []*appmessage.UTXOsByAddressesEntry, memp

sort.Slice(utxos, func(i, j int) bool { return utxos[i].UTXOEntry.Amount() > utxos[j].UTXOEntry.Amount() })

s.lock.Lock()
s.startTimeOfLastCompletedRefresh = refreshStart
s.utxosSortedByAmount = utxos
s.lock.Unlock()

return nil
}

func (s *server) refreshUTXOs() error {
refreshStart := time.Now()
// It's important to check the mempool before calling `GetUTXOsByAddresses`:
// If we would do it the other way around an output can be spent in the mempool
// and not in consensus, and between the calls its spending transaction will be
Expand All @@ -277,11 +284,21 @@ func (s *server) refreshUTXOs() error {
return err
}

return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries)
return s.updateUTXOSet(getUTXOsByAddressesResponse.Entries, mempoolEntriesByAddresses.Entries, refreshStart)
}

func (s *server) forceSync() {
// Technically if two callers check the `if` simultaneously they will both spawn a
// goroutine, but we don't care about the small redundancy in such a rare case.
if len(s.forceSyncChan) == 0 {
go func() {
s.forceSyncChan <- struct{}{}
}()
}
}

func (s *server) isSynced() bool {
return s.nextSyncStartIndex > s.maxUsedIndex()
return s.nextSyncStartIndex > s.maxUsedIndex() && s.firstSyncDone.Load()
}

func (s *server) formatSyncStateReport() string {
Expand All @@ -291,8 +308,11 @@ func (s *server) formatSyncStateReport() string {
maxUsedIndex = s.nextSyncStartIndex
}

return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)",
s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex))
if s.nextSyncStartIndex < s.maxUsedIndex() {
return fmt.Sprintf("scanned %d out of %d addresses (%.2f%%)",
s.nextSyncStartIndex, maxUsedIndex, float64(s.nextSyncStartIndex)*100.0/float64(maxUsedIndex))
}
return "loading the wallet UTXO set"
}

func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAddresses uint32) {
Expand All @@ -311,7 +331,7 @@ func (s *server) updateSyncingProgressLog(currProcessedAddresses, currMaxUsedAdd

if s.maxProcessedAddressesForLog >= s.maxUsedAddressesForLog {
if !s.isLogFinalProgressLineShown {
log.Infof("Wallet is synced, ready for queries")
log.Infof("Finished scanning recent addresses")
s.isLogFinalProgressLineShown = true
}
} else {
Expand Down
Loading