Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalized notif stream #826

Merged
merged 3 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions instantout/reservation/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,24 @@ type mockReservationClient struct {
mock.Mock
}

func (m *mockReservationClient) OpenReservation(ctx context.Context,
in *swapserverrpc.ServerOpenReservationRequest,
opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse,
func (m *mockReservationClient) ReservationNotificationStream(
ctx context.Context, in *swapserverrpc.ReservationNotificationRequest,
opts ...grpc.CallOption,
) (swapserverrpc.ReservationService_ReservationNotificationStreamClient,
error) {

args := m.Called(ctx, in, opts)
return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse),
return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient),
args.Error(1)
}

func (m *mockReservationClient) ReservationNotificationStream(
ctx context.Context, in *swapserverrpc.ReservationNotificationRequest,
opts ...grpc.CallOption,
) (swapserverrpc.ReservationService_ReservationNotificationStreamClient,
func (m *mockReservationClient) OpenReservation(ctx context.Context,
in *swapserverrpc.ServerOpenReservationRequest,
opts ...grpc.CallOption) (*swapserverrpc.ServerOpenReservationResponse,
error) {

args := m.Called(ctx, in, opts)
return args.Get(0).(swapserverrpc.ReservationService_ReservationNotificationStreamClient),
return args.Get(0).(*swapserverrpc.ServerOpenReservationResponse),
args.Error(1)
}

Expand Down
5 changes: 3 additions & 2 deletions instantout/reservation/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type Config struct {
// swap server.
ReservationClient swapserverrpc.ReservationServiceClient

// FetchL402 is the function used to fetch the l402 token.
FetchL402 func(context.Context) error
// NotificationManager is the manager that handles the notification
// subscriptions.
NotificationManager NotificationManager
}

// FSM is the state machine that manages the reservation lifecycle.
Expand Down
9 changes: 9 additions & 0 deletions instantout/reservation/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package reservation
import (
"context"
"fmt"

"github.com/lightninglabs/loop/swapserverrpc"
)

var (
Expand Down Expand Up @@ -31,3 +33,10 @@ type Store interface {
// made.
ListReservations(ctx context.Context) ([]*Reservation, error)
}

// NotificationManager handles subscribing to incoming reservation
// subscriptions.
type NotificationManager interface {
SubscribeReservations(context.Context,
) <-chan *swapserverrpc.ServerReservationNotification
}
109 changes: 2 additions & 107 deletions instantout/reservation/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type Manager struct {
// activeReservations contains all the active reservationsFSMs.
activeReservations map[ID]*FSM

// hasL402 is true if the client has a valid L402.
hasL402 bool

runCtx context.Context

sync.Mutex
Expand Down Expand Up @@ -59,22 +56,15 @@ func (m *Manager) Run(ctx context.Context, height int32) error {
return err
}

reservationResChan := make(
chan *reservationrpc.ServerReservationNotification,
)

err = m.RegisterReservationNotifications(reservationResChan)
if err != nil {
return err
}
ntfnChan := m.cfg.NotificationManager.SubscribeReservations(ctx)

for {
select {
case height := <-newBlockChan:
log.Debugf("Received block %v", height)
currentHeight = height

case reservationRes := <-reservationResChan:
case reservationRes := <-ntfnChan:
log.Debugf("Received reservation %x",
reservationRes.ReservationId)
_, err := m.newReservation(
Expand Down Expand Up @@ -157,101 +147,6 @@ func (m *Manager) newReservation(ctx context.Context, currentHeight uint32,
return reservationFSM, nil
}

// fetchL402 fetches the L402 from the server. This method will keep on
// retrying until it gets a valid response.
func (m *Manager) fetchL402(ctx context.Context) {
// Add a 0 timer so that we initially fetch the L402 immediately.
timer := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return

case <-timer.C:
err := m.cfg.FetchL402(ctx)
if err != nil {
log.Warnf("Error fetching L402: %v", err)
timer.Reset(time.Second * 10)
continue
}
m.hasL402 = true
return
}
}
}

// RegisterReservationNotifications registers a new reservation notification
// stream.
func (m *Manager) RegisterReservationNotifications(
reservationChan chan *reservationrpc.ServerReservationNotification) error {

// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
if !m.hasL402 {
m.fetchL402(m.runCtx)
}

ctx, cancel := context.WithCancel(m.runCtx)

// We'll now subscribe to the reservation notifications.
reservationStream, err := m.cfg.ReservationClient.
ReservationNotificationStream(
ctx, &reservationrpc.ReservationNotificationRequest{},
)
if err != nil {
cancel()
return err
}

log.Debugf("Successfully subscribed to reservation notifications")

// We'll now start a goroutine that will forward all the reservation
// notifications to the reservationChan.
go func() {
for {
reservationRes, err := reservationStream.Recv()
if err == nil && reservationRes != nil {
log.Debugf("Received reservation %x",
reservationRes.ReservationId)
reservationChan <- reservationRes
continue
}
log.Errorf("Error receiving "+
"reservation: %v", err)

cancel()

// If we encounter an error, we'll
// try to reconnect.
for {
select {
case <-m.runCtx.Done():
return

case <-time.After(time.Second * 10):
log.Debugf("Reconnecting to " +
"reservation notifications")
err = m.RegisterReservationNotifications(
reservationChan,
)
if err != nil {
log.Errorf("Error "+
"reconnecting: %v", err)
continue
}

// If we were able to reconnect, we'll
// return.
return
}
}
}
}()

return nil
}

// RecoverReservations tries to recover all reservations that are still active
// from the database.
func (m *Manager) RecoverReservations(ctx context.Context) error {
Expand Down
38 changes: 15 additions & 23 deletions instantout/reservation/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

var (
Expand Down Expand Up @@ -118,27 +117,22 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {

sendChan := make(chan *swapserverrpc.ServerReservationNotification)

mockReservationClient.On(
"ReservationNotificationStream", mock.Anything, mock.Anything,
mock.Anything,
).Return(
&dummyReservationNotificationServer{
SendChan: sendChan,
}, nil,
)

mockReservationClient.On(
"OpenReservation", mock.Anything, mock.Anything, mock.Anything,
).Return(
&swapserverrpc.ServerOpenReservationResponse{}, nil,
)

mockNtfnManager := &mockNtfnManager{
sendChan: sendChan,
}

cfg := &Config{
Store: store,
Wallet: mockLnd.WalletKit,
ChainNotifier: mockLnd.ChainNotifier,
FetchL402: func(context.Context) error { return nil },
ReservationClient: mockReservationClient,
Store: store,
Wallet: mockLnd.WalletKit,
ChainNotifier: mockLnd.ChainNotifier,
ReservationClient: mockReservationClient,
NotificationManager: mockNtfnManager,
}

manager := NewManager(cfg)
Expand All @@ -152,17 +146,15 @@ func newManagerTestContext(t *testing.T) *ManagerTestContext {
}
}

type dummyReservationNotificationServer struct {
grpc.ClientStream

// SendChan is the channel that is used to send notifications.
SendChan chan *swapserverrpc.ServerReservationNotification
type mockNtfnManager struct {
sendChan chan *swapserverrpc.ServerReservationNotification
}

func (d *dummyReservationNotificationServer) Recv() (
*swapserverrpc.ServerReservationNotification, error) {
func (m *mockNtfnManager) SubscribeReservations(
ctx context.Context,
) <-chan *swapserverrpc.ServerReservationNotification {

return <-d.SendChan, nil
return m.sendChan
}

func mustDecodeID(id string) ID {
Expand Down
31 changes: 26 additions & 5 deletions loopd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
loop_looprpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/notifications"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd/clock"
Expand Down Expand Up @@ -501,21 +502,41 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}

// Start the notification manager.
notificationCfg := &notifications.Config{
Client: loop_swaprpc.NewSwapServerClient(swapClient.Conn),
FetchL402: swapClient.Server.FetchL402,
}
notificationManager := notifications.NewManager(notificationCfg)

d.wg.Add(1)
go func() {
defer d.wg.Done()

log.Info("Starting notification manager")
err := notificationManager.Run(d.mainCtx)
if err != nil {
d.internalErrChan <- err
log.Errorf("Notification manager stopped: %v", err)
}
}()

var (
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
)

// Create the reservation and instantout managers.
if d.cfg.EnableExperimental {
reservationStore := reservation.NewSQLStore(
loopdb.NewTypedStore[reservation.Querier](baseDb),
)
reservationConfig := &reservation.Config{
Store: reservationStore,
Wallet: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
ReservationClient: reservationClient,
FetchL402: swapClient.Server.FetchL402,
Store: reservationStore,
Wallet: d.lnd.WalletKit,
ChainNotifier: d.lnd.ChainNotifier,
ReservationClient: reservationClient,
NotificationManager: notificationManager,
}

reservationManager = reservation.NewManager(
Expand Down
4 changes: 4 additions & 0 deletions loopd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/notifications"
"github.com/lightninglabs/loop/sweepbatcher"
"github.com/lightningnetwork/lnd"
"github.com/lightningnetwork/lnd/build"
Expand Down Expand Up @@ -48,6 +49,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {
lnd.AddSubLogger(
root, instantout.Subsystem, intercept, instantout.UseLogger,
)
lnd.AddSubLogger(
root, notifications.Subsystem, intercept, notifications.UseLogger,
)
}

// genSubLogger creates a logger for a subsystem. We provide an instance of
Expand Down
26 changes: 26 additions & 0 deletions notifications/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package notifications

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the sub system name of this package.
const Subsystem = "NTFNS"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
Loading
Loading