Skip to content

Commit

Permalink
notifications: add notification manager
Browse files Browse the repository at this point in the history
This commit adds a generic notification manager
that can be used to subscribe to different types
of notifications.
  • Loading branch information
sputn1ck committed Sep 19, 2024
1 parent aea7578 commit 2e81d93
Show file tree
Hide file tree
Showing 3 changed files with 380 additions and 0 deletions.
26 changes: 26 additions & 0 deletions notifications/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package notifications

import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)

// Subsystem defines the sub system name of this package.
const Subsystem = "NTFNS"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
193 changes: 193 additions & 0 deletions notifications/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package notifications

import (
"context"
"sync"
"time"

"github.com/lightninglabs/loop/swapserverrpc"
"google.golang.org/grpc"
)

type NotificationType int

const (
NotificationTypeUnknown NotificationType = iota
NotificationTypeReservation
)

type NotificationsClient interface {
SubscribeNotifications(ctx context.Context,
in *swapserverrpc.SubscribeNotificationsRequest,
opts ...grpc.CallOption) (
swapserverrpc.SwapServer_SubscribeNotificationsClient, error)
}

// Config contains all the services that the notification manager needs to
// operate.
type Config struct {
// Client is the client used to communicate with the swap server.
Client NotificationsClient

// FetchL402 is the function used to fetch the l402 token.
FetchL402 func(context.Context) error
}

// Manager is a manager for notifications that the swap server sends to the
// client.
type Manager struct {
cfg *Config

hasL402 bool

subscribers map[NotificationType][]subscriber
sync.Mutex
}

// NewManager creates a new notification manager.
func NewManager(cfg *Config) *Manager {
return &Manager{
cfg: cfg,
subscribers: make(map[NotificationType][]subscriber),
}
}

type subscriber struct {
subCtx context.Context
recvChan interface{}
}

// SubscribeReservations subscribes to the reservation notifications.
func (m *Manager) SubscribeReservations(ctx context.Context,
) <-chan *swapserverrpc.ServerReservationNotification {

// We'll create a channel that we'll use to send the notifications to the
// caller.
notifChan := make(
chan *swapserverrpc.ServerReservationNotification, //nolint:lll
)
sub := subscriber{
subCtx: ctx,
recvChan: notifChan,
}

m.Lock()
m.subscribers[NotificationTypeReservation] = append(
m.subscribers[NotificationTypeReservation],
sub,
)
m.Unlock()

return notifChan
}

// Run starts the notification manager.
func (n *Manager) Run(ctx context.Context, readyChan chan<- struct{}) error {
// In order to create a valid l402 we first are going to call
// the FetchL402 method. As a client might not have outbound capacity
// yet, we'll retry until we get a valid response.
if !n.hasL402 {
n.fetchL402(ctx)
}

var closedChan bool

// Start the notification runloop.
for {
// Return if the context has been canceled.
select {
case <-ctx.Done():
return nil
default:
}

callCtx, cancel := context.WithCancel(ctx)
notifStream, err := n.cfg.Client.SubscribeNotifications(
callCtx, &swapserverrpc.SubscribeNotificationsRequest{},
)
if err != nil {
cancel()
log.Errorf("Error subscribing to notifications: %v", err)
continue
}
if !closedChan {
close(readyChan)
}

log.Debugf("Successfully subscribed to server notifications")

for {
notification, err := notifStream.Recv()
if err == nil && notification != nil {
log.Debugf("Received notification: %v", notification)
n.handleNotification(notification)
continue
}
log.Errorf("Error receiving "+
"notification: %v", err)

cancel()
// Wait for a bit before reconnecting.
<-time.After(time.Second * 10)
break
}
}
}

// fetchL402 fetches the L402 from the server. This method will keep on
// retrying until it gets a valid response.
func (m *Manager) fetchL402(ctx context.Context) {
// Add a 0 timer so that we initially fetch the L402 immediately.
timer := time.NewTimer(0)
for {
select {
case <-ctx.Done():
return

case <-timer.C:
err := m.cfg.FetchL402(ctx)
if err != nil {
log.Warnf("Error fetching L402: %v", err)
timer.Reset(time.Second * 10)
continue
}
m.hasL402 = true

return
}
}
}

// handleNotification handles an incoming notification from the server,
// forwarding it to the appropriate subscribers.
func (n *Manager) handleNotification(notification *swapserverrpc.
SubscribeNotificationsResponse) {

switch notification.Notification.(type) {
case *swapserverrpc.SubscribeNotificationsResponse_ReservationNotification:
// We'll forward the reservation notification to all subscribers.
// Cleaning up any subscribers that have been canceled.
newSubs := []subscriber{}
reservationNtfn := notification.GetReservationNotification()
for _, sub := range n.subscribers[NotificationTypeReservation] {
recvChan := sub.recvChan.(chan *swapserverrpc.
ServerReservationNotification)

select {
case <-sub.subCtx.Done():
close(recvChan)
continue
case recvChan <- reservationNtfn:
newSubs = append(newSubs, sub)
}
}

n.Lock()
n.subscribers[NotificationTypeReservation] = newSubs
n.Unlock()

default:
log.Warnf("Received unknown notification type: %v",
notification)
}
}
161 changes: 161 additions & 0 deletions notifications/manager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package notifications

import (
"context"
"io"
"testing"
"time"

"github.com/lightninglabs/loop/swapserverrpc"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

var (
testReservationId = []byte{0x01, 0x02}
testReservationId2 = []byte{0x01, 0x02}
)

// mockNotificationsClient implements the NotificationsClient interface for testing.
type mockNotificationsClient struct {
mockStream swapserverrpc.SwapServer_SubscribeNotificationsClient
subscribeErr error
timesCalled int
}

func (m *mockNotificationsClient) SubscribeNotifications(ctx context.Context,
in *swapserverrpc.SubscribeNotificationsRequest,
opts ...grpc.CallOption) (
swapserverrpc.SwapServer_SubscribeNotificationsClient, error) {

m.timesCalled++
if m.subscribeErr != nil {
return nil, m.subscribeErr
}
return m.mockStream, nil
}

// mockSubscribeNotificationsClient simulates the server stream.
type mockSubscribeNotificationsClient struct {
grpc.ClientStream
recvChan chan *swapserverrpc.SubscribeNotificationsResponse
recvErrChan chan error
}

func (m *mockSubscribeNotificationsClient) Recv() (
*swapserverrpc.SubscribeNotificationsResponse, error) {

select {
case err := <-m.recvErrChan:
return nil, err
case notif, ok := <-m.recvChan:
if !ok {
return nil, io.EOF
}
return notif, nil
}
}

func (m *mockSubscribeNotificationsClient) Header() (metadata.MD, error) {
return nil, nil
}

func (m *mockSubscribeNotificationsClient) Trailer() metadata.MD {
return nil
}

func (m *mockSubscribeNotificationsClient) CloseSend() error {
return nil
}

func (m *mockSubscribeNotificationsClient) Context() context.Context {
return context.TODO()
}

func (m *mockSubscribeNotificationsClient) SendMsg(interface{}) error {
return nil
}

func (m *mockSubscribeNotificationsClient) RecvMsg(interface{}) error {
return nil
}

func TestManager_ReservationNotification(t *testing.T) {
// Create a mock notification client
recvChan := make(chan *swapserverrpc.SubscribeNotificationsResponse, 1)
errChan := make(chan error, 1)
mockStream := &mockSubscribeNotificationsClient{
recvChan: recvChan,
recvErrChan: errChan,
}
mockClient := &mockNotificationsClient{
mockStream: mockStream,
}

// Create a Manager with the mock client
mgr := NewManager(&Config{
Client: mockClient,
FetchL402: func(ctx context.Context) error {
// Simulate successful fetching of L402
return nil
},
})

// Subscribe to reservation notifications.
subCtx, subCancel := context.WithCancel(context.Background())
subChan := mgr.SubscribeReservations(subCtx)

// Run the manager.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rdyChan := make(chan struct{})
go func() {
err := mgr.Run(ctx, rdyChan)
require.NoError(t, err)
}()
<-rdyChan

// Wait a bit to ensure manager is running and has subscribed
time.Sleep(100 * time.Millisecond)
require.Equal(t, 1, mockClient.timesCalled)

// Send a test notification
testNotif := getTestNotification(testReservationId)

// Send the notification to the recvChan
recvChan <- testNotif

// Collect the notification in the callback
receivedNotification := <-subChan

// Now, check that the notification received in the callback matches the one sent
require.NotNil(t, receivedNotification)
require.Equal(t, testReservationId, receivedNotification.ReservationId)

// Cancel the subscription
subCancel()

// Send another test notification`
testNotif2 := getTestNotification(testReservationId2)
recvChan <- testNotif2

// Wait a bit to ensure the notification is not received
time.Sleep(100 * time.Millisecond)

require.Len(t, mgr.subscribers[NotificationTypeReservation], 0)

// Close the recvChan to stop the manager's receive loop
close(recvChan)
}

func getTestNotification(resId []byte) *swapserverrpc.SubscribeNotificationsResponse {
return &swapserverrpc.SubscribeNotificationsResponse{
Notification: &swapserverrpc.SubscribeNotificationsResponse_ReservationNotification{
ReservationNotification: &swapserverrpc.ServerReservationNotification{
ReservationId: resId,
},
},
}
}

0 comments on commit 2e81d93

Please sign in to comment.