Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send kafka message when connector is reset #82

Merged
merged 1 commit into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/app/api/connectormodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integrati
})

return integration.NewConnectorManager[ConnectorConfig](logger,
store, loader, schedulerFactory)
store, loader, schedulerFactory, publisher)
}),
fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig],
) connectorHandler {
Expand Down
8 changes: 7 additions & 1 deletion internal/app/ingestion/accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/payments/internal/app/models"
)

Expand Down Expand Up @@ -44,7 +46,11 @@ func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch
return fmt.Errorf("error upserting accounts: %w", err)
}

i.publish(ctx, TopicAccounts, NewEventSavedAccounts(accounts))
err := i.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventSavedAccounts(accounts))
if err != nil {
i.logger.Errorf("Publishing message: %w", err)
}

endedAt := time.Now()

Expand Down
105 changes: 0 additions & 105 deletions internal/app/ingestion/message.go

This file was deleted.

12 changes: 10 additions & 2 deletions internal/app/ingestion/payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/payments/internal/app/models"
)

Expand Down Expand Up @@ -58,12 +60,18 @@ func (i *DefaultIngester) IngestPayments(ctx context.Context, batch PaymentBatch
return fmt.Errorf("error marshaling task state: %w", err)
}

if err := i.repo.UpdateTaskState(ctx, i.provider, taskDescriptor, taskState); err != nil {
if err = i.repo.UpdateTaskState(ctx, i.provider, taskDescriptor, taskState); err != nil {
return fmt.Errorf("error updating task state: %w", err)
}

for paymentIdx := range allPayments {
i.publish(ctx, TopicPayments, NewEventSavedPayments(allPayments[paymentIdx], i.provider))
err = i.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventSavedPayments(allPayments[paymentIdx], i.provider))
if err != nil {
i.logger.Errorf("Publishing message: %w", err)

continue
}
}

endedAt := time.Now()
Expand Down
20 changes: 19 additions & 1 deletion internal/app/integration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ package integration
import (
"context"

"github.com/formancehq/payments/internal/app/messages"

"github.com/formancehq/go-libs/sharedpublish"

"github.com/formancehq/payments/internal/app/storage"

"github.com/google/uuid"
Expand All @@ -29,6 +33,7 @@ type ConnectorManager[Config models.ConnectorConfigObject] struct {
store Repository
schedulerFactory TaskSchedulerFactory
scheduler *task.DefaultTaskScheduler
publisher sharedpublish.Publisher
}

func (l *ConnectorManager[ConnectorConfig]) Enable(ctx context.Context) error {
Expand Down Expand Up @@ -236,14 +241,26 @@ func (l *ConnectorManager[ConnectorConfig]) Reset(ctx context.Context) error {
return err
}

return l.Install(ctx, *config)
err = l.Install(ctx, *config)
if err != nil {
return err
}

err = l.publisher.Publish(ctx, messages.TopicPayments,
messages.NewEventResetConnector(l.loader.Name()))
if err != nil {
l.logger.Errorf("Publishing message: %w", err)
}

return nil
}

func NewConnectorManager[ConnectorConfig models.ConnectorConfigObject](
logger sharedlogging.Logger,
store Repository,
loader Loader[ConnectorConfig],
schedulerFactory TaskSchedulerFactory,
publisher sharedpublish.Publisher,
) *ConnectorManager[ConnectorConfig] {
return &ConnectorManager[ConnectorConfig]{
logger: logger.WithFields(map[string]interface{}{
Expand All @@ -253,5 +270,6 @@ func NewConnectorManager[ConnectorConfig models.ConnectorConfigObject](
store: store,
loader: loader,
schedulerFactory: schedulerFactory,
publisher: publisher,
}
}
2 changes: 1 addition & 1 deletion internal/app/integration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func withManager[ConnectorConfig models.ConnectorConfigObject](builder *Connecto
WithAllowedTasks(1).
Build()
manager := NewConnectorManager[ConnectorConfig](logger, managerStore, loader,
schedulerFactory)
schedulerFactory, nil)

defer func() {
_ = manager.Uninstall(context.Background())
Expand Down
37 changes: 37 additions & 0 deletions internal/app/messages/accounts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type accountMessagePayload struct {
ID string `json:"id"`
CreatedAt time.Time `json:"createdAt"`
Reference string `json:"reference"`
Provider string `json:"provider"`
Type models.AccountType `json:"type"`
}

func NewEventSavedAccounts(accounts []models.Account) EventMessage {
payload := make([]accountMessagePayload, len(accounts))

for accountIdx, account := range accounts {
payload[accountIdx] = accountMessagePayload{
ID: account.ID.String(),
CreatedAt: account.CreatedAt,
Reference: account.Reference,
Provider: account.Provider,
Type: account.Type,
}
}

return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeSavedAccounts,
Payload: payload,
}
}
25 changes: 25 additions & 0 deletions internal/app/messages/connectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type connectorMessagePayload struct {
CreatedAt time.Time `json:"createdAt"`
Connector models.ConnectorProvider `json:"connector"`
}

func NewEventResetConnector(connector models.ConnectorProvider) EventMessage {
return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeConnectorReset,
Payload: connectorMessagePayload{
CreatedAt: time.Now().UTC(),
Connector: connector,
},
}
}
25 changes: 25 additions & 0 deletions internal/app/messages/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package messages

import (
"time"
)

const (
TopicPayments = "payments"
TopicConnectors = "connectors"

EventVersion = "v1"
EventApp = "payments"

EventTypeSavedPayments = "SAVED_PAYMENT"
EventTypeSavedAccounts = "SAVED_ACCOUNT"
EventTypeConnectorReset = "CONNECTOR_RESET"
)

type EventMessage struct {
Date time.Time `json:"date"`
App string `json:"app"`
Version string `json:"version"`
Type string `json:"type"`
Payload any `json:"payload"`
}
45 changes: 45 additions & 0 deletions internal/app/messages/payments.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package messages

import (
"time"

"github.com/formancehq/payments/internal/app/models"
)

type paymentMessagePayload struct {
ID string `json:"id"`
Reference string `json:"reference"`
CreatedAt time.Time `json:"createdAt"`
Provider string `json:"provider"`
Type models.PaymentType `json:"type"`
Status models.PaymentStatus `json:"status"`
Scheme models.PaymentScheme `json:"scheme"`
Asset models.PaymentAsset `json:"asset"`

// TODO: Remove 'initialAmount' once frontend has switched to 'amount
InitialAmount int64 `json:"initialAmount"`
Amount int64 `json:"amount"`
}

func NewEventSavedPayments(payment *models.Payment, provider models.ConnectorProvider) EventMessage {
payload := paymentMessagePayload{
ID: payment.ID.String(),
Reference: payment.Reference,
Type: payment.Type,
Status: payment.Status,
InitialAmount: payment.Amount,
Scheme: payment.Scheme,
Asset: payment.Asset,
CreatedAt: payment.CreatedAt,
Amount: payment.Amount,
Provider: provider.String(),
}

return EventMessage{
Date: time.Now().UTC(),
App: EventApp,
Version: EventVersion,
Type: EventTypeSavedPayments,
Payload: payload,
}
}