Skip to content

Commit

Permalink
sweepbatcher: add option WithPublishErrorHandler
Browse files Browse the repository at this point in the history
WithPublishErrorHandler sets the callback used to handle publish errors.
It can be used to filter out noisy messages.
  • Loading branch information
starius committed Aug 27, 2024
1 parent 898efbf commit 7701cf9
Show file tree
Hide file tree
Showing 3 changed files with 265 additions and 97 deletions.
151 changes: 83 additions & 68 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ type batch struct {
// verifySchnorrSig is a function that verifies a schnorr signature.
verifySchnorrSig VerifySchnorrSig

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings.
publishErrorHandler PublishErrorHandler

// purger is a function that can take a sweep which is being purged and
// hand it over to the batcher for further processing.
purger Purger
Expand All @@ -308,23 +312,24 @@ type Purger func(sweepReq *SweepRequest) error
// struct is only used as a wrapper for the arguments that are required to
// create a new batch.
type batchKit struct {
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
id int32
batchTxid *chainhash.Hash
batchPkScript []byte
state batchState
primaryID lntypes.Hash
sweeps map[lntypes.Hash]sweep
rbfCache rbfCache
returnChan chan SweepRequest
wallet lndclient.WalletKitClient
chainNotifier lndclient.ChainNotifierClient
signerClient lndclient.SignerClient
musig2SignSweep MuSig2SignSweep
verifySchnorrSig VerifySchnorrSig
publishErrorHandler PublishErrorHandler
purger Purger
store BatcherStore
log btclog.Logger
quit chan struct{}
}

// scheduleNextCall schedules the next call to the batch handler's main event
Expand Down Expand Up @@ -353,28 +358,29 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
return &batch{
// We set the ID to a negative value to flag that this batch has
// never been persisted, so it needs to be assigned a new ID.
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
cfg: &cfg,
}
}

Expand All @@ -396,32 +402,33 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
}

return &batch{
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
id: bk.id,
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
errChan: make(chan error, 1),
callEnter: make(chan struct{}),
callLeave: make(chan struct{}),
stopping: make(chan struct{}),
finished: make(chan struct{}),
quit: bk.quit,
batchTxid: bk.batchTxid,
batchPkScript: bk.batchPkScript,
rbfCache: bk.rbfCache,
wallet: bk.wallet,
chainNotifier: bk.chainNotifier,
signerClient: bk.signerClient,
muSig2SignSweep: bk.musig2SignSweep,
verifySchnorrSig: bk.verifySchnorrSig,
publishErrorHandler: bk.publishErrorHandler,
purger: bk.purger,
store: bk.store,
log: bk.log,
cfg: &cfg,
}, nil
}

Expand Down Expand Up @@ -795,23 +802,31 @@ func (b *batch) publish(ctx context.Context) error {
return err
}

// logPublishError is a function which logs publish errors.
logPublishError := func(errMsg string, err error) {
b.publishErrorHandler(err, errMsg, b.log)
}

if b.cfg.mixedBatch {
fee, err, signSuccess = b.publishMixedBatch(ctx)
if err != nil {
b.log.Warnf("Mixed batch publish error: %v", err)
logPublishError("mixed batch publish error", err)
}
} else {
fee, err, signSuccess = b.publishBatchCoop(ctx)
if err != nil {
b.log.Warnf("co-op publish error: %v", err)
logPublishError("co-op publish error", err)
}
}

if !signSuccess {
fee, err = b.publishBatch(ctx)
if err != nil {
logPublishError("non-coop publish error", err)
}
}

if err != nil {
b.log.Warnf("publish error: %v", err)
return nil
}

Expand Down
90 changes: 61 additions & 29 deletions sweepbatcher/sweep_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btclog"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/loopdb"
Expand Down Expand Up @@ -159,6 +160,15 @@ type VerifySchnorrSig func(pubKey *btcec.PublicKey, hash, sig []byte) error
type FeeRateProvider func(ctx context.Context,
swapHash lntypes.Hash) (chainfee.SatPerKWeight, error)

// PublishErrorHandler is a function that handles transaction publishing error.
type PublishErrorHandler func(err error, errMsg string, log btclog.Logger)

// logAllErrorsAsWarnings is an instance of PublishErrorHandler which logs all
// errors as warnings.
func logAllErrorsAsWarnings(err error, errMsg string, log btclog.Logger) {
log.Warnf("%s: %v", errMsg, err)
}

// SweepRequest is a request to sweep a specific outpoint.
type SweepRequest struct {
// SwapHash is the hash of the swap that is being swept.
Expand Down Expand Up @@ -296,6 +306,10 @@ type Batcher struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings.
publishErrorHandler PublishErrorHandler
}

// BatcherConfig holds batcher configuration.
Expand Down Expand Up @@ -341,6 +355,10 @@ type BatcherConfig struct {
// expensive) way. If the whole procedure fails for whatever reason, the
// batch is signed non-cooperatively (the fallback).
mixedBatch bool

// publishErrorHandler is a function that handles transaction publishing
// error. By default, it logs all errors as warnings.
publishErrorHandler PublishErrorHandler
}

// BatcherOption configures batcher behaviour.
Expand Down Expand Up @@ -420,6 +438,14 @@ func WithMixedBatch() BatcherOption {
}
}

// WithPublishErrorHandler sets the callback used to handle publish errors.
// It can be used to filter out noisy messages.
func WithPublishErrorHandler(handler PublishErrorHandler) BatcherOption {
return func(cfg *BatcherConfig) {
cfg.publishErrorHandler = handler
}
}

// NewBatcher creates a new Batcher instance.
func NewBatcher(wallet lndclient.WalletKitClient,
chainNotifier lndclient.ChainNotifierClient,
Expand All @@ -432,6 +458,10 @@ func NewBatcher(wallet lndclient.WalletKitClient,
// By default, loop/labels.LoopOutBatchSweepSuccess is used
// to label sweep transactions.
txLabeler: labels.LoopOutBatchSweepSuccess,

// publishErrorHandler is a function that handles transaction
// publishing error. By default, it logs all errors as warnings.
publishErrorHandler: logAllErrorsAsWarnings,
}
for _, opt := range opts {
opt(&cfg)
Expand All @@ -448,26 +478,27 @@ func NewBatcher(wallet lndclient.WalletKitClient,
}

return &Batcher{
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
batches: make(map[int32]*batch),
sweepReqs: make(chan SweepRequest),
errChan: make(chan error, 1),
quit: make(chan struct{}),
initDone: make(chan struct{}),
wallet: wallet,
chainNotifier: chainNotifier,
signerClient: signerClient,
musig2ServerSign: musig2ServerSigner,
VerifySchnorrSig: verifySchnorrSig,
chainParams: chainparams,
store: store,
sweepStore: sweepStore,
clock: cfg.clock,
initialDelay: cfg.initialDelay,
publishDelay: cfg.publishDelay,
customFeeRate: cfg.customFeeRate,
txLabeler: cfg.txLabeler,
customMuSig2Signer: cfg.customMuSig2Signer,
mixedBatch: cfg.mixedBatch,
publishErrorHandler: cfg.publishErrorHandler,
}
}

Expand Down Expand Up @@ -1092,14 +1123,15 @@ func (b *Batcher) newBatchConfig(maxTimeoutDistance int32) batchConfig {
// newBatchKit creates new batch kit.
func (b *Batcher) newBatchKit() batchKit {
return batchKit{
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
returnChan: b.sweepReqs,
wallet: b.wallet,
chainNotifier: b.chainNotifier,
signerClient: b.signerClient,
musig2SignSweep: b.musig2ServerSign,
verifySchnorrSig: b.VerifySchnorrSig,
publishErrorHandler: b.publishErrorHandler,
purger: b.AddSweep,
store: b.store,
quit: b.quit,
}
}
Loading

0 comments on commit 7701cf9

Please sign in to comment.