Skip to content

Commit

Permalink
feat: transfer initiation (#96)
Browse files Browse the repository at this point in the history
* feat: transfer initiation POC

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* feat: transfer initiation with background processing and status reporting

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: pluralize transfers

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: oas spec

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: linter issues

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

* fix: migration conflict

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>

---------

Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com>
  • Loading branch information
darkmatterpool authored Feb 3, 2023
1 parent 09953b5 commit 5e3b644
Show file tree
Hide file tree
Showing 34 changed files with 886 additions and 71 deletions.
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
linters-settings:
interfacebloat:
max: 20
dupl:
threshold: 30
funlen:
Expand Down
150 changes: 150 additions & 0 deletions internal/app/api/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net/http"
"time"

"github.com/pkg/errors"

"github.com/formancehq/payments/internal/app/storage"

"github.com/google/uuid"
Expand Down Expand Up @@ -251,6 +253,154 @@ func reset[Config models.ConnectorConfigObject](connectorManager *integration.Co
}
}

type transferRequest struct {
Amount int64 `json:"amount"`
Source string `json:"source"`
Destination string `json:"destination"`
Asset string `json:"asset"`

currency string
}

func (req *transferRequest) validate() error {
if req.Amount <= 0 {
return errors.New("amount must be greater than 0")
}

if req.Asset == "" {
return errors.New("asset is required")
}

if len(req.Asset) < 3 { //nolint:gomnd // allow length 3 for POC
return errors.New("asset is invalid")
}

req.currency = req.Asset[:3]

if req.Destination == "" {
return errors.New("destination is required")
}

return nil
}

type initiateTransferResponse struct {
ID string `json:"id"`
}

func initiateTransfer[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config],
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req transferRequest

err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
handleError(w, r, err)

return
}

err = req.validate()
if err != nil {
handleErrorBadRequest(w, r, err)

return
}

installed, err := connectorManager.IsInstalled(r.Context())
if err != nil {
handleError(w, r, err)

return
}

if !installed {
handleError(w, r, errors.New("connector not installed"))

return
}

transfer := integration.Transfer{
Source: req.Source,
Destination: req.Destination,
Currency: req.currency,
Amount: req.Amount,
}

transferID, err := connectorManager.InitiateTransfer(r.Context(), transfer)
if err != nil {
handleError(w, r, err)

return
}

err = json.NewEncoder(w).Encode(api.BaseResponse[initiateTransferResponse]{
Data: &initiateTransferResponse{
ID: transferID.String(),
},
})
if err != nil {
panic(err)
}
}
}

type listTransfersResponseElement struct {
ID string `json:"id"`
Source string `json:"source"`
Destination string `json:"destination"`
Amount int64 `json:"amount"`
Currency string `json:"asset"`
Status string `json:"status"`
Error *string `json:"error"`
}

func listTransfers[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config],
) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
installed, err := connectorManager.IsInstalled(r.Context())
if err != nil {
handleError(w, r, err)

return
}

if !installed {
handleError(w, r, errors.New("connector not installed"))

return
}

transfers, err := connectorManager.ListTransfers(r.Context())
if err != nil {
handleError(w, r, err)

return
}

response := make([]listTransfersResponseElement, len(transfers))

for transferIdx := range transfers {
response[transferIdx] = listTransfersResponseElement{
ID: transfers[transferIdx].ID.String(),
Source: transfers[transferIdx].Source,
Destination: transfers[transferIdx].Destination,
Amount: transfers[transferIdx].Amount,
Currency: transfers[transferIdx].Currency,
Status: transfers[transferIdx].Status.String(),
Error: transfers[transferIdx].Error,
}
}

err = json.NewEncoder(w).Encode(api.BaseResponse[[]listTransfersResponseElement]{
Data: &response,
})
if err != nil {
panic(err)
}
}
}

func connectorNotInstalled[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config],
w http.ResponseWriter, r *http.Request,
) bool {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/api/connectormodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integrati
fx.Provide(func(store *storage.Storage,
publisher publish.Publisher,
) *integration.ConnectorManager[ConnectorConfig] {
logger := logging.GetLogger(context.Background())
logger := logging.GetLogger(context.TODO())

schedulerFactory := integration.TaskSchedulerFactoryFn(func(
resolver task.Resolver, maxTasks int,
Expand Down
7 changes: 5 additions & 2 deletions internal/app/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func httpRouter(store *storage.Storage, serviceInfo api.ServiceInfo, connectorHa

connectorGroup.Path("/configs").Handler(connectorConfigsHandler())

// TODO: It's not ideal to define it explicitly here
// Refactor it when refactoring the HTTP lib.
// Deprecated
// TODO: Remove this endpoint
// Use /connectors/stripe/transfers instead
connectorGroup.Path("/stripe/transfers").Methods(http.MethodPost).
Handler(handleStripeTransfers(store))

Expand All @@ -74,6 +75,8 @@ func connectorRouter[Config models.ConnectorConfigObject](
addRoute(r, provider, "/reset", http.MethodPost, reset(manager))
addRoute(r, provider, "/tasks", http.MethodGet, listTasks(manager))
addRoute(r, provider, "/tasks/{taskID}", http.MethodGet, readTask(manager))
addRoute(r, provider, "/transfers", http.MethodPost, initiateTransfer(manager))
addRoute(r, provider, "/transfers", http.MethodGet, listTransfers(manager))

return r
}
Expand Down
5 changes: 5 additions & 0 deletions internal/app/connectors/currencycloud/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type Connector struct {
cfg Config
}

func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error {
// TODO implement me
panic("implement me")
}

func (c *Connector) Install(ctx task.ConnectorContext) error {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{Name: taskNameFetchTransactions})
if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions internal/app/connectors/dummypay/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/formancehq/payments/internal/app/integration"

"github.com/formancehq/payments/internal/app/models"

"github.com/formancehq/payments/internal/app/task"
Expand All @@ -21,6 +23,11 @@ type Connector struct {
fs fs
}

func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error {
// TODO implement me
panic("implement me")
}

// Install executes post-installation steps to read and generate files.
// It is called after the connector is installed.
func (c *Connector) Install(ctx task.ConnectorContext) error {
Expand Down Expand Up @@ -70,6 +77,8 @@ func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
return handleResolve(c.cfg, taskDescriptor, c.fs)
}

var _ integration.Connector = &Connector{}

func newConnector(logger logging.Logger, cfg Config, fs fs) *Connector {
return &Connector{
logger: logger.WithFields(map[string]any{
Expand Down
4 changes: 2 additions & 2 deletions internal/app/connectors/dummypay/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestConnector(t *testing.T) {
t.Parallel()

config := Config{}
logger := logging.GetLogger(context.Background())
logger := logging.GetLogger(context.TODO())

fileSystem := newTestFS()

Expand Down Expand Up @@ -75,5 +75,5 @@ func TestConnector(t *testing.T) {
reflect.ValueOf(connector.Resolve(taskDescriptor)).String(),
)

assert.NoError(t, connector.Uninstall(context.Background()))
assert.NoError(t, connector.Uninstall(context.TODO()))
}
2 changes: 1 addition & 1 deletion internal/app/connectors/dummypay/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestLoader(t *testing.T) {
t.Parallel()

config := Config{}
logger := logging.GetLogger(context.Background())
logger := logging.GetLogger(context.TODO())

loader := NewLoader()

Expand Down
5 changes: 5 additions & 0 deletions internal/app/connectors/modulr/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type Connector struct {
cfg Config
}

func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error {
// TODO implement me
panic("implement me")
}

func (c *Connector) Install(ctx task.ConnectorContext) error {
taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Fetch accounts from client",
Expand Down
18 changes: 18 additions & 0 deletions internal/app/connectors/stripe/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package stripe
import (
"context"

"github.com/google/uuid"

"github.com/formancehq/payments/internal/app/models"

"github.com/formancehq/payments/internal/app/integration"
Expand Down Expand Up @@ -44,9 +46,25 @@ func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task {
return MainTask(c.cfg)
}

if taskDescriptor.TransferID != uuid.Nil {
return TransferTask(c.cfg, taskDescriptor.TransferID)
}

return ConnectedAccountTask(c.cfg, taskDescriptor.Account)
}

func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error {
descriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{
Name: "Task to initiate transfer",
TransferID: transfer.ID,
})
if err != nil {
return err
}

return ctx.Scheduler().Schedule(descriptor, false)
}

var _ integration.Connector = &Connector{}

func newConnector(logger logging.Logger, cfg Config) *Connector {
Expand Down
9 changes: 6 additions & 3 deletions internal/app/connectors/stripe/descriptor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package stripe

import "github.com/google/uuid"

type TaskDescriptor struct {
Name string `json:"name" yaml:"name" bson:"name"`
Main bool `json:"main,omitempty" yaml:"main" bson:"main"`
Account string `json:"account,omitempty" yaml:"account" bson:"account"`
Name string `json:"name" yaml:"name" bson:"name"`
Main bool `json:"main,omitempty" yaml:"main" bson:"main"`
Account string `json:"account,omitempty" yaml:"account" bson:"account"`
TransferID uuid.UUID `json:"transferID,omitempty" yaml:"transferID" bson:"transferID"`
}
6 changes: 3 additions & 3 deletions internal/app/connectors/stripe/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ func TestStopTailing(t *testing.T) {
MoreRecentID: "tx2",
})

logger := logging.GetLogger(context.Background())
logger := logging.GetLogger(context.TODO())
trigger := NewTimelineTrigger(logger, NoOpIngester, timeline)
r := NewRunner(logger, trigger, time.Second)

go func() {
_ = r.Run(context.Background())
_ = r.Run(context.TODO())
}()

defer func() {
_ = r.Stop(context.Background())
_ = r.Stop(context.TODO())
}()

require.False(t, timeline.state.NoMoreHistory)
Expand Down
Loading

0 comments on commit 5e3b644

Please sign in to comment.