Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: payments metadata #48

Merged
merged 3 commits into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ linters-settings:
- performance
- style
cyclop:
max-complexity: 20
max-complexity: 30
gocyclo:
min-complexity: 20
min-complexity: 30
goimports:
local-prefixes: github.com/numary/payments
govet:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ lint:
lint-fix:
golangci-lint run --fix

run-tests:
go test -race -count=1 ./...

47 changes: 44 additions & 3 deletions internal/pkg/connectors/dummypay/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dummypay

import (
"encoding/json"
"fmt"
"time"
)
Expand All @@ -11,16 +12,16 @@ type Config struct {
Directory string `json:"directory" yaml:"directory" bson:"directory"`

// FilePollingPeriod is the period between file polling.
FilePollingPeriod time.Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"`
FilePollingPeriod Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"`

// FileGenerationPeriod is the period between file generation
FileGenerationPeriod time.Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"`
FileGenerationPeriod Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"`
}

// String returns a string representation of the configuration.
func (cfg Config) String() string {
return fmt.Sprintf("directory: %s, filePollingPeriod: %s, fileGenerationPeriod: %s",
cfg.Directory, cfg.FilePollingPeriod, cfg.FileGenerationPeriod)
cfg.Directory, cfg.FilePollingPeriod.String(), cfg.FileGenerationPeriod.String())
}

// Validate validates the configuration.
Expand All @@ -44,3 +45,43 @@ func (cfg Config) Validate() error {

return nil
}

type Duration time.Duration

func (d *Duration) String() string {
return time.Duration(*d).String()
}

func (d *Duration) Duration() time.Duration {
return time.Duration(*d)
}

func (d *Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(*d).String())
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var durationValue interface{}

if err := json.Unmarshal(b, &durationValue); err != nil {
return err
}

switch value := durationValue.(type) {
case float64:
*d = Duration(time.Duration(value))

return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}

*d = Duration(tmp)

return nil
default:
return ErrDurationInvalid
}
}
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestConfigString(t *testing.T) {

config := Config{
Directory: "test",
FilePollingPeriod: time.Second,
FileGenerationPeriod: time.Minute,
FilePollingPeriod: Duration(time.Second),
FileGenerationPeriod: Duration(time.Minute),
}

assert.Equal(t, "directory: test, filePollingPeriod: 1s, fileGenerationPeriod: 1m0s", config.String())
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/connectors/dummypay/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ var (

// ErrMissingTask is returned when the task is missing.
ErrMissingTask = errors.New("task is not implemented")

// ErrDurationInvalid is returned when the duration is invalid.
ErrDurationInvalid = errors.New("duration is invalid")
)
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ const (
// ApplyDefaults applies default values to the configuration.
func (l *Loader) ApplyDefaults(cfg Config) Config {
if cfg.FileGenerationPeriod == 0 {
cfg.FileGenerationPeriod = defaultFileGenerationPeriod
cfg.FileGenerationPeriod = Duration(defaultFileGenerationPeriod)
}

if cfg.FilePollingPeriod == 0 {
cfg.FilePollingPeriod = defaultFilePollingPeriod
cfg.FilePollingPeriod = Duration(defaultFilePollingPeriod)
}

return cfg
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestLoader(t *testing.T) {
assert.Equal(t, connectorName, loader.Name())
assert.Equal(t, 10, loader.AllowTasks())
assert.Equal(t, Config{
FilePollingPeriod: 10 * time.Second,
FileGenerationPeriod: 5 * time.Second,
FilePollingPeriod: Duration(10 * time.Second),
FileGenerationPeriod: Duration(5 * time.Second),
}, loader.ApplyDefaults(config))

assert.EqualValues(t, newConnector(logger, config, newFS()), loader.Load(logger, config))
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/connectors/dummypay/task_generate_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func taskGenerateFiles(config Config, fs fs) task.Task {
select {
case <-ctx.Done():
return nil
case <-time.After(config.FileGenerationPeriod):
case <-time.After(config.FileGenerationPeriod.Duration()):
err := generateFile(config, fs)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/connectors/dummypay/task_read_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func taskReadFiles(config Config, fs fs) task.Task {
select {
case <-ctx.Done():
return nil
case <-time.After(config.FilePollingPeriod):
case <-time.After(config.FilePollingPeriod.Duration()):
files, err := parseFilesToIngest(config, fs)
if err != nil {
return fmt.Errorf("error parsing files to ingest: %w", err)
Expand Down
46 changes: 43 additions & 3 deletions internal/pkg/ingestion/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type BatchElement struct {
Referenced payments.Referenced
Payment *payments.Data
Adjustment *payments.Adjustment
Metadata payments.Metadata
Forward bool
}

Expand Down Expand Up @@ -60,11 +61,38 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym

var update bson.M

if elem.Adjustment != nil && elem.Payment != nil {
if elem.Adjustment == nil && elem.Payment == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case should have been covered by a unit test

return nil, errors.New("either adjustment or payment must be provided")
}

var err error
var metadataChanges payments.MetadataChanges

if elem.Payment != nil {
ret := i.db.Collection(payments.Collection).FindOne(
ctx,
payments.Identifier{
Referenced: elem.Referenced,
Provider: i.provider,
})
if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) {
logger.Errorf("Error retrieving payment: %s", ret.Err())

return nil, fmt.Errorf("error retrieving payment: %w", ret.Err())
}

if ret != nil && ret.Err() == nil {
payment := payments.Payment{}

err := ret.Decode(&payment)
if err != nil {
return nil, err
}

metadataChanges = payment.MergeMetadata(elem.Metadata)

elem.Metadata = metadataChanges.After
}
}

switch {
case elem.Forward && elem.Adjustment != nil:
Expand All @@ -78,7 +106,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
"$set": bson.M{
"status": elem.Adjustment.Status,
"raw": elem.Adjustment.Raw,
"data": elem.Adjustment.Date,
"date": elem.Adjustment.Date,
},
}
case elem.Forward && elem.Payment != nil:
Expand All @@ -97,6 +125,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
Raw: elem.Payment.Raw,
},
},
Metadata: elem.Metadata,
},
}
case !elem.Forward && elem.Adjustment != nil:
Expand Down Expand Up @@ -166,6 +195,17 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
return nil, err
}

if metadataChanges.HasChanged() {
logger.WithFields(map[string]interface{}{
"metadata": payment.Metadata,
}).Debugf("Metadata changed")

_, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges)
if err != nil {
return nil, err
}
}

allPayments = append(allPayments, payment)
}

Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/ingestion/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func TestIngester(t *testing.T) {
ingester := NewDefaultIngester(provider, uuid.New(), mt.DB, sharedlogging.NewNoOpLogger(), nil)

mt.AddMockResponses(
mtest.CreateCursorResponse(1, "test.test", mtest.FirstBatch, bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
}),
bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
}, // Find payment update
bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
Expand All @@ -51,6 +59,7 @@ func TestIngester(t *testing.T) {
}, State{
Counter: 1,
})

require.NoError(t, err)
})
}
7 changes: 4 additions & 3 deletions internal/pkg/payments/collections.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package payments

const (
Collection = "Payments"
ConnectorsCollection = "Connectors"
TasksCollection = "Tasks"
Collection = "Payments"
ConnectorsCollection = "Connectors"
TasksCollection = "Tasks"
MetadataChangelogCollection = "MetadataChangelog"
)
66 changes: 66 additions & 0 deletions internal/pkg/payments/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package payments

import (
"fmt"
"time"
)

type Metadata map[string]any

type MetadataChanges struct {
PaymentID string `json:"paymentID" bson:"paymentID"`
OccurredAt time.Time `json:"occurredAt" bson:"occurredAt"`
Before Metadata `json:"before" bson:"before"`
After Metadata `json:"after" bson:"after"`
}

func (mc MetadataChanges) HasChanged() bool {
if mc.Before == nil {
return false
}

return !mc.Before.Equal(mc.After)
}

func (p *Payment) MergeMetadata(metadata Metadata) MetadataChanges {
changes := MetadataChanges{
PaymentID: p.Identifier.String(),
OccurredAt: time.Now(),
Before: copyMap(p.Metadata),
}

if p.Metadata == nil {
p.Metadata = make(Metadata)
}

for key, value := range metadata {
p.Metadata[key] = value
}

changes.After = p.Metadata

return changes
}

func (m Metadata) Equal(comparableMetadata Metadata) bool {
if len(m) != len(comparableMetadata) {
return false
}

for key, value := range m {
if v, ok := comparableMetadata[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) {
return false
}
}

return true
}

func copyMap[K string, V any](m map[K]V) map[K]V {
result := make(map[K]V)
for k, v := range m {
result[k] = v
}

return result
}
1 change: 1 addition & 0 deletions internal/pkg/payments/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Payment struct {
Identifier `bson:",inline"`
Data `bson:",inline"`
Adjustments []Adjustment `json:"adjustments" bson:"adjustments"`
Metadata Metadata `json:"metadata" bson:"metadata"`
}

func (p Payment) HasInitialValue() bool {
Expand Down