Skip to content

Commit

Permalink
sweepbatcher: add option WithPublishErrorHandler
Browse files Browse the repository at this point in the history
WithPublishErrorHandler sets the callback used to handle publish errors.
It can be used to filter out noisy messages.
  • Loading branch information
starius committed Aug 27, 2024
1 parent c24d6e1 commit 4aa055d
Show file tree
Hide file tree
Showing 3 changed files with 281 additions and 97 deletions.
152 changes: 84 additions & 68 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,11 @@ type batch struct {
// verifySchnorrSig is a function that verifies a schnorr signature.
verifySchnorrSig VerifySchnorrSig

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler

// purger is a function that can take a sweep which is being purged and
// hand it over to the batcher for further processing.
purger Purger
Expand All @@ -308,23 +313,24 @@ type Purger func(sweepReq *SweepRequest) error
// struct is only used as a wrapper for the arguments that are required to
// create a new batch.
type batchKit struct {
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
publishErrorHandler PublishErrorHandler
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}

// scheduleNextCall schedules the next call to the batch handler's main event
Expand Down Expand Up @@ -353,28 +359,29 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
return &batch{
// We set the ID to a negative value to flag that this batch has
// never been persisted, so it needs to be assigned a new ID.
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
}
}

Expand All @@ -396,32 +403,33 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
}

return &batch{
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
}, nil
}

Expand Down Expand Up @@ -795,23 +803,31 @@ func (b *batch) publish(ctx context.Context) error {
return err
}

// logPublishError is a function which logs publish errors.
logPublishError := func(errMsg string, err error) {
b.publishErrorHandler(err, errMsg, b.log)
}

if b.cfg.mixedBatch {
fee, err, signSuccess = b.publishMixedBatch(ctx)
if err != nil {
b.log.Warnf("Mixed batch publish error: %v", err)
logPublishError("mixed batch publish error", err)
}
} else {
fee, err, signSuccess = b.publishBatchCoop(ctx)
if err != nil {
b.log.Warnf("co-op publish error: %v", err)
logPublishError("co-op publish error", err)
}
}

if !signSuccess {
fee, err = b.publishBatch(ctx)
if err != nil {
logPublishError("non-coop publish error", err)
}
}

if err != nil {
b.log.Warnf("publish error: %v", err)
return nil
}

Expand Down
105 changes: 76 additions & 29 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"

Expand All @@ -12,6 +13,8 @@ import (
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/btcsuite/btcwallet/chain"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/loopdb"
Expand Down Expand Up @@ -159,6 +162,25 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
type FeeRateProvider func(ctx context.Context,
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)

// PublishErrorHandler is a function that handles transaction publishing error.
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)

// defaultPublishErrorLogger is an instance of PublishErrorHandler which logs
// all errors as warnings, but "insufficient fee" as info (since they are
// expected, if RBF fails).
func defaultPublishErrorLogger(err error, errMsg string, log btclog.Logger) {
// Check if the error is "insufficient fee" error.
if strings.Contains(err.Error(), chain.ErrInsufficientFee.Error()) {
// Log "insufficient fee" with level Info.
log.Infof("%s: %v", errMsg, err)

return
}

// Log any other error as a warning.
log.Warnf("%s: %v", errMsg, err)
}

// SweepRequest is a request to sweep a specific outpoint.
type SweepRequest struct {
// SwapHash is the hash of the swap that is being swept.
Expand Down Expand Up @@ -296,6 +318,11 @@ type Batcher struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
}

// BatcherConfig holds batcher configuration.
Expand Down Expand Up @@ -341,6 +368,11 @@ type BatcherConfig struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings, but "insufficient
// fee" as Info.
publishErrorHandler PublishErrorHandler
}

// BatcherOption configures batcher behaviour.
Expand Down Expand Up @@ -420,6 +452,14 @@ func WithMixedBatch() BatcherOption {
}
}

// WithPublishErrorHandler sets the callback used to handle publish errors.
// It can be used to filter out noisy messages.
func WithPublishErrorHandler(handler PublishErrorHandler) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.publishErrorHandler = handler
}
}

// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
Expand All @@ -432,6 +472,11 @@ func NewBatcher(wallet lndclient.WalletKitClient,
// By default, loop/labels.LoopOutBatchSweepSuccess is used
// to label sweep transactions.
txLabeler: labels.LoopOutBatchSweepSuccess,

// publishErrorHandler is a function that handles transaction
// publishing error. By default, it logs all errors as warnings,
// but "insufficient fee" as Info.
publishErrorHandler: defaultPublishErrorLogger,
}
for _, opt := range opts {
opt(&cfg)
Expand All @@ -448,26 +493,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
}

return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
publishErrorHandler: cfg.publishErrorHandler,
}
}

Expand Down Expand Up @@ -1092,14 +1138,15 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
// newBatchKit creates new batch kit.
func (b *Batcher) newBatchKit() batchKit {
return batchKit{
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
publishErrorHandler: b.publishErrorHandler,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}
}
Loading

0 comments on commit 4aa055d

Please sign in to comment.