
Fix lookahead bug
The accounter should only schedule additional addresses after checking
each transaction's height.
alokmenghrajani committed Oct 18, 2018
1 parent e96d581 commit f7976dc
Showing 6 changed files with 66 additions and 87 deletions.
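
Before the per-file diff, here is a self-contained toy sketch of the rule the commit message describes: record which addresses reference a transaction first, and only extend the lookahead window once that transaction's height is known to be confirmed at or below the target block height. Types and function names are simplified illustrations, not the repo's actual API; the real change is in `accounter/accounter.go` below.

```go
package main

import "fmt"

// Toy model of the lookahead logic (not the repo's exact types): an address is
// identified by (change, index), transactions carry an Electrum-style height.
type addr struct {
	change, index uint32
}

type accounter struct {
	blockHeight   uint32
	lookahead     uint32
	lastAddresses [2]uint32         // highest index to derive, per change branch
	txAddresses   map[string][]addr // txhash => addresses that reference it
}

// onAddress records which transactions an address appears in. It deliberately
// does NOT extend lastAddresses: that decision is deferred to onTransaction.
func (a *accounter) onAddress(ad addr, txHashes []string) {
	for _, h := range txHashes {
		a.txAddresses[h] = append(a.txAddresses[h], ad)
	}
}

// onTransaction extends the lookahead window only when the transaction is
// confirmed at or below the target block height.
func (a *accounter) onTransaction(hash string, height int64) {
	if height <= 0 || height > int64(a.blockHeight) {
		return // unconfirmed or too recent: does not count toward the gap limit
	}
	for _, ad := range a.txAddresses[hash] {
		if ad.index+a.lookahead > a.lastAddresses[ad.change] {
			a.lastAddresses[ad.change] = ad.index + a.lookahead
		}
	}
}

func main() {
	a := &accounter{
		blockHeight:   500000,
		lookahead:     100,
		lastAddresses: [2]uint32{100, 100},
		txAddresses:   map[string][]addr{},
	}
	a.onAddress(addr{change: 0, index: 42}, []string{"aa"})
	a.onTransaction("aa", 600000)  // above target height: window unchanged
	a.onTransaction("aa", 499999)  // confirmed below target: window extended
	fmt.Println(a.lastAddresses)   // [142 100]
}
```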
78 changes: 44 additions & 34 deletions accounter/accounter.go
@@ -2,7 +2,6 @@ package accounter

import (
"encoding/hex"
"fmt"
"log"
"sync"
"time"
@@ -28,8 +27,10 @@ type Accounter struct {
xpubs []string
blockHeight uint32 // height at which we want to compute the balance

addresses map[string]address // map of address script => (Address, txHashes)
transactions map[string]transaction // map of txhash => transaction
addresses map[string]address // map of address script => (Address, txHashes)
txAddressesMu sync.Mutex
txAddresses map[string][]*deriver.Address // map of txhash => []Address
transactions map[string]transaction // map of txhash => transaction

backend backend.Backend
deriver *deriver.AddressDeriver
@@ -71,20 +72,19 @@ type vout struct {
}

// New instantiates a new Accounter.
// TODO: find a better way to pass options to the NewCounter. Maybe thru a config or functional option params?
func New(b backend.Backend, addressDeriver *deriver.AddressDeriver, lookahead uint32, blockHeight uint32) *Accounter {
a := &Accounter{
return &Accounter{
blockHeight: blockHeight,
backend: b,
deriver: addressDeriver,
lookahead: lookahead,
lastAddresses: [2]uint32{lookahead, lookahead},
addresses: make(map[string]address),
txAddresses: make(map[string][]*deriver.Address),
transactions: make(map[string]transaction),
addrResponses: b.AddrResponses(),
txResponses: b.TxResponses(),
}
a.addresses = make(map[string]address)
a.transactions = make(map[string]transaction)
a.addrResponses = b.AddrResponses()
a.txResponses = b.TxResponses()
return a
}
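
The TODO above mentions a config struct or functional options; for reference, a generic, hypothetical sketch of the functional-options pattern applied to a constructor like this one (toy names, not part of this commit):

```go
package main

import "fmt"

// Toy illustration of the functional-options pattern suggested by the TODO.
type counter struct {
	lookahead   uint32
	blockHeight uint32
}

type option func(*counter)

func withLookahead(n uint32) option   { return func(c *counter) { c.lookahead = n } }
func withBlockHeight(h uint32) option { return func(c *counter) { c.blockHeight = h } }

// newCounter keeps required dependencies positional and pushes everything
// else into variadic options, so adding a knob does not change the signature.
func newCounter(opts ...option) *counter {
	c := &counter{lookahead: 100} // defaults
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newCounter(withLookahead(250), withBlockHeight(500000))
	fmt.Println(c.lookahead, c.blockHeight) // 250 500000
}
```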

func (a *Accounter) ComputeBalance() uint64 {
@@ -112,33 +112,17 @@ func (a *Accounter) fetchTransactions() {
}

func (a *Accounter) processTransactions() {
for hash, tx := range a.transactions {
// remove transactions which are too recent
if tx.height > int64(a.blockHeight) {
reporter.GetInstance().Logf("transaction %s has height %d > BLOCK HEIGHT (%d)", hash, tx.height, a.blockHeight)
delete(a.transactions, hash)
}
// remove transactions which haven't been mined
if tx.height <= 0 {
reporter.GetInstance().Logf("transaction %s has not been mined, yet (height=%d)", hash, tx.height)
delete(a.transactions, hash)
}
}
reporter.GetInstance().SetTxAfterFilter(int32(len(a.transactions)))
reporter.GetInstance().Log("done filtering")

// TODO: we could check that scheduled == fetched in the metrics we track in reporter.

// parse the transaction hex
for hash, tx := range a.transactions {
b, err := hex.DecodeString(tx.hex)
if err != nil {
fmt.Printf("failed to unhex transaction %s: %s", hash, tx.hex)
log.Panicf("failed to unhex transaction %s: %s", hash, tx.hex)
}
parsedTx, err := btcutil.NewTxFromBytes(b)
if err != nil {
fmt.Printf("failed to parse transaction %s: %s", hash, tx.hex)
continue
log.Panicf("failed to parse transaction %s: %s", hash, tx.hex)
}
for _, txin := range parsedTx.MsgTx().TxIn {
tx.vin = append(tx.vin, vin{
@@ -234,7 +218,10 @@ func (a *Accounter) sendWork() {
indexes[change]++
}
}
// apparently no more work for us, so we can sleep a bit
// apparently no more work for now.

// TODO: we should either merge sendWork/recvWork or use some kind of mutex to sleep exactly
// until there's more work that needs to be done. For now, a simple sleep works.
time.Sleep(time.Millisecond * 100)
}
}
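
On the TODO above about sleeping exactly until there is more work: one conventional alternative to the fixed 100ms polling sleep is a `sync.Cond`-guarded queue, sketched below as a self-contained toy (hypothetical names, not part of this commit). `recvWork` would signal when a fetched transaction unlocks new address ranges; `sendWork` would block instead of polling.

```go
package main

import (
	"fmt"
	"sync"
)

// workQueue lets a producer hand address-derivation work to a consumer that
// sleeps until something arrives, rather than waking on a timer.
type workQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
	done  bool
}

func newWorkQueue() *workQueue {
	q := &workQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// add is what the receiving side would call when new work appears.
func (q *workQueue) add(item string) {
	q.mu.Lock()
	q.items = append(q.items, item)
	q.mu.Unlock()
	q.cond.Signal()
}

// next blocks until work exists or the queue is closed.
func (q *workQueue) next() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 && !q.done {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", false
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, true
}

func (q *workQueue) close() {
	q.mu.Lock()
	q.done = true
	q.mu.Unlock()
	q.cond.Broadcast()
}

func main() {
	q := newWorkQueue()
	go func() {
		q.add("m/0/101") // e.g. a newly unlocked address to schedule
		q.close()
	}()
	for {
		item, ok := q.next()
		if !ok {
			break
		}
		fmt.Println("schedule", item)
	}
}
```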
@@ -251,6 +238,7 @@ func (a *Accounter) recvWork() {
continue
}
reporter.GetInstance().IncAddressesFetched()
reporter.GetInstance().Logf("received address: %s", resp.Address)

a.countMu.Lock()
a.processedAddrCount++
@@ -270,13 +258,15 @@ }
}
a.countMu.Unlock()

// we can only update the lastAddresses after we filter the transaction heights
a.txAddressesMu.Lock()
for _, txHash := range resp.TxHashes {
a.txAddresses[txHash] = append(a.txAddresses[txHash], resp.Address)
}
a.txAddressesMu.Unlock()

reporter.GetInstance().Logf("address %s has %d transactions", resp.Address, len(resp.TxHashes))

if resp.HasTransactions() {
a.countMu.Lock()
a.lastAddresses[resp.Address.Change()] = Max(a.lastAddresses[resp.Address.Change()], resp.Address.Index()+a.lookahead)
a.countMu.Unlock()
}
case resp, ok := <-txResponses:
// channel is closed now, so ignore this case by blocking forever
if !ok {
@@ -285,18 +275,38 @@ }
}

reporter.GetInstance().IncTxFetched()
reporter.GetInstance().Logf("received tx: %s", resp.Hash)

a.countMu.Lock()
a.processedTxCount++
a.countMu.Unlock()

if resp.Height > int64(a.blockHeight) {
continue
}
if resp.Height == 0 {
continue
}
if resp.Height < 0 {
log.Panicf("tx %s has negative height %d", resp.Hash, resp.Height)
}

tx := transaction{
height: resp.Height,
hex: resp.Hex,
vin: []vin{},
vout: []vout{},
}
a.transactions[resp.Hash] = tx

a.txAddressesMu.Lock()
a.countMu.Lock()
for _, addr := range a.txAddresses[resp.Hash] {
a.lastAddresses[addr.Change()] = Max(a.lastAddresses[addr.Change()], addr.Index()+a.lookahead)
}
a.countMu.Unlock()
a.txAddressesMu.Unlock()

case <-time.Tick(1 * time.Second):
if a.complete() {
return
4 changes: 3 additions & 1 deletion backend/electrum/blockchain.go
@@ -225,7 +225,9 @@ func (n *Node) BlockchainAddressGetHistory(address string) ([]*Transaction, erro
// https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-transaction-get
func (n *Node) BlockchainTransactionGet(txid string) (string, error) {
var hex string
err := n.request("blockchain.transaction.get", []interface{}{txid, false}, &hex)
// some servers don't handle the second parameter (even though they advertise version 1.2)
// so we leave it out.
err := n.request("blockchain.transaction.get", []interface{}{txid}, &hex)
return hex, err
}

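
At the wire level, the change above only shrinks the params array of the JSON-RPC request. A minimal sketch of the two request shapes follows; the framing details (the `id` field layout, newline termination) are assumptions based on the Electrum protocol docs linked in the comment, not code from this repo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// rpcRequest is a simplified stand-in for whatever request struct the client
// actually uses internally.
type rpcRequest struct {
	ID     int           `json:"id"`
	Method string        `json:"method"`
	Params []interface{} `json:"params"`
}

func main() {
	txid := "<txid>" // placeholder

	// Before this change: verbose=false passed as a second parameter.
	before, _ := json.Marshal(rpcRequest{ID: 1, Method: "blockchain.transaction.get", Params: []interface{}{txid, false}})
	// After this change: txid only, which non-conforming servers also accept.
	after, _ := json.Marshal(rpcRequest{ID: 1, Method: "blockchain.transaction.get", Params: []interface{}{txid}})

	fmt.Println(string(before))
	fmt.Println(string(after))
}
```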
15 changes: 8 additions & 7 deletions backend/electrum_backend.go
@@ -107,7 +107,7 @@ func NewElectrumBackend(addr, port string, network utils.Network) (*ElectrumBack

// Connect to a node and handle requests
if err := eb.addNode(addr, port, network); err != nil {
fmt.Printf("failed to connect to initial node: %+v", err)
log.Printf("failed to connect to initial node: %+v", err)
return nil, err
}

@@ -181,6 +181,7 @@ func (eb *ElectrumBackend) ChainHeight() uint32 {
func (eb *ElectrumBackend) addNode(addr, port string, network utils.Network) error {
ident := electrum.NodeIdent(addr, port)

// note: this code contains a TOCTOU bug. We risk connecting to the same node multiple times.
eb.nodeMu.RLock()
_, existsGood := eb.nodes[ident]
_, existsBad := eb.blacklistedNodes[ident]
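
On the TOCTOU note above: the race exists because the existence check happens under a read lock and the later insertion under a separate write lock. A self-contained toy sketch of one way to close the window, reserving the ident under a single write lock before dialing (hypothetical helper, not part of this commit):

```go
package main

import (
	"fmt"
	"sync"
)

// registry stands in for the backend's node/blacklist maps; the check and the
// reservation happen under one write lock, so two goroutines adding the same
// peer cannot both proceed to dial it.
type registry struct {
	mu          sync.Mutex
	nodes       map[string]bool
	blacklisted map[string]bool
}

func (r *registry) tryReserve(ident string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.nodes[ident] || r.blacklisted[ident] {
		return false
	}
	r.nodes[ident] = true // reserved; a failed dial would remove or blacklist it
	return true
}

func main() {
	r := &registry{nodes: map[string]bool{}, blacklisted: map[string]bool{}}
	fmt.Println(r.tryReserve("1.2.3.4:50001")) // true: first caller wins
	fmt.Println(r.tryReserve("1.2.3.4:50001")) // false: duplicate is rejected
}
```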
@@ -443,7 +444,7 @@ func (eb *ElectrumBackend) cacheTxs(txs []*electrum.Transaction) {
for _, tx := range txs {
height, exists := eb.transactions[tx.Hash]
if exists && (height != int64(tx.Height)) {
log.Panicf("inconsistent cache: %s %d != %d", tx.Hash, height, tx.Height)
log.Panicf("inconsistent transactions cache: %s %d != %d", tx.Hash, height, tx.Height)
}
eb.transactions[tx.Hash] = int64(tx.Height)
}
@@ -495,19 +496,19 @@ func (eb *ElectrumBackend) findPeers() {

func (eb *ElectrumBackend) addPeer(peer electrum.Peer) {
if strings.HasSuffix(peer.Host, ".onion") {
log.Printf("skipping %s because of .onion\n", peer.Host)
log.Printf("skipping %s because of .onion", peer.Host)
return
}
err := checkVersion(peer.Version)
if err != nil {
log.Printf("skipping %s because of protocol version %s\n", peer.Host, peer.Version)
log.Printf("skipping %s because of protocol version %s", peer.Host, peer.Version)
return
}
for _, feature := range peer.Features {
if strings.HasPrefix(feature, "t") {
go func(addr, feature string, network utils.Network) {
if err := eb.addNode(addr, feature, network); err != nil {
log.Printf("error on addNode: %+v\n", err)
log.Printf("error on addNode: %+v", err)
}
}(peer.IP, feature, eb.network)
return
@@ -517,11 +518,11 @@ func (eb *ElectrumBackend) addPeer(peer electrum.Peer) {
if strings.HasPrefix(feature, "s") {
go func(addr, feature string, network utils.Network) {
if err := eb.addNode(addr, feature, network); err != nil {
log.Printf("error on addNode: %+v\n", err)
log.Printf("error on addNode: %+v", err)
}
}(peer.IP, feature, eb.network)
return
}
}
log.Printf("skipping %s because of feature mismatch: %+v\n", peer, peer.Features)
log.Printf("skipping %s because of feature mismatch: %+v", peer, peer.Features)
}
17 changes: 8 additions & 9 deletions main.go
@@ -97,8 +97,7 @@ func doKeytree() {
// Check that all the addresses have the same prefix
for i := 1; i < *keytreeN; i++ {
if xpubs[0][0:4] != xpubs[i][0:4] {
fmt.Printf("Prefixes must match: %s %s\n", xpubs[0], xpubs[i])
return
log.Panicf("Prefixes must match: %s %s", xpubs[0], xpubs[i])
}
}

@@ -144,8 +143,7 @@ func doFindAddr() {
// Check that all the addresses have the same prefix
for i := 1; i < *findAddrN; i++ {
if xpubs[0][0:4] != xpubs[i][0:4] {
fmt.Printf("Prefixes must match: %s %s\n", xpubs[0], xpubs[i])
return
log.Panicf("Prefixes must match: %s %s", xpubs[0], xpubs[i])
}
}
network := XpubToNetwork(xpubs[0])
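
The 4-character prefix comparison works because the version bytes of a BIP32 extended public key determine both the Base58 prefix and the network. A hedged sketch of that mapping for the two most common prefixes (not the repo's XpubToNetwork implementation, which may handle more variants):

```go
package main

import "fmt"

// xpubPrefixToNetwork maps well-known extended-key prefixes to a network name.
func xpubPrefixToNetwork(xpub string) (string, bool) {
	if len(xpub) < 4 {
		return "", false
	}
	switch xpub[0:4] {
	case "xpub":
		return "mainnet", true
	case "tpub":
		return "testnet", true
	default:
		return "", false
	}
}

func main() {
	fmt.Println(xpubPrefixToNetwork("xpub6CUGRUo...")) // mainnet true
	fmt.Println(xpubPrefixToNetwork("tpubD6NzVbk...")) // testnet true
}
```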
@@ -164,7 +162,7 @@ }
}
}
}
fmt.Printf("not found\n")
log.Panic("not found")
}

func doFindBlock() {
@@ -207,6 +205,7 @@ func doComputeBalance() {
if *computeBalanceType == "single-address" {
fmt.Printf("Enter single address:\n")
singleAddress, _ = reader.ReadString('\n')
singleAddress = strings.TrimSpace(singleAddress)
network = AddressToNetwork(singleAddress)
} else {
for i := 0; i < *computeBalanceN; i++ {
@@ -229,12 +228,12 @@
backend, err := computeBalanceBuildBackend(network)
PanicOnError(err)

// If blockHeight is 0, we default to current height - 6.
// If blockHeight is 0, we default to current height - 5.
if *computeBalanceBlockHeight == 0 {
*computeBalanceBlockHeight = backend.ChainHeight() - minConfirmations
*computeBalanceBlockHeight = backend.ChainHeight() - minConfirmations + 1
}
if *computeBalanceBlockHeight > backend.ChainHeight()-minConfirmations {
log.Panicf("blockHeight %d is too high (> %d - %d)", *computeBalanceBlockHeight, backend.ChainHeight(), minConfirmations)
if *computeBalanceBlockHeight > backend.ChainHeight()-minConfirmations+1 {
log.Panicf("blockHeight %d is too high (> %d - %d + 1)", *computeBalanceBlockHeight, backend.ChainHeight(), minConfirmations)
}
fmt.Printf("Going to compute balance at %d\n", *computeBalanceBlockHeight)

37 changes: 2 additions & 35 deletions reporter/reporter.go
@@ -14,7 +14,6 @@ type Reporter struct {
addressesFetched uint32
txScheduled uint32
txFetched uint32
txAfterFilter int32
peers int32
}

@@ -29,8 +28,8 @@ func GetInstance() *Reporter {
}

func (r *Reporter) Log(msg string) {
fmt.Printf("%d/%d %d/%d/%d %d: %s\n", r.GetAddressesScheduled(), r.GetAddressesFetched(),
r.GetTxScheduled(), r.GetTxFetched(), r.GetTxAfterFilter(), r.GetPeers(), msg)
fmt.Printf("%d/%d %d/%d %d: %s\n", r.GetAddressesScheduled(), r.GetAddressesFetched(),
r.GetTxScheduled(), r.GetTxFetched(), r.GetPeers(), msg)
}

func (r *Reporter) Logf(format string, args ...interface{}) {
@@ -45,10 +44,6 @@ func (r *Reporter) GetAddressesFetched() uint32 {
return atomic.LoadUint32(&r.addressesFetched)
}

func (r *Reporter) SetAddressesFetched(n uint32) {
atomic.StoreUint32(&r.addressesFetched, n)
}

func (r *Reporter) IncAddressesScheduled() {
atomic.AddUint32(&r.addressesScheduled, 1)
}
@@ -57,10 +52,6 @@ func (r *Reporter) GetAddressesScheduled() uint32 {
return atomic.LoadUint32(&r.addressesScheduled)
}

func (r *Reporter) SetddressesScheduled(n uint32) {
atomic.StoreUint32(&r.addressesScheduled, n)
}

func (r *Reporter) IncTxFetched() {
atomic.AddUint32(&r.txFetched, 1)
}
@@ -69,10 +60,6 @@ func (r *Reporter) GetTxFetched() uint32 {
return atomic.LoadUint32(&r.txFetched)
}

func (r *Reporter) SetTxFetched(n uint32) {
atomic.StoreUint32(&r.txFetched, n)
}

func (r *Reporter) IncTxScheduled() {
atomic.AddUint32(&r.txScheduled, 1)
}
@@ -81,26 +68,6 @@ func (r *Reporter) GetTxScheduled() uint32 {
return atomic.LoadUint32(&r.txScheduled)
}

func (r *Reporter) SetTxScheduled(n uint32) {
atomic.StoreUint32(&r.txScheduled, n)
}

func (r *Reporter) IncTxAfterFilter() {
atomic.AddInt32(&r.txAfterFilter, 1)
}

func (r *Reporter) GetTxAfterFilter() int32 {
return atomic.LoadInt32(&r.txAfterFilter)
}

func (r *Reporter) SetTxAfterFilter(n int32) {
atomic.StoreInt32(&r.txAfterFilter, n)
}

func (r *Reporter) IncPeers() {
atomic.AddInt32(&r.peers, 1)
}

func (r *Reporter) GetPeers() int32 {
return atomic.LoadInt32(&r.peers)
}
2 changes: 1 addition & 1 deletion utils/utils.go
@@ -59,7 +59,7 @@ func AddressToNetwork(addr string) Network {
case 'n':
return Testnet // pubkey hash
case '2':
return Testnet //script hash
return Testnet // script hash
case '1':
return Mainnet // pubkey hash
case '3':
