From a4ba932a23d1a22eb5ceaeb178797ce84d922b5d Mon Sep 17 00:00:00 2001 From: Konstantin Makarov Date: Thu, 22 Feb 2024 01:07:45 +0400 Subject: [PATCH] Refactoring Simplification. Add metrics. --- config/metrics.go | 28 ++- go.mod | 1 + go.sum | 2 - listener/errors.go | 13 -- listener/errors_test.go | 70 ------ listener/listener.go | 193 +++++++++------- listener/listener_test.go | 370 ++++++++++++------------------- listener/monitor_test.go | 2 + listener/repository.go | 12 +- listener/repository_mock_test.go | 10 +- listener/wal_transaction.go | 1 + 11 files changed, 297 insertions(+), 405 deletions(-) delete mode 100644 listener/errors_test.go diff --git a/config/metrics.go b/config/metrics.go index fa0049a..14623c5 100644 --- a/config/metrics.go +++ b/config/metrics.go @@ -7,9 +7,16 @@ import ( // Metrics Prometheus metrics. type Metrics struct { - filterSkippedEvents, publishedEvents *prometheus.CounterVec + filterSkippedEvents, publishedEvents, problematicEvents *prometheus.CounterVec } +const ( + labelApp = "app" + labelTable = "table" + labelSubject = "subject" + labelKind = "kind" +) + // NewMetrics create and initialize new Prometheus metrics. func NewMetrics() *Metrics { return &Metrics{ @@ -17,13 +24,19 @@ func NewMetrics() *Metrics { Name: "published_events_total", Help: "The total number of published events", }, - []string{"app", "subject", "table"}, + []string{labelApp, labelSubject, labelTable}, + ), + problematicEvents: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "problematic_events_total", + Help: "The total number of skipped problematic events", + }, + []string{labelApp, labelKind}, ), filterSkippedEvents: promauto.NewCounterVec(prometheus.CounterOpts{ Name: "filter_skipped_events_total", Help: "The total number of skipped events", }, - []string{"app", "table"}, + []string{labelApp, labelTable}, ), } } @@ -32,10 +45,15 @@ const appName = "wal-listener" // IncPublishedEvents increment published events counter. 
func (m Metrics) IncPublishedEvents(subject, table string) { - m.publishedEvents.With(prometheus.Labels{"app": appName, "subject": subject, "table": table}).Inc() + m.publishedEvents.With(prometheus.Labels{labelApp: appName, labelSubject: subject, labelTable: table}).Inc() } // IncFilterSkippedEvents increment skipped by filter events counter. func (m Metrics) IncFilterSkippedEvents(table string) { - m.filterSkippedEvents.With(prometheus.Labels{"app": appName, "table": table}).Inc() + m.filterSkippedEvents.With(prometheus.Labels{labelApp: appName, labelTable: table}).Inc() +} + +// IncProblematicEvents increment problematic events counter for the given problem kind. +func (m Metrics) IncProblematicEvents(kind string) { + m.problematicEvents.With(prometheus.Labels{labelApp: appName, labelKind: kind}).Inc() } diff --git a/go.mod b/go.mod index fe05473..b5dd5e6 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/urfave/cli/v2 v2.26.0 github.com/wagslane/go-rabbitmq v0.12.4 + golang.org/x/sync v0.6.0 ) require ( diff --git a/go.sum b/go.sum index e50cea9..a110496 100644 --- a/go.sum +++ b/go.sum @@ -40,8 +40,6 @@ github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= diff --git a/listener/errors.go b/listener/errors.go index fd23929..90c7340 
100644 --- a/listener/errors.go +++ b/listener/errors.go @@ -11,16 +11,3 @@ var ( errUnknownMessageType = errors.New("unknown message type") errRelationNotFound = errors.New("relation not found") ) - -type serviceErr struct { - Caller string - Err error -} - -func newListenerError(caller string, err error) *serviceErr { - return &serviceErr{Caller: caller, Err: err} -} - -func (e *serviceErr) Error() string { - return e.Caller + ": " + e.Err.Error() -} diff --git a/listener/errors_test.go b/listener/errors_test.go deleted file mode 100644 index 75c1ad0..0000000 --- a/listener/errors_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package listener - -import ( - "errors" - "reflect" - "testing" -) - -func Test_serviceErr_Error(t *testing.T) { - type fields struct { - Caller string - Err error - } - tests := []struct { - name string - fields fields - want string - }{ - { - name: "success", - fields: fields{ - Caller: "hello()", - Err: errors.New("invalid username"), - }, - want: "hello(): invalid username", - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - e := &serviceErr{ - Caller: tt.fields.Caller, - Err: tt.fields.Err, - } - if got := e.Error(); got != tt.want { - t.Errorf("Error() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_newListenerError(t *testing.T) { - type args struct { - caller string - err error - } - tests := []struct { - name string - args args - want *serviceErr - }{ - { - name: "success", - args: args{ - caller: "hello()", - err: errors.New("invalid username"), - }, - want: &serviceErr{ - Caller: "hello()", - Err: errors.New("invalid username"), - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := newListenerError(tt.args.caller, tt.args.err); !reflect.DeepEqual(got, tt.want) { - t.Errorf("newListenerError() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/listener/listener.go b/listener/listener.go index c449e2d..caf849e 100644 --- a/listener/listener.go +++ 
b/listener/listener.go @@ -2,7 +2,6 @@ package listener import ( "context" - "errors" "fmt" "log/slog" "os" @@ -11,13 +10,12 @@ import ( "time" "github.com/jackc/pgx" + "golang.org/x/sync/errgroup" "github.com/ihippik/wal-listener/v2/config" "github.com/ihippik/wal-listener/v2/publisher" ) -const errorBufferSize = 100 - // Logical decoding plugin. const pgOutputPlugin = "pgoutput" @@ -42,6 +40,7 @@ type replication interface { type repository interface { CreatePublication(name string) error GetSlotLSN(slotName string) (string, error) + NewStandbyStatus(walPositions ...uint64) (status *pgx.StandbyStatus, err error) IsAlive() bool Close() error } @@ -49,6 +48,7 @@ type repository interface { type monitor interface { IncPublishedEvents(subject, table string) IncFilterSkippedEvents(table string) + IncProblematicEvents(kind string) } // Listener main service struct. @@ -57,13 +57,11 @@ type Listener struct { log *slog.Logger monitor monitor mu sync.RWMutex - slotName string publisher eventPublisher replicator replication repository repository parser parser lsn uint64 - errChannel chan error } // NewWalListener create and initialize new service instance. @@ -79,19 +77,17 @@ func NewWalListener( return &Listener{ log: log, monitor: monitor, - slotName: cfg.Listener.SlotName, cfg: cfg, publisher: pub, repository: repo, replicator: repl, parser: parser, - errChannel: make(chan error, errorBufferSize), } } // Process is main service entry point. 
func (l *Listener) Process(ctx context.Context) error { - logger := l.log.With("slot_name", l.slotName) + logger := l.log.With("slot_name", l.cfg.Listener.SlotName) ctx, stop := signal.NotifyContext(ctx, os.Interrupt) defer stop() @@ -108,7 +104,7 @@ func (l *Listener) Process(ctx context.Context) error { } if !slotIsExists { - consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgOutputPlugin) + consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.cfg.Listener.SlotName, pgOutputPlugin) if err != nil { return fmt.Errorf("create replication slot: %w", err) } @@ -120,19 +116,32 @@ func (l *Listener) Process(ctx context.Context) error { l.setLSN(lsn) - logger.Info("new slot was created", slog.String("slot", l.slotName)) + logger.Info("new slot was created", slog.String("slot", l.cfg.Listener.SlotName)) } else { logger.Info("slot already exists, LSN updated") } - go l.Stream(ctx) + group := new(errgroup.Group) + + group.Go(func() error { + return l.Stream(ctx) + }) + group.Go(func() error { + return l.checkConnection(ctx) + }) + + if err = group.Wait(); err != nil { + return err + } + + return nil +} +// checkConnection periodically checks connections. 
+func (l *Listener) checkConnection(ctx context.Context) error { refresh := time.NewTicker(l.cfg.Listener.RefreshConnection) defer refresh.Stop() - var svcErr *serviceErr - -ProcessLoop: for { select { case <-refresh.C: @@ -143,35 +152,27 @@ ProcessLoop: if !l.repository.IsAlive() { return fmt.Errorf("repository: %w", errConnectionIsLost) } - case err := <-l.errChannel: - if errors.As(err, &svcErr) { - return svcErr - } - - logger.Error("listener: received error", "err", err) case <-ctx.Done(): - logger.Debug("listener: context was canceled") + l.log.Debug("cgeck connection: context was canceled") if err := l.Stop(); err != nil { - logger.Error("listener: stop error", "err", err) + l.log.Error("failed to stop service", "err", err) } - break ProcessLoop + return nil } } - - return nil } // slotIsExists checks whether a slot has already been created and if it has been created uses it. func (l *Listener) slotIsExists() (bool, error) { - restartLSNStr, err := l.repository.GetSlotLSN(l.slotName) + restartLSNStr, err := l.repository.GetSlotLSN(l.cfg.Listener.SlotName) if err != nil { return false, fmt.Errorf("get slot lsn: %w", err) } if len(restartLSNStr) == 0 { - l.log.Warn("restart LSN not found", slog.String("slot_name", l.slotName)) + l.log.Warn("restart LSN not found", slog.String("slot_name", l.cfg.Listener.SlotName)) return false, nil } @@ -190,18 +191,23 @@ const ( publicationName = "wal-listener" ) +const ( + problemKindParse = "parse" + problemKindPublish = "publish" + problemKindAck = "ack" +) + // Stream receive event from PostgreSQL. // Accept message, apply filter and publish it in NATS server. 
-func (l *Listener) Stream(ctx context.Context) { +func (l *Listener) Stream(ctx context.Context) error { if err := l.replicator.StartReplication( - l.slotName, + l.cfg.Listener.SlotName, l.readLSN(), -1, protoVersion, publicationNames(publicationName), ); err != nil { - l.errChannel <- newListenerError("StartReplication()", err) - return + return fmt.Errorf("start replication: %w", err) } go l.SendPeriodicHeartbeats(ctx) @@ -210,77 +216,92 @@ func (l *Listener) Stream(ctx context.Context) { for { if err := ctx.Err(); err != nil { - l.errChannel <- fmt.Errorf("stream: context canceled: %w", err) - break + l.log.Warn("stream: context canceled", "err", err) + return nil } msg, err := l.replicator.WaitForReplicationMessage(ctx) if err != nil { - l.errChannel <- newListenerError("stream: wait for replication message", err) + return fmt.Errorf("wait for replication message: %w", err) + } + + if msg == nil { + l.log.Debug("got empty message") continue } - if msg != nil { - if msg.WalMessage != nil { - l.log.Debug("receive WAL message", slog.Uint64("wal", msg.WalMessage.WalStart)) + if err = l.processMessage(ctx, msg, tx); err != nil { + return fmt.Errorf("process message: %w", err) + } + + l.processHeartBeat(msg) + } +} - if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil { - l.log.Error("message parse failed", "err", err) - l.errChannel <- fmt.Errorf("parse WAL message: %w", err) +func (l *Listener) processMessage(ctx context.Context, msg *pgx.ReplicationMessage, tx *WalTransaction) error { + if msg.WalMessage == nil { + l.log.Debug("empty wal-message") + return nil + } - continue - } + l.log.Debug("WAL message has been received", slog.Uint64("wal", msg.WalMessage.WalStart)) - if tx.CommitTime != nil { - natsEvents := tx.CreateEventsWithFilter(l.cfg.Listener.Filter.Tables) + if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil { + l.monitor.IncProblematicEvents(problemKindParse) + return fmt.Errorf("parse: %w", err) + } - 
for _, event := range natsEvents { - subjectName := event.SubjectName(l.cfg) + if tx.CommitTime != nil { + for _, event := range tx.CreateEventsWithFilter(l.cfg.Listener.Filter.Tables) { + subjectName := event.SubjectName(l.cfg) - if err = l.publisher.Publish(ctx, subjectName, event); err != nil { - l.errChannel <- fmt.Errorf("publish message: %w", err) - continue - } + if err := l.publisher.Publish(ctx, subjectName, event); err != nil { + l.monitor.IncProblematicEvents(problemKindPublish) + return fmt.Errorf("publish: %w", err) + } - l.monitor.IncPublishedEvents(subjectName, event.Table) + l.monitor.IncPublishedEvents(subjectName, event.Table) - l.log.Info( - "event was sent", - slog.String("subject", subjectName), - slog.String("action", event.Action), - slog.String("table", event.Table), - slog.Uint64("lsn", l.readLSN()), - ) - } + l.log.Info( + "event was sent", + slog.String("subject", subjectName), + slog.String("action", event.Action), + slog.String("table", event.Table), + slog.Uint64("lsn", l.readLSN()), + ) + } - tx.Clear() - } + tx.Clear() + } - if msg.WalMessage.WalStart > l.readLSN() { - if err = l.AckWalMessage(msg.WalMessage.WalStart); err != nil { - l.errChannel <- fmt.Errorf("acknowledge WAL message: %w", err) - continue - } + if msg.WalMessage.WalStart > l.readLSN() { + if err := l.AckWalMessage(msg.WalMessage.WalStart); err != nil { + l.monitor.IncProblematicEvents(problemKindAck) + return fmt.Errorf("ack: %w", err) + } - l.log.Debug("ack WAL msg", slog.Uint64("lsn", l.readLSN())) - } - } + l.log.Debug("ack WAL message", slog.Uint64("lsn", l.readLSN())) + } - if msg.ServerHeartbeat != nil { - l.log.Debug( - "received server heartbeat", - slog.Uint64("server_wal_end", msg.ServerHeartbeat.ServerWalEnd), - slog.Uint64("server_time", msg.ServerHeartbeat.ServerTime), - ) + return nil +} - if msg.ServerHeartbeat.ReplyRequested == 1 { - l.log.Debug("status requested") +func (l *Listener) processHeartBeat(msg *pgx.ReplicationMessage) { + if 
msg.ServerHeartbeat == nil { + return + } - if err = l.SendStandbyStatus(); err != nil { - l.errChannel <- fmt.Errorf("send standby status: %w", err) - } - } - } + l.log.Debug( + "received server heartbeat", + slog.Uint64("server_wal_end", msg.ServerHeartbeat.ServerWalEnd), + slog.Uint64("server_time", msg.ServerHeartbeat.ServerTime), + ) + + if msg.ServerHeartbeat.ReplyRequested == 1 { + l.log.Debug("status requested") + + if err := l.SendStandbyStatus(); err != nil { + l.log.Warn("send standby status: %w", err) } } } @@ -316,18 +337,20 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { return case <-heart.C: if err := l.SendStandbyStatus(); err != nil { - l.log.Error("failed to send status heartbeat", "err", err) + l.log.Error("failed to send heartbeat status", "err", err) continue } - l.log.Debug("sending periodic status heartbeat") + l.log.Debug("sending periodic heartbeat status") } } } // SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server. 
func (l *Listener) SendStandbyStatus() error { - standbyStatus, err := pgx.NewStandbyStatus(l.readLSN()) + lsn := l.readLSN() + + standbyStatus, err := l.repository.NewStandbyStatus(lsn) if err != nil { return fmt.Errorf("unable to create StandbyStatus object: %w", err) } diff --git a/listener/listener_test.go b/listener/listener_test.go index 9f007e0..fdcbc71 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -96,8 +96,10 @@ func TestListener_slotIsExists(t *testing.T) { tt.setup() w := &Listener{ - log: logger, - slotName: tt.fields.slotName, + log: logger, + cfg: &config.Config{Listener: &config.ListenerCfg{ + SlotName: tt.fields.slotName, + }}, repository: repo, } @@ -191,11 +193,17 @@ func nowInNano() uint64 { } func TestListener_SendStandbyStatus(t *testing.T) { - repl := new(replicatorMock) type fields struct { restartLSN uint64 } + repl := new(replicatorMock) + repo := new(repositoryMock) + + setNewStandbyStatus := func(walPositions []uint64, status *pgx.StandbyStatus, err error) { + repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond).Once() + } + setSendStandbyStatus := func(status *pgx.StandbyStatus, err error) { repl.On( "SendStandbyStatus", @@ -216,11 +224,19 @@ func TestListener_SendStandbyStatus(t *testing.T) { { name: "success", setup: func() { + setNewStandbyStatus([]uint64{10}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, ClientTime: nowInNano(), ReplyRequested: 0, }, @@ -228,18 +244,26 @@ func TestListener_SendStandbyStatus(t *testing.T) { ) }, fields: fields{ - restartLSN: 0, + restartLSN: 10, }, wantErr: false, }, { - name: "some err", + name: "some replicator err", setup: func() { + 
setNewStandbyStatus([]uint64{10}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, ClientTime: nowInNano(), ReplyRequested: 0, }, @@ -247,7 +271,23 @@ func TestListener_SendStandbyStatus(t *testing.T) { ) }, fields: fields{ - restartLSN: 0, + restartLSN: 10, + }, + wantErr: true, + }, + { + name: "some repo err", + setup: func() { + setNewStandbyStatus([]uint64{10}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, errors.New("some err")) + }, + fields: fields{ + restartLSN: 10, }, wantErr: true, }, @@ -255,19 +295,21 @@ func TestListener_SendStandbyStatus(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + defer repl.AssertExpectations(t) + defer repo.AssertExpectations(t) + tt.setup() w := &Listener{ log: logger, replicator: repl, + repository: repo, lsn: tt.fields.restartLSN, } if err := w.SendStandbyStatus(); (err != nil) != tt.wantErr { t.Errorf("SendStandbyStatus() error = %v, wantErr %v", err, tt.wantErr) } - - repl.AssertExpectations(t) }) } } @@ -290,8 +332,6 @@ func standByStatusMatcher(want *pgx.StandbyStatus) any { } func TestListener_AckWalMessage(t *testing.T) { - repl := new(replicatorMock) - type fields struct { restartLSN uint64 } @@ -300,6 +340,13 @@ func TestListener_AckWalMessage(t *testing.T) { LSN uint64 } + repl := new(replicatorMock) + repo := new(repositoryMock) + + setNewStandbyStatus := func(walPositions []uint64, status *pgx.StandbyStatus, err error) { + repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond).Once() + } + setSendStandbyStatus := func(status *pgx.StandbyStatus, err error) { repl.On( 
"SendStandbyStatus", @@ -319,6 +366,14 @@ func TestListener_AckWalMessage(t *testing.T) { { name: "success", setup: func() { + setNewStandbyStatus([]uint64{24658872}, &pgx.StandbyStatus{ + WalWritePosition: 24658872, + WalFlushPosition: 24658872, + WalApplyPosition: 24658872, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ WalWritePosition: 24658872, @@ -341,6 +396,14 @@ func TestListener_AckWalMessage(t *testing.T) { { name: "send status error", setup: func() { + setNewStandbyStatus([]uint64{24658872}, &pgx.StandbyStatus{ + WalWritePosition: 24658872, + WalFlushPosition: 24658872, + WalApplyPosition: 24658872, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ WalWritePosition: 24658872, @@ -370,6 +433,7 @@ func TestListener_AckWalMessage(t *testing.T) { w := &Listener{ log: logger, replicator: repl, + repository: repo, lsn: tt.fields.restartLSN, } if err := w.AckWalMessage(tt.args.LSN); (err != nil) != tt.wantErr { @@ -397,9 +461,12 @@ func TestListener_Stream(t *testing.T) { timeout time.Duration } + setNewStandbyStatus := func(walPositions []uint64, status *pgx.StandbyStatus, err error) { + repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond) + } + setParseWalMessageOnce := func(msg []byte, tx *WalTransaction, err error) { - prs.On("ParseWalMessage", msg, tx).Return(err).Once(). 
- After(10 * time.Millisecond) + prs.On("ParseWalMessage", msg, tx).Return(err) } setStartReplication := func(err error, slotName string, startLsn uint64, timeline int64, pluginArguments ...string) { @@ -409,14 +476,14 @@ func TestListener_Stream(t *testing.T) { startLsn, timeline, pluginArguments, - ).Return(err).Once().After(10 * time.Millisecond) + ).Return(err) } setWaitForReplicationMessage := func(msg *pgx.ReplicationMessage, err error) { repl.On( "WaitForReplicationMessage", mock.Anything, - ).Return(msg, err).Once().After(10 * time.Millisecond) + ).Return(msg, err).After(10 * time.Millisecond) } setSendStandbyStatus := func(want *pgx.StandbyStatus, err error) { @@ -428,8 +495,7 @@ func TestListener_Stream(t *testing.T) { want.WalWritePosition == got.WalWritePosition && abs(int64(want.ClientTime)-int64(got.ClientTime)) < 100000 }), - ). - Return(err).After(10 * time.Millisecond) + ).Return(err).After(10 * time.Millisecond) } setPublish := func(subject string, want publisher.Event, err error) { @@ -444,7 +510,7 @@ func TestListener_Stream(t *testing.T) { t.Errorf("- want + got\n- %#+v\n+ %#+v", want, got) } return ok - })).Return(err).Once().After(10 * time.Millisecond) + })).Return(err) } uuid.SetRand(bytes.NewReader(make([]byte, 512))) @@ -453,10 +519,11 @@ func TestListener_Stream(t *testing.T) { metrics := new(monitorMock) tests := []struct { - name string - setup func() - fields fields - args args + name string + setup func() + fields fields + args args + wantErr error }{ { name: "success", @@ -469,16 +536,15 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'wal-listener'", ) - setSendStandbyStatus( - &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, - ClientTime: nowInNano(), - ReplyRequested: 0, - }, - nil, - ) + + setNewStandbyStatus([]uint64{10}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 0, 
+ }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ WalWritePosition: 10, @@ -489,6 +555,7 @@ func TestListener_Stream(t *testing.T) { }, nil, ) + setParseWalMessageOnce( []byte(`some bytes`), &WalTransaction{ @@ -538,7 +605,7 @@ func TestListener_Stream(t *testing.T) { Listener: &config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 1, + HeartbeatInterval: 5 * time.Millisecond, Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, @@ -552,7 +619,7 @@ func TestListener_Stream(t *testing.T) { restartLSN: 0, }, args: args{ - timeout: 40 * time.Millisecond, + timeout: 5 * time.Millisecond, }, }, { @@ -587,138 +654,10 @@ func TestListener_Stream(t *testing.T) { args: args{ timeout: 100 * time.Microsecond, }, + wantErr: errors.New("start replication: some err"), }, { - name: "wait replication err", - setup: func() { - setStartReplication( - nil, - "myslot", - uint64(0), - int64(-1), - protoVersion, - "publication_names 'wal-listener'", - ) - setSendStandbyStatus( - &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, - ClientTime: nowInNano(), - ReplyRequested: 0, - }, - nil, - ) - setWaitForReplicationMessage( - &pgx.ReplicationMessage{ - WalMessage: &pgx.WalMessage{ - WalStart: 10, - ServerWalEnd: 0, - ServerTime: 0, - WalData: []byte(`some bytes`), - }, - ServerHeartbeat: &pgx.ServerHeartbeat{ - ServerWalEnd: 0, - ServerTime: 0, - ReplyRequested: 1, - }, - }, - errSimple, - ) - }, - fields: fields{ - config: &config.Config{ - Listener: &config.ListenerCfg{ - SlotName: "myslot", - AckTimeout: 0, - HeartbeatInterval: 1, Filter: config.FilterStruct{ - Tables: map[string][]string{"users": {"insert"}}, - }, - }, - Publisher: &config.PublisherCfg{ - TopicPrefix: "pre_", - }, - }, - slotName: "myslot", - restartLSN: 0, - }, - args: args{ - timeout: 20 * time.Millisecond, - }, - }, - { - name: "parse err", - setup: func() { - setStartReplication( - nil, - "myslot", - uint64(0), - 
int64(-1), - protoVersion, - "publication_names 'wal-listener'", - ) - setSendStandbyStatus( - &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, - ClientTime: nowInNano(), - ReplyRequested: 0, - }, - nil, - ) - setWaitForReplicationMessage( - &pgx.ReplicationMessage{ - WalMessage: &pgx.WalMessage{ - WalStart: 10, - ServerWalEnd: 0, - ServerTime: 0, - WalData: []byte(`some bytes`), - }, - ServerHeartbeat: &pgx.ServerHeartbeat{ - ServerWalEnd: 0, - ServerTime: 0, - ReplyRequested: 1, - }, - }, - nil, - ) - setParseWalMessageOnce( - []byte(`some bytes`), - &WalTransaction{ - monitor: metrics, - log: logger, - LSN: 0, - BeginTime: nil, - CommitTime: nil, - RelationStore: make(map[int32]RelationData), - Actions: nil, - }, - errSimple, - ) - }, - fields: fields{ - config: &config.Config{ - Listener: &config.ListenerCfg{ - SlotName: "myslot", - AckTimeout: 0, - HeartbeatInterval: 1, Filter: config.FilterStruct{ - Tables: map[string][]string{"users": {"insert"}}, - }, - }, - Publisher: &config.PublisherCfg{ - TopicPrefix: "pre_", - }, - }, - slotName: "myslot", - restartLSN: 0, - }, - args: args{ - timeout: 30 * time.Millisecond, - }, - }, - { - name: "publish err", + name: "wait for replication message err", setup: func() { setStartReplication( nil, @@ -728,6 +667,15 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'wal-listener'", ) + + setNewStandbyStatus([]uint64{0}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 10, + }, nil) + setSendStandbyStatus( &pgx.StandbyStatus{ WalWritePosition: 10, @@ -738,16 +686,7 @@ func TestListener_Stream(t *testing.T) { }, nil, ) - setSendStandbyStatus( - &pgx.StandbyStatus{ - WalWritePosition: 0, - WalFlushPosition: 0, - WalApplyPosition: 0, - ClientTime: nowInNano(), - ReplyRequested: 0, - }, - nil, - ) + setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: 
&pgx.WalMessage{ @@ -762,44 +701,8 @@ func TestListener_Stream(t *testing.T) { ReplyRequested: 1, }, }, - nil, - ) - setParseWalMessageOnce( - []byte(`some bytes`), - &WalTransaction{ - monitor: metrics, - log: logger, - LSN: 0, - BeginTime: nil, - CommitTime: nil, - RelationStore: make(map[int32]RelationData), - Actions: nil, - }, - nil, - ) - - setPublish( - "STREAM.pre_public_users", - publisher.Event{ - ID: uuid.MustParse("00000000-0000-4000-8000-000000000000"), - Schema: "public", - Table: "users", - Action: "INSERT", - Data: map[string]any{"id": 1}, - EventTime: time.Now(), - }, errSimple, ) - setSendStandbyStatus( - &pgx.StandbyStatus{ - WalWritePosition: 10, - WalFlushPosition: 10, - WalApplyPosition: 10, - ClientTime: nowInNano(), - ReplyRequested: 0, - }, - nil, - ) }, fields: fields{ config: &config.Config{ @@ -811,7 +714,7 @@ func TestListener_Stream(t *testing.T) { }, }, Publisher: &config.PublisherCfg{ - Topic: "STREAM", + Topic: "stream", TopicPrefix: "pre_", }, }, @@ -819,37 +722,37 @@ func TestListener_Stream(t *testing.T) { restartLSN: 0, }, args: args{ - timeout: 50 * time.Millisecond, + timeout: 100 * time.Microsecond, }, + wantErr: errors.New("wait for replication message: some err"), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + defer repl.AssertExpectations(t) + tt.setup() - ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout) + ctx, _ := context.WithTimeout(context.Background(), tt.args.timeout) + w := &Listener{ log: logger, monitor: metrics, cfg: tt.fields.config, - slotName: tt.fields.slotName, publisher: publ, replicator: repl, repository: repo, parser: prs, lsn: tt.fields.restartLSN, - errChannel: make(chan error, errorBufferSize), } - go func() { - <-w.errChannel - cancel() - }() - - w.Stream(ctx) + if err := w.Stream(ctx); err != nil && assert.Error(t, tt.wantErr, err.Error()) { + assert.EqualError(t, err, tt.wantErr.Error()) + } else { + assert.NoError(t, err) + } - 
repl.AssertExpectations(t) repl.ExpectedCalls = nil }) } @@ -908,6 +811,10 @@ func TestListener_Process(t *testing.T) { repl.On("CreateReplicationSlotEx", slotName, outputPlugin).Return(consistentPoint, snapshotName, err) } + setNewStandbyStatus := func(walPositions []uint64, status *pgx.StandbyStatus, err error) { + repo.On("NewStandbyStatus", walPositions).Return(status, err).After(10 * time.Millisecond) + } + tests := []struct { name string cfg *config.Config @@ -930,6 +837,15 @@ func TestListener_Process(t *testing.T) { }, setup: func() { ctx, _ = context.WithTimeout(ctx, time.Millisecond*200) + + setNewStandbyStatus([]uint64{1099511628288}, &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: nowInNano(), + ReplyRequested: 0, + }, nil) + setCreatePublication("wal-listener", nil) setGetSlotLSN("slot1", "100/200", nil) setStartReplication( diff --git a/listener/monitor_test.go b/listener/monitor_test.go index 5bc6c51..cb6064d 100644 --- a/listener/monitor_test.go +++ b/listener/monitor_test.go @@ -5,3 +5,5 @@ type monitorMock struct{} func (m *monitorMock) IncPublishedEvents(subject, table string) {} func (m *monitorMock) IncFilterSkippedEvents(table string) {} + +func (m *monitorMock) IncProblematicEvents(kind string) {} diff --git a/listener/repository.go b/listener/repository.go index 3859bf9..cd0bc74 100644 --- a/listener/repository.go +++ b/listener/repository.go @@ -2,6 +2,7 @@ package listener import ( "errors" + "fmt" "github.com/jackc/pgx" ) @@ -32,9 +33,16 @@ func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error) { // CreatePublication create publication fo all. 
func (r RepositoryImpl) CreatePublication(name string) error { - _, err := r.conn.Exec(`CREATE PUBLICATION "` + name + `" FOR ALL TABLES`) + if _, err := r.conn.Exec(`CREATE PUBLICATION "` + name + `" FOR ALL TABLES`); err != nil { + return fmt.Errorf("exec: %w", err) + } + + return nil +} - return err +// NewStandbyStatus creates a standby status for the given WAL positions via pgx.NewStandbyStatus. +func (r RepositoryImpl) NewStandbyStatus(walPositions ...uint64) (status *pgx.StandbyStatus, err error) { + return pgx.NewStandbyStatus(walPositions...) } // IsAlive check database connection problems. diff --git a/listener/repository_mock_test.go b/listener/repository_mock_test.go index b43f511..96f8ab9 100644 --- a/listener/repository_mock_test.go +++ b/listener/repository_mock_test.go @@ -1,6 +1,9 @@ package listener -import "github.com/stretchr/testify/mock" +import ( + "github.com/jackc/pgx" + "github.com/stretchr/testify/mock" +) type repositoryMock struct { mock.Mock @@ -23,3 +26,8 @@ func (r *repositoryMock) CreatePublication(name string) (err error) { args := r.Called(name) return args.Error(0) } + +func (r *repositoryMock) NewStandbyStatus(walPositions ...uint64) (status *pgx.StandbyStatus, err error) { + args := r.Called(walPositions) + return args.Get(0).(*pgx.StandbyStatus), args.Error(1) +} diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go index 64f96b7..dead8a2 100644 --- a/listener/wal_transaction.go +++ b/listener/wal_transaction.go @@ -8,6 +8,7 @@ import ( "github.com/goccy/go-json" "github.com/google/uuid" + "github.com/ihippik/wal-listener/v2/publisher" )