diff --git a/go.mod b/go.mod index e77306653..8dbe08191 100644 --- a/go.mod +++ b/go.mod @@ -26,7 +26,7 @@ require ( github.com/decred/dcrd/gcs/v4 v4.1.0 github.com/decred/dcrd/lru v1.1.2 github.com/decred/dcrd/math/uint256 v1.0.2 - github.com/decred/dcrd/mixing v0.4.0 + github.com/decred/dcrd/mixing v0.4.1 github.com/decred/dcrd/peer/v3 v3.1.2 github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 github.com/decred/dcrd/rpcclient/v8 v8.0.1 diff --git a/go.sum b/go.sum index 6ae686ae2..6d2fcd386 100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,8 @@ github.com/decred/dcrd/lru v1.1.2 h1:KdCzlkxppuoIDGEvCGah1fZRicrDH36IipvlB1ROkFY github.com/decred/dcrd/lru v1.1.2/go.mod h1:gEdCVgXs1/YoBvFWt7Scgknbhwik3FgVSzlnCcXL2N8= github.com/decred/dcrd/math/uint256 v1.0.2 h1:o8peafL5QmuXGTergI3YDpDU0eq5Z0pQi88B8ym4PRA= github.com/decred/dcrd/math/uint256 v1.0.2/go.mod h1:7M/y9wJJvlyNG/f/X6mxxhxo9dgloZHFiOfbiscl75A= -github.com/decred/dcrd/mixing v0.4.0 h1:XblHAND4Vt5owVUvjPorDg30eWT53DpCZs6VF7U1t6U= -github.com/decred/dcrd/mixing v0.4.0/go.mod h1:ySvVwTZyVz5YvevA6YjPrB6pJEwTm7IkHohTfaiHh2c= +github.com/decred/dcrd/mixing v0.4.1 h1:W8ZCzhmNyzG1xjJMA3L6FOElmp98Ttnk3dDUxD6irAE= +github.com/decred/dcrd/mixing v0.4.1/go.mod h1:ySvVwTZyVz5YvevA6YjPrB6pJEwTm7IkHohTfaiHh2c= github.com/decred/dcrd/peer/v3 v3.1.2 h1:Qe7SpqDtfM0HARmDYwr4WjUu16X6HQ7ZWNnHqE1swiw= github.com/decred/dcrd/peer/v3 v3.1.2/go.mod h1:M9FxNkHuEBtsRW5gwzIH4cJTWk5xSkxy9zG+TEL1N2Y= github.com/decred/dcrd/rpc/jsonrpc/types/v4 v4.3.0 h1:l0DnCcILTNrpy8APF3FLN312ChpkQaAuW30aC/RgBaw= diff --git a/mixing/field.go b/mixing/field.go index 6ff96b9f9..89d89bdcb 100644 --- a/mixing/field.go +++ b/mixing/field.go @@ -1,3 +1,7 @@ +// Copyright (c) 2019-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + package mixing import ( diff --git a/mixing/flags.go b/mixing/flags.go index 07ae122eb..17c05a8be 100644 --- a/mixing/flags.go +++ b/mixing/flags.go @@ -1,3 +1,7 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + package mixing const ( diff --git a/mixing/keyagreement.go b/mixing/keyagreement.go index 3391353ce..daf96a0d5 100644 --- a/mixing/keyagreement.go +++ b/mixing/keyagreement.go @@ -18,7 +18,7 @@ import ( "github.com/decred/dcrd/wire" ) -// Aliases for sntrup4591761 types +// Aliases for sntrup4591761 types. type ( PQPublicKey = [sntrup4591761.PublicKeySize]byte PQPrivateKey = [sntrup4591761.PrivateKeySize]byte diff --git a/mixing/mixclient/blame.go b/mixing/mixclient/blame.go index c0dbc186f..ccecc2352 100644 --- a/mixing/mixclient/blame.go +++ b/mixing/mixclient/blame.go @@ -82,7 +82,7 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) { rcv.Sid = sesRun.sid rcv.RSs = make([]*wire.MsgMixSecrets, 0, 1) _ = mp.Receive(ctx, rcv) - rsHashes := make([]chainhash.Hash, len(rcv.RSs)) + rsHashes := make([]chainhash.Hash, 0, len(rcv.RSs)) for _, rs := range rcv.RSs { rsHashes = append(rsHashes, rs.Hash()) } diff --git a/mixing/mixclient/client.go b/mixing/mixclient/client.go index e136a98e0..ff6fac9f8 100644 --- a/mixing/mixclient/client.go +++ b/mixing/mixclient/client.go @@ -256,11 +256,6 @@ type sessionRun struct { cj *CoinJoin } -type queueMsg struct { - message mixing.Message - res chan error -} - type queueWork struct { p *peer f func(p *peer) error @@ -278,9 +273,8 @@ type Client struct { height uint32 mu sync.Mutex - warming chan struct{} - submitQueue chan *queueMsg - workQueue chan *queueWork + warming chan struct{} + workQueue chan *queueWork pairingWG sync.WaitGroup @@ -310,7 +304,6 @@ func NewClient(w Wallet) *Client { mixpool: w.Mixpool(), pairings: make(map[string]*pairedSessions), warming: make(chan struct{}), - submitQueue: make(chan *queueMsg, 200), workQueue: make(chan *queueWork, runtime.NumCPU()), blake256Hasher: blake256.New(), epoch: w.Mixpool().Epoch(), @@ -379,9 +372,6 @@ func (c *Client) Run(ctx context.Context) error { g.Go(func() error { return c.epochTicker(ctx) }) - g.Go(func() error { - return c.handleSubmitQueue(ctx) - }) for i := 0; i < runtime.NumCPU(); i++ { g.Go(func() error { return c.peerWorker(ctx) @@ -572,21 +562,7 @@ func (p *peer) signAndHash(m mixing.Message) error { } func (p *peer) submit(m mixing.Message) error { - qmsg := &queueMsg{ - message: m, - res: make(chan error, 1), - } - select { - case <-p.ctx.Done(): - return p.ctx.Err() - case p.client.submitQueue <- qmsg: - } - select { - case <-p.ctx.Done(): - return p.ctx.Err() - case err := <-qmsg.res: - return err - } + return p.client.wallet.SubmitMixMessage(p.ctx, m) } func (p *peer) signAndSubmit(m mixing.Message) error { @@ -713,18 +689,6 @@ func (c *Client) epochTicker(ctx context.Context) error { } } -func (c *Client) handleSubmitQueue(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case qmsg := <-c.submitQueue: - err := c.wallet.SubmitMixMessage(ctx, qmsg.message) - qmsg.res <- err - } - } -} - // Dicemix performs a new mixing session for a coinjoin mix transaction. func (c *Client) Dicemix(ctx context.Context, cj *CoinJoin) error { select { @@ -780,8 +744,11 @@ func (c *Client) Dicemix(ctx context.Context, cj *CoinJoin) error { c.pairings[string(pairingID)] = pairing } pairing.localPeers[*p.id] = p + c.mu.Unlock() + err = p.submit(pr) if err != nil { + c.mu.Lock() delete(pairing.localPeers, *p.id) if len(pairing.localPeers) == 0 { delete(c.pairings, string(pairingID)) @@ -789,7 +756,6 @@ func (c *Client) Dicemix(ctx context.Context, cj *CoinJoin) error { c.mu.Unlock() return err } - c.mu.Unlock() select { case res := <-p.res: @@ -1420,7 +1386,6 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) Ciphertexts: make([]mixing.PQCiphertext, 0, len(prs)), MyIndex: p.myVk, } - ctIds := make([]identity, 0, len(cts)) for _, ct := range cts { if len(ct.Ciphertexts) != len(prs) { // Everyone sees this, can rerun without full blame now. @@ -1430,7 +1395,6 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) return nil } revealed.Ciphertexts = append(revealed.Ciphertexts, ct.Ciphertexts[p.myVk]) - ctIds = append(ctIds, ct.Identity) } // Derive shared secret keys @@ -1944,17 +1908,17 @@ func (c *Client) alternateSession(pairing []byte, prs []*wire.MsgMixPairReq, d * kes := c.mixpool.ReceiveKEsByPairing(pairing, unixEpoch) // Sort KEs by identity first (just to group these together) followed - // by the total referenced PR counts in decreasing order. - // When ranging over KEs below, this will allow us to consider the - // order in which other peers created their KEs, and how they are - // forming their sessions. + // by the total referenced PR counts in increasing order (most recent + // KEs first). When ranging over KEs below, this will allow us to + // consider the order in which other peers created their KEs, and how + // they are forming their sessions. sort.Slice(kes, func(i, j int) bool { a := kes[i] b := kes[j] if bytes.Compare(a.Identity[:], b.Identity[:]) == -1 { return true } - if len(a.SeenPRs) > len(b.SeenPRs) { + if len(a.SeenPRs) < len(b.SeenPRs) { return true } return false @@ -1967,7 +1931,7 @@ func (c *Client) alternateSession(pairing []byte, prs []*wire.MsgMixPairReq, d * prHashByIdentity[pr.Identity] = pr.Hash() } - // Only one KE per peer identity (the KE that references the most PR + // Only one KE per peer identity (the KE that references the least PR // hashes) is used for determining session agreement. type peerMsgs struct { pr *wire.MsgMixPairReq diff --git a/mixing/mixclient/errors.go b/mixing/mixclient/errors.go index 20fe58d50..2a72e96a1 100644 --- a/mixing/mixclient/errors.go +++ b/mixing/mixclient/errors.go @@ -1,3 +1,7 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + package mixclient import "errors" diff --git a/mixing/mixclient/limits.go b/mixing/mixclient/limits.go index e35d4296d..c996eebcd 100644 --- a/mixing/mixclient/limits.go +++ b/mixing/mixclient/limits.go @@ -1,3 +1,7 @@ +// Copyright (c) 2023-2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + package mixclient import ( @@ -15,7 +19,8 @@ const ( var estimatedRedeemP2PKHv0InputSize = estimateInputSize(redeemP2PKHv0SigScriptSize) -// estimateInputSize returns the worst case serialize size estimate for a tx input +// estimateInputSize returns the worst case serialize size estimate for a tx +// input. func estimateInputSize(scriptSize int) int { return 32 + // previous tx 4 + // output index @@ -28,7 +33,8 @@ func estimateInputSize(scriptSize int) int { 4 // sequence } -// estimateOutputSize returns the worst case serialize size estimate for a tx output +// estimateOutputSize returns the worst case serialize size estimate for a tx +// output. func estimateOutputSize(scriptSize int) int { return 8 + // previous tx 2 + // version diff --git a/mixing/mixpool/mixpool.go b/mixing/mixpool/mixpool.go index a1c4dd985..335dbb938 100644 --- a/mixing/mixpool/mixpool.go +++ b/mixing/mixpool/mixpool.go @@ -28,6 +28,7 @@ import ( const minconf = 1 const feeRate = 0.0001e8 +const earlyKEDuration = 5 * time.Second type idPubKey = [33]byte @@ -1066,10 +1067,7 @@ func (p *Pool) AcceptMessage(msg mixing.Message) (accepted []mixing.Message, err msgtype, &hash, sid)) } - err = p.acceptEntry(msg, msgtype, &hash, id, ses) - if err != nil { - return nil, err - } + p.acceptEntry(msg, msgtype, &hash, id, ses) return []mixing.Message{msg}, nil } @@ -1301,13 +1299,7 @@ func (p *Pool) reconsiderOrphans(accepted mixing.Message, id *idPubKey) []mixing continue } - err := p.acceptEntry(orphan, msgtype, &orphanHash, id, ses) - if err != nil { - log.Debugf("Orphan %v by identity %x could not be "+ - "processed after accepting KE %v", - orphanHash, id[:], ke.Hash()) - continue - } + p.acceptEntry(orphan, msgtype, &orphanHash, id, ses) acceptedOrphans = append(acceptedOrphans, orphan) acceptedMessages = append(acceptedMessages, orphan) @@ -1403,6 +1395,13 @@ func (p *Pool) checkAcceptKE(ke *wire.MsgMixKeyExchange) error { return ruleError(ErrPeerPositionOutOfBounds) } + now := time.Now() + keEpoch := time.Unix(int64(ke.Epoch), 0) + if now.Add(earlyKEDuration).Before(keEpoch) { + err := fmt.Errorf("KE received too early for stated epoch") + return ruleError(err) + } + return nil } @@ -1496,16 +1495,13 @@ func (p *Pool) acceptKE(ke *wire.MsgMixKeyExchange, hash *chainhash.Hash, id *id p.sessions[sid] = ses } - err = p.acceptEntry(ke, msgtypeKE, hash, id, ses) - if err != nil { - return nil, err - } + p.acceptEntry(ke, msgtypeKE, hash, id, ses) p.latestKE[*id] = ke return ke, nil } func (p *Pool) acceptEntry(msg mixing.Message, msgtype msgtype, hash *chainhash.Hash, - id *[33]byte, ses *session) error { + id *[33]byte, ses *session) { ses.hashes[*hash] = struct{}{} e := entry{ @@ -1524,8 +1520,6 @@ func (p *Pool) acceptEntry(msg mixing.Message, msgtype msgtype, hash *chainhash. ses.incrementCountFor(msgtype) ses.bc.signal() - - return nil } func confirmed(minConf, txHeight, curHeight int64) bool { @@ -1612,7 +1606,8 @@ func estimateP2PKHv0SerializeSize(inputs, outputs int, hasChange bool) int { txInsSize + txOutsSize + changeSize } -// estimateInputSize returns the worst case serialize size estimate for a tx input +// estimateInputSize returns the worst case serialize size estimate for a tx +// input. func estimateInputSize(scriptSize int) int { return 32 + // previous tx 4 + // output index @@ -1625,7 +1620,8 @@ func estimateInputSize(scriptSize int) int { 4 // sequence } -// estimateOutputSize returns the worst case serialize size estimate for a tx output +// estimateOutputSize returns the worst case serialize size estimate for a tx +// output. func estimateOutputSize(scriptSize int) int { return 8 + // previous tx 2 + // version diff --git a/mixing/mixpool/mixpool_test.go b/mixing/mixpool/mixpool_test.go index 1baa29ac1..bdfb5a814 100644 --- a/mixing/mixpool/mixpool_test.go +++ b/mixing/mixpool/mixpool_test.go @@ -14,6 +14,7 @@ import ( "math/big" "os" "testing" + "time" "decred.org/cspp/v2/solverrpc" "github.com/davecgh/go-spew/spew" @@ -255,7 +256,7 @@ func TestAccept(t *testing.T) { var ( seenPRs = []chainhash.Hash{pr.Hash()} - epoch uint64 = 0 + epoch uint64 = uint64(time.Now().Unix()) sid [32]byte = mixing.SortPRsForSession([]*wire.MsgMixPairReq{pr}, epoch) run uint32 = 0 pos uint32 = 0 diff --git a/mixing/mixpool/orphans_test.go b/mixing/mixpool/orphans_test.go index fb6bc0151..48ffd7917 100644 --- a/mixing/mixpool/orphans_test.go +++ b/mixing/mixpool/orphans_test.go @@ -8,6 +8,7 @@ import ( "errors" "reflect" "testing" + "time" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" @@ -65,7 +66,7 @@ func TestOrphans(t *testing.T) { pr.WriteHash(h) prs := []*wire.MsgMixPairReq{pr} - epoch := uint64(1704067200) + epoch := uint64(time.Now().Unix()) sid := mixing.SortPRsForSession(prs, epoch) ke := &wire.MsgMixKeyExchange{ Identity: id,