Skip to content

Commit

Permalink
feat: send kafka message when connector is reset
Browse files Browse the repository at this point in the history
Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>
  • Loading branch information
darkmatterpool committed Jan 5, 2023
1 parent b80d1ce commit c6b8c3d
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 111 deletions.
2 changes: 1 addition & 1 deletion internal/app/api/connectormodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integrati
})

return integration.NewConnectorManager[ConnectorConfig](logger,
store, loader, schedulerFactory)
store, loader, schedulerFactory, publisher)
}),
fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig],
) connectorHandler {
Expand Down
8 changes: 7 additions & 1 deletion internal/app/ingestion/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/payments/internal/app/models"
)

Expand Down Expand Up @@ -44,7 +46,11 @@ func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch
return fmt.Errorf("error upserting accounts: %w", err)
}

i.publish(ctx, TopicAccounts, NewEventSavedAccounts(accounts))
err := i.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventSavedAccounts(accounts))
if err != nil {
i.logger.Errorf("Publishing message: %w", err)
}

endedAt := time.Now()

Expand Down
105 changes: 0 additions & 105 deletions internal/app/ingestion/message.go

This file was deleted.

12 changes: 10 additions & 2 deletions internal/app/ingestion/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/payments/internal/app/models"
)

Expand Down Expand Up @@ -58,12 +60,18 @@ func (i *DefaultIngester) IngestPayments(ctx context.Context, batch PaymentBatch
return fmt.Errorf("error marshaling task state: %w", err)
}

if err := i.repo.UpdateTaskState(ctx, i.provider, taskDescriptor, taskState); err != nil {
if err = i.repo.UpdateTaskState(ctx, i.provider, taskDescriptor, taskState); err != nil {
return fmt.Errorf("error updating task state: %w", err)
}

for paymentIdx := range allPayments {
i.publish(ctx, TopicPayments, NewEventSavedPayments(allPayments[paymentIdx], i.provider))
err = i.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventSavedPayments(allPayments[paymentIdx], i.provider))
if err != nil {
i.logger.Errorf("Publishing message: %w", err)

continue
}
}

endedAt := time.Now()
Expand Down
20 changes: 19 additions & 1 deletion internal/app/integration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package integration
import (
"context"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/go-libs/sharedpublish"

"github.com/formancehq/payments/internal/app/storage"

"github.com/google/uuid"
Expand All @@ -29,6 +33,7 @@ type ConnectorManager[Config models.ConnectorConfigObject] struct {
store Repository
schedulerFactory TaskSchedulerFactory
scheduler *task.DefaultTaskScheduler
publisher sharedpublish.Publisher
}

func (l *ConnectorManager[ConnectorConfig]) Enable(ctx context.Context) error {
Expand Down Expand Up @@ -236,14 +241,26 @@ func (l *ConnectorManager[ConnectorConfig]) Reset(ctx context.Context) error {
return err
}

return l.Install(ctx, *config)
err = l.Install(ctx, *config)
if err != nil {
return err
}

err = l.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventResetConnector(l.loader.Name()))
if err != nil {
l.logger.Errorf("Publishing message: %w", err)
}

return nil
}

func NewConnectorManager[ConnectorConfig models.ConnectorConfigObject](
logger sharedlogging.Logger,
store Repository,
loader Loader[ConnectorConfig],
schedulerFactory TaskSchedulerFactory,
publisher sharedpublish.Publisher,
) *ConnectorManager[ConnectorConfig] {
return &ConnectorManager[ConnectorConfig]{
logger: logger.WithFields(map[string]interface{}{
Expand All @@ -253,5 +270,6 @@ func NewConnectorManager[ConnectorConfig models.ConnectorConfigObject](
store: store,
loader: loader,
schedulerFactory: schedulerFactory,
publisher: publisher,
}
}
2 changes: 1 addition & 1 deletion internal/app/integration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func withManager[ConnectorConfig models.ConnectorConfigObject](builder *Connecto
WithAllowedTasks(1).
Build()
manager := NewConnectorManager[ConnectorConfig](logger, managerStore, loader,
schedulerFactory)
schedulerFactory, nil)

defer func() {
_ = manager.Uninstall(context.Background())
Expand Down
37 changes: 37 additions & 0 deletions internal/app/messages/accounts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type accountMessagePayload struct {
ID string `json:"id"`
CreatedAt time.Time `json:"createdAt"`
Reference string `json:"reference"`
Provider string `json:"provider"`
Type models.AccountType `json:"type"`
}

func NewEventSavedAccounts(accounts []models.Account) EventMessage {
payload := make([]accountMessagePayload, len(accounts))

for accountIdx, account := range accounts {
payload[accountIdx] = accountMessagePayload{
ID: account.ID.String(),
CreatedAt: account.CreatedAt,
Reference: account.Reference,
Provider: account.Provider,
Type: account.Type,
}
}

return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeSavedAccounts,
Payload: payload,
}
}
25 changes: 25 additions & 0 deletions internal/app/messages/connectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type connectorMessagePayload struct {
CreatedAt time.Time `json:"createdAt"`
Connector models.ConnectorProvider `json:"connector"`
}

func NewEventResetConnector(connector models.ConnectorProvider) EventMessage {
return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeConnectorReset,
Payload: connectorMessagePayload{
CreatedAt: time.Now().UTC(),
Connector: connector,
},
}
}
25 changes: 25 additions & 0 deletions internal/app/messages/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package messages

import (
"time"
)

const (
TopicPayments = "payments"
TopicConnectors = "connectors"

EventVersion = "v1"
EventApp = "payments"

EventTypeSavedPayments = "SAVED_PAYMENT"
EventTypeSavedAccounts = "SAVED_ACCOUNT"
EventTypeConnectorReset = "CONNECTOR_RESET"
)

type EventMessage struct {
Date time.Time `json:"date"`
App string `json:"app"`
Version string `json:"version"`
Type string `json:"type"`
Payload any `json:"payload"`
}
45 changes: 45 additions & 0 deletions internal/app/messages/payments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type paymentMessagePayload struct {
ID string `json:"id"`
Reference string `json:"reference"`
CreatedAt time.Time `json:"createdAt"`
Provider string `json:"provider"`
Type models.PaymentType `json:"type"`
Status models.PaymentStatus `json:"status"`
Scheme models.PaymentScheme `json:"scheme"`
Asset models.PaymentAsset `json:"asset"`

// TODO: Remove 'initialAmount' once frontend has switched to 'amount
InitialAmount int64 `json:"initialAmount"`
Amount int64 `json:"amount"`
}

func NewEventSavedPayments(payment *models.Payment, provider models.ConnectorProvider) EventMessage {
payload := paymentMessagePayload{
ID: payment.ID.String(),
Reference: payment.Reference,
Type: payment.Type,
Status: payment.Status,
InitialAmount: payment.Amount,
Scheme: payment.Scheme,
Asset: payment.Asset,
CreatedAt: payment.CreatedAt,
Amount: payment.Amount,
Provider: provider.String(),
}

return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeSavedPayments,
Payload: payload,
}
}

0 comments on commit c6b8c3d

Please sign in to comment.