Skip to content

Commit

Permalink
feat: payments metadata (#48)
Browse files Browse the repository at this point in the history
* feat: payments metadata

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: lint & tests

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: tests

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>
  • Loading branch information
darkmatterpool authored Oct 31, 2022
1 parent 891d658 commit 4ea8b5d
Show file tree
Hide file tree
Showing 14 changed files with 183 additions and 19 deletions.
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ linters-settings:
- performance
- style
cyclop:
max-complexity: 20
max-complexity: 30
gocyclo:
min-complexity: 20
min-complexity: 30
goimports:
local-prefixes: github.com/numary/payments
govet:
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ lint:
lint-fix:
golangci-lint run --fix

run-tests:
go test -race -count=1 ./...

47 changes: 44 additions & 3 deletions internal/pkg/connectors/dummypay/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dummypay

import (
"encoding/json"
"fmt"
"time"
)
Expand All @@ -11,16 +12,16 @@ type Config struct {
Directory string `json:"directory" yaml:"directory" bson:"directory"`

// FilePollingPeriod is the period between file polling.
FilePollingPeriod time.Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"`
FilePollingPeriod Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"`

// FileGenerationPeriod is the period between file generation
FileGenerationPeriod time.Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"`
FileGenerationPeriod Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"`
}

// String returns a string representation of the configuration.
func (cfg Config) String() string {
return fmt.Sprintf("directory: %s, filePollingPeriod: %s, fileGenerationPeriod: %s",
cfg.Directory, cfg.FilePollingPeriod, cfg.FileGenerationPeriod)
cfg.Directory, cfg.FilePollingPeriod.String(), cfg.FileGenerationPeriod.String())
}

// Validate validates the configuration.
Expand All @@ -44,3 +45,43 @@ func (cfg Config) Validate() error {

return nil
}

type Duration time.Duration

func (d *Duration) String() string {
return time.Duration(*d).String()
}

func (d *Duration) Duration() time.Duration {
return time.Duration(*d)
}

func (d *Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(time.Duration(*d).String())
}

func (d *Duration) UnmarshalJSON(b []byte) error {
var durationValue interface{}

if err := json.Unmarshal(b, &durationValue); err != nil {
return err
}

switch value := durationValue.(type) {
case float64:
*d = Duration(time.Duration(value))

return nil
case string:
tmp, err := time.ParseDuration(value)
if err != nil {
return err
}

*d = Duration(tmp)

return nil
default:
return ErrDurationInvalid
}
}
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ func TestConfigString(t *testing.T) {

config := Config{
Directory: "test",
FilePollingPeriod: time.Second,
FileGenerationPeriod: time.Minute,
FilePollingPeriod: Duration(time.Second),
FileGenerationPeriod: Duration(time.Minute),
}

assert.Equal(t, "directory: test, filePollingPeriod: 1s, fileGenerationPeriod: 1m0s", config.String())
Expand Down
3 changes: 3 additions & 0 deletions internal/pkg/connectors/dummypay/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ var (

// ErrMissingTask is returned when the task is missing.
ErrMissingTask = errors.New("task is not implemented")

// ErrDurationInvalid is returned when the duration is invalid.
ErrDurationInvalid = errors.New("duration is invalid")
)
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ const (
// ApplyDefaults applies default values to the configuration.
func (l *Loader) ApplyDefaults(cfg Config) Config {
if cfg.FileGenerationPeriod == 0 {
cfg.FileGenerationPeriod = defaultFileGenerationPeriod
cfg.FileGenerationPeriod = Duration(defaultFileGenerationPeriod)
}

if cfg.FilePollingPeriod == 0 {
cfg.FilePollingPeriod = defaultFilePollingPeriod
cfg.FilePollingPeriod = Duration(defaultFilePollingPeriod)
}

return cfg
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/connectors/dummypay/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestLoader(t *testing.T) {
assert.Equal(t, connectorName, loader.Name())
assert.Equal(t, 10, loader.AllowTasks())
assert.Equal(t, Config{
FilePollingPeriod: 10 * time.Second,
FileGenerationPeriod: 5 * time.Second,
FilePollingPeriod: Duration(10 * time.Second),
FileGenerationPeriod: Duration(5 * time.Second),
}, loader.ApplyDefaults(config))

assert.EqualValues(t, newConnector(logger, config, newFS()), loader.Load(logger, config))
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/connectors/dummypay/task_generate_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func taskGenerateFiles(config Config, fs fs) task.Task {
select {
case <-ctx.Done():
return nil
case <-time.After(config.FileGenerationPeriod):
case <-time.After(config.FileGenerationPeriod.Duration()):
err := generateFile(config, fs)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/connectors/dummypay/task_read_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func taskReadFiles(config Config, fs fs) task.Task {
select {
case <-ctx.Done():
return nil
case <-time.After(config.FilePollingPeriod):
case <-time.After(config.FilePollingPeriod.Duration()):
files, err := parseFilesToIngest(config, fs)
if err != nil {
return fmt.Errorf("error parsing files to ingest: %w", err)
Expand Down
46 changes: 43 additions & 3 deletions internal/pkg/ingestion/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type BatchElement struct {
Referenced payments.Referenced
Payment *payments.Data
Adjustment *payments.Adjustment
Metadata payments.Metadata
Forward bool
}

Expand Down Expand Up @@ -60,11 +61,38 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym

var update bson.M

if elem.Adjustment != nil && elem.Payment != nil {
if elem.Adjustment == nil && elem.Payment == nil {
return nil, errors.New("either adjustment or payment must be provided")
}

var err error
var metadataChanges payments.MetadataChanges

if elem.Payment != nil {
ret := i.db.Collection(payments.Collection).FindOne(
ctx,
payments.Identifier{
Referenced: elem.Referenced,
Provider: i.provider,
})
if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) {
logger.Errorf("Error retrieving payment: %s", ret.Err())

return nil, fmt.Errorf("error retrieving payment: %w", ret.Err())
}

if ret != nil && ret.Err() == nil {
payment := payments.Payment{}

err := ret.Decode(&payment)
if err != nil {
return nil, err
}

metadataChanges = payment.MergeMetadata(elem.Metadata)

elem.Metadata = metadataChanges.After
}
}

switch {
case elem.Forward && elem.Adjustment != nil:
Expand All @@ -78,7 +106,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
"$set": bson.M{
"status": elem.Adjustment.Status,
"raw": elem.Adjustment.Raw,
"data": elem.Adjustment.Date,
"date": elem.Adjustment.Date,
},
}
case elem.Forward && elem.Payment != nil:
Expand All @@ -97,6 +125,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
Raw: elem.Payment.Raw,
},
},
Metadata: elem.Metadata,
},
}
case !elem.Forward && elem.Adjustment != nil:
Expand Down Expand Up @@ -166,6 +195,17 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym
return nil, err
}

if metadataChanges.HasChanged() {
logger.WithFields(map[string]interface{}{
"metadata": payment.Metadata,
}).Debugf("Metadata changed")

_, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges)
if err != nil {
return nil, err
}
}

allPayments = append(allPayments, payment)
}

Expand Down
9 changes: 9 additions & 0 deletions internal/pkg/ingestion/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ func TestIngester(t *testing.T) {
ingester := NewDefaultIngester(provider, uuid.New(), mt.DB, sharedlogging.NewNoOpLogger(), nil)

mt.AddMockResponses(
mtest.CreateCursorResponse(1, "test.test", mtest.FirstBatch, bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
}),
bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
}, // Find payment update
bson.D{
{Key: "ok", Value: 1},
{Key: "value", Value: bson.D{}},
Expand All @@ -51,6 +59,7 @@ func TestIngester(t *testing.T) {
}, State{
Counter: 1,
})

require.NoError(t, err)
})
}
7 changes: 4 additions & 3 deletions internal/pkg/payments/collections.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package payments

const (
Collection = "Payments"
ConnectorsCollection = "Connectors"
TasksCollection = "Tasks"
Collection = "Payments"
ConnectorsCollection = "Connectors"
TasksCollection = "Tasks"
MetadataChangelogCollection = "MetadataChangelog"
)
66 changes: 66 additions & 0 deletions internal/pkg/payments/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package payments

import (
"fmt"
"time"
)

type Metadata map[string]any

type MetadataChanges struct {
PaymentID string `json:"paymentID" bson:"paymentID"`
OccurredAt time.Time `json:"occurredAt" bson:"occurredAt"`
Before Metadata `json:"before" bson:"before"`
After Metadata `json:"after" bson:"after"`
}

func (mc MetadataChanges) HasChanged() bool {
if mc.Before == nil {
return false
}

return !mc.Before.Equal(mc.After)
}

func (p *Payment) MergeMetadata(metadata Metadata) MetadataChanges {
changes := MetadataChanges{
PaymentID: p.Identifier.String(),
OccurredAt: time.Now(),
Before: copyMap(p.Metadata),
}

if p.Metadata == nil {
p.Metadata = make(Metadata)
}

for key, value := range metadata {
p.Metadata[key] = value
}

changes.After = p.Metadata

return changes
}

func (m Metadata) Equal(comparableMetadata Metadata) bool {
if len(m) != len(comparableMetadata) {
return false
}

for key, value := range m {
if v, ok := comparableMetadata[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) {
return false
}
}

return true
}

func copyMap[K string, V any](m map[K]V) map[K]V {
result := make(map[K]V)
for k, v := range m {
result[k] = v
}

return result
}
1 change: 1 addition & 0 deletions internal/pkg/payments/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ type Payment struct {
Identifier `bson:",inline"`
Data `bson:",inline"`
Adjustments []Adjustment `json:"adjustments" bson:"adjustments"`
Metadata Metadata `json:"metadata" bson:"metadata"`
}

func (p Payment) HasInitialValue() bool {
Expand Down

0 comments on commit 4ea8b5d

Please sign in to comment.