From 6a9d822a73a79eb52fe299a13b3e154c04f113c7 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Thu, 30 Jun 2022 21:15:29 +0200 Subject: [PATCH 1/2] feat: Improve swagger. --- swagger.yml | 324 +++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 256 insertions(+), 68 deletions(-) diff --git a/swagger.yml b/swagger.yml index fcd9c41d..ce9b6cd8 100644 --- a/swagger.yml +++ b/swagger.yml @@ -1,74 +1,265 @@ openapi: 3.0.0 info: - title: Sample API - version: 0.1.9 + title: Payments API + version: 1.0.0 paths: - /: - put: - operationId: savePayment + /payments: + get: + summary: Returns a list of payments. + operationId: listPayments tags: - - payments - summary: Persist payment + - payments + parameters: + - name: limit + in: query + schema: + type: integer + description: Limit the number of payments to return, pagination can be achieved in conjunction with 'skip' parameter. + example: 10 + - name: skip + in: query + schema: + type: integer + description: How many payments to skip, pagination can be achieved in conjunction with 'limit' parameter. + example: 100 + - name: sort + in: query + schema: + type: array + items: + type: string + description: Field used to sort payments (Default is by date). + example: status + responses: + '200': # status code + description: A JSON array of payments + content: + application/json: + schema: + $ref: '#/components/schemas/ListPaymentsResponse' + /payments/{paymentId}: + get: + summary: Returns a payment. + operationId: getPayment + tags: + - payments + parameters: + - name: paymentId + in: path + schema: + type: string + description: The payment id + example: XXX + required: true + responses: + '200': # status code + description: A payment + content: + application/json: + schema: + $ref: '#/components/schemas/Payment' + /connectors/{connector}: + post: + summary: Install connector + operationId: installConnector + description: Install connector + parameters: + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true requestBody: required: true content: application/json: schema: - $ref: '#/components/schemas/Payment' + $ref: '#/components/schemas/ConnectorConfig' responses: - '201': - description: A payment object was created - '204': - description: A payment object was updated + 204: + description: Connector has been installed + delete: + summary: Uninstall connector + operationId: uninstallConnector + description: Uninstall connector + parameters: + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true + responses: + 204: + description: Connector has been uninstalled + /connectors/{connector}/config: get: - summary: Returns a list of payments. - operationId: listPayments - tags: - - payments + summary: Read connector config + operationId: readConnectorConfig + description: Read connector config parameters: - - name: limit - in: query - schema: - type: integer - - name: skip - in: query - schema: - type: integer - - name: sort - in: query - schema: - type: array - items: - type: string + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true responses: - '200': # status code - description: A JSON array of payments + 200: + description: Connector config content: application/json: schema: - $ref: '#/components/schemas/ListPaymentsResponse' - + $ref: '#/components/schemas/ConnectorConfig' + /connectors/{connector}/reset: + post: + summary: Reset connector + operationId: resetConnector + description: Reset connector. Will remove the connector and ALL PAYMENTS generated with it. + parameters: + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true + responses: + 204: + description: Connector has been reset + /connectors/{connector}/tasks: + get: + summary: List connector tasks + operationId: listConnectorTasks + description: List all tasks associated with this connector. + parameters: + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true + responses: + 200: + description: Task list + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/ConnectorTask' + /connectors/{connector}/tasks/{taskId}: + get: + summary: Read a specific task of the connector + operationId: getConnectorTask + description: Get a specific task associated to the connector + parameters: + - name: connector + description: The connector code + in: path + schema: + type: string + enum: + - stripe + required: true + - name: taskId + description: The task id + example: task1 + in: path + schema: + type: string + required: true + responses: + 200: + description: The specified task + content: + application/json: + schema: + $ref: '#/components/schemas/ConnectorTask' components: schemas: + ConnectorConfig: + oneOf: + - $ref: '#/components/schemas/StripeConfig' + ConnectorTask: + oneOf: + - $ref: '#/components/schemas/StripeTask' + StripeConfig: + type: object + required: + - apiKey + properties: + pollingPeriod: + type: string + example: "60s" + description: The frequency at which the connector will try to fetch new BalanceTransaction objects from Stripe api + default: "120s" + apiKey: + type: string + example: XXX + pageSize: + type: number + description: | + Number of BalanceTransaction to fetch at each polling interval. + default: 10 + example: 50 + StripeTask: + type: object + properties: + oldestId: + type: string + description: The id of the oldest BalanceTransaction fetched from stripe for this account + oldestDate: + type: string + format: date-time + description: The creation date of the oldest BalanceTransaction fetched from stripe for this account + moreRecentId: + type: string + description: The id of the more recent BalanceTransaction fetched from stripe for this account + moreRecentDate: + type: string + format: date-time + description: The creation date of the more recent BalanceTransaction fetched from stripe for this account + noMoreHistory: + type: boolean + description: Indicate that there no more history to fetch on this account ListPaymentsResponse: type: object required: - - data + - data properties: data: type: array items: $ref: '#/components/schemas/Payment' + GetPaymentResponse: + type: object + required: + - data + properties: + data: + $ref: '#/components/schemas/Payment' Payment: type: object required: - - provider - - status - - value - - date - - id - - type - - scheme + - provider + - status + - asset + - amount + - date + - id + - type + - scheme properties: provider: type: string @@ -77,40 +268,37 @@ components: scheme: type: string enum: - - visa - - mastercard - - apple pay - - google pay - - sepa debit - - sepa credit - - sepa - - a2a - - ach debit - - ach - - rtp - - other + - visa + - mastercard + - apple pay + - google pay + - sepa debit + - sepa credit + - sepa + - a2a + - ach debit + - ach + - rtp + - other status: type: string type: type: string enum: - - pay-in - - payout - - other + - pay-in + - payout + - other id: type: string - value: - type: object - required: - - amount - - asset - properties: - amount: - type: integer - asset: - type: string + example: XXX + amount: + type: integer + example: 100 + asset: + type: string + example: USD date: type: string format: date-time raw: - nullable: true \ No newline at end of file + nullable: true From c244c587ecb146580b5f54a0b3ddbb2b69f5cbb2 Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 5 Jul 2022 08:47:15 +0200 Subject: [PATCH 2/2] doc: Add a tutorial about how to create a connector --- README.md | 19 +- cmd/root.go | 1 + docs/development.md | 24 + docs/samples-payin.json | 7 + docs/tuto-connector.md | 576 +++++++++++++++++++++++ pkg/bridge/connectors/example/example.go | 97 ++++ pkg/bridge/http/api.go | 7 +- pkg/bridge/integration/connector.go | 2 - pkg/bridge/integration/loader.go | 6 + pkg/bridge/integration/manager.go | 3 + pkg/bridge/task/scheduler.go | 4 +- 11 files changed, 730 insertions(+), 16 deletions(-) create mode 100644 docs/development.md create mode 100644 docs/samples-payin.json create mode 100644 docs/tuto-connector.md create mode 100644 pkg/bridge/connectors/example/example.go diff --git a/README.md b/README.md index b3fd3a84..6ce6262f 100644 --- a/README.md +++ b/README.md @@ -4,23 +4,26 @@ # Getting started -Payments works as a standalone binary, the latest of which can be downloaded from the [releases page](https://github.com/numary/payments/releases). You can move the binary to any executable path, such as to `/usr/local/bin`. Installations using brew, apt, yum or docker are also [available](https://docs.numary.com/oss/payments/get-started/installation). +Payments works as a standalone binary, the latest of which can be downloaded from the [releases page](https://github.com/numary/payments/releases). You can move the binary to any executable path, such as to `/usr/local/bin`. Installations using brew, apt, yum or docker are also [available](https://docs.formance.com/oss/payments/get-started/installation). ```SHELL payments ``` -# Documentation +# What is it? -You can find the complete Payments documentation at [docs.numary.com](https://docs.numary.com/oss/payments/get-started/installation) +Basically, a framework. -# Dashboard +A framework to ingest payin and payout coming from different payment providers (PSP). -A simple [dashboard](https://github.com/numary/control) is built in the payments binary, to make it easier to visualize transactions. It can be started with: +The framework contains connectors. Each connector is basically a translator for a PSP. +Translator, because the main role of a connector is to translate specific PSP payin/payout formats to a generalized format used at Formance. -control-screenshot +Because it is a framework, it is extensible. Please follow the guide below if you want to add your connector. -You can use the dashboard by heading to [control.numary.com](https://control.numary.com) which provides a hosted version that can connect to any payments instance. +# Contribute + +Please follow [this guide](./docs/development.md) if you want to contribute. # Roadmap & Community @@ -28,4 +31,4 @@ We keep an open roadmap of the upcoming releases and features [here](https://num If you need help, want to show us what you built or just hang out and chat about paymentss you are more than welcome on our [Discord](https://discord.gg/xyHvcbzk4w) - looking forward to see you there! -![Frame 1 (2)](https://user-images.githubusercontent.com/1770991/134163361-d86c5728-6075-4510-8de7-06df1f6ed740.png) \ No newline at end of file +![Frame 1 (2)](https://user-images.githubusercontent.com/1770991/134163361-d86c5728-6075-4510-8de7-06df1f6ed740.png) diff --git a/cmd/root.go b/cmd/root.go index d16f63fb..761a2413 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -5,6 +5,7 @@ import ( "os" "strings" + _ "github.com/bombsimon/logrusr/v3" "github.com/spf13/cobra" "github.com/spf13/viper" ) diff --git a/docs/development.md b/docs/development.md new file mode 100644 index 00000000..881e93c5 --- /dev/null +++ b/docs/development.md @@ -0,0 +1,24 @@ +# Development + +## Run + +A docker-compose contains all the stuff required to launch the service. + +Currently, the service use MongoDB as database, and it takes few seconds to start and is not ready when the payments service try to connect to him. +You can start MongoDB before and wait before start payments service using two terminal : +``` +docker compose up mongodb # Run on first terminal +``` +and +``` +docker compose up payments # Run on second terminal +``` + +Tests can be started regularly using standard go tooling, just use : +``` +go test ./... +``` + +## Develop a connector + +Want to develop a connector? [Follow this link](./tuto-connector.md) diff --git a/docs/samples-payin.json b/docs/samples-payin.json new file mode 100644 index 00000000..4259ffba --- /dev/null +++ b/docs/samples-payin.json @@ -0,0 +1,7 @@ +{ + "type": "payin", + "reference": "001", + "status": "succeeded", + "asset": "USD", + "initialAmount": 100 +} diff --git a/docs/tuto-connector.md b/docs/tuto-connector.md new file mode 100644 index 00000000..e1d7290f --- /dev/null +++ b/docs/tuto-connector.md @@ -0,0 +1,576 @@ +# Tutorial connector + +We are going to create a fake connector which read a directory. +In this directory, a fake bank service will create files. +Each files contain a payin or a payout as a json object. + +First, to create a connector, we need a loader. + +## The loader object + +```go +type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] interface { + // Name has to return the name of the connector. It must be constant and unique + Name() string + // Load is in charge of loading the connector + // It takes a logger and a ConnectorConfig object. + // At this point, the config must have been validated + Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] + // ApplyDefaults is used to fill default values of the provided configuration object. + ApplyDefaults(t ConnectorConfig) ConnectorConfig + // AllowTasks define how many task the connector can run + // If too many tasks are scheduled by the connector, + // those will be set to pending state and restarted later when some other tasks will be terminated + AllowTasks() int +} +``` + +A connector has a name. + +This name is provided by the loader by the method Name(). +Also, each connector define a config object using generics which has to implement interface payments.ConnectorConfigObject. +This interface only have a method Validate() error which is used by the code to validate an external config is valid before load the connector with it. +Since, some properties of the config may have some optionnal, the loader is also in charge of configuring default values on it. +This is done using the method ```ApplyDefaults(Config)```. + +The framework provide the capabilities to run tasks. +So each connector can start any number of tasks. +Those tasks will be scheduled by the framework. For example, if the service is restarted, the tasks will be restarted at reboot. +The number of tasks a connector can schedule is defined by the method AllowTasks(). + +To implement Loader interface, you can create your own struct implementing required methods, or you can use some utilities provided by the framework. +Let`s create a basic loader. + +```go +type ( + Config struct {} + TaskDescriptor struct {} +) + +func (cfg Config) Validate() error { + return nil +} + +var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example").Build() +``` + +Here, we built our loader. +The name of the connector is "example". +For now, the ```Config``` and ```TaskDescriptor``` are just empty structs, we will change it later. +Also, we didn't define any logic on our connector. + +It's time to plug our connector on the core. +Edit the file cmd/root.go and go at the end of the method HTTPModule(), you should find a code like this : +```go + ... + cdi.ConnectorModule[stripe.Config, stripe.TaskDescriptor]( + viper.GetBool(authBearerUseScopesFlag), + stripe.NewLoader(), + ), + ... +``` + +You can add your connector bellow that : +```go + ... + cdi.ConnectorModule[example.Config, example.TaskDescriptor]( + viper.GetBool(authBearerUseScopesFlag), + example.Loader, + ), + ... +``` + +Now you, can run the service and you should see something like this : +```bash +payments-payments-1 | time="2022-07-01T09:12:21Z" level=info msg="Restoring state" component=connector-manager provider=example +payments-payments-1 | time="2022-07-01T09:12:21Z" level=info msg="Not installed, skip" component=connector-manager provider=example +``` + +This indicates your connector is properly integrated. +You can install it like this : +```bash +curl http://localhost:8080/connectors/example -X POST +``` + +The service will display something like this : +```bash +payments-payments-1 | time="2022-07-01T10:04:53Z" level=info msg="Install connector example" component=connector-manager config="{}" provider=example +payments-payments-1 | time="2022-07-01T10:04:53Z" level=info msg="Connector installed" component=connector-manager provider=example +``` + +Your connector was installed! +It makes nothing but it is installed. + +Let's uninstall it before continue : +```bash +curl http://localhost:8080/connectors/example -X DELETE +``` + +You should see something like this : +```bash +payments-payments-1 | time="2022-07-01T10:06:16Z" level=info msg="Uninstalling connector" component=connector-manager provider=example +payments-payments-1 | time="2022-07-01T10:06:16Z" level=info msg="Stopping scheduler..." component=scheduler provider=example +payments-payments-1 | time="2022-07-01T10:06:16Z" level=info msg="Connector uninstalled" component=connector-manager provider=example +``` + +It's to time to add a bit of logic to our connector. + +As you may have noticed, the ```Loader``` has method named ```Load``` : +```go +... +Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] +... +``` + +The Load function take a logger provided by the framework and a config, probably provided by the api endpoint. +It has to return a Connector object. Here the interface : +```go +// Connector provide entry point to a payment provider +type Connector[TaskDescriptor payments.TaskDescriptor any] interface { + // Install is used to start the connector. The implementation if in charge of scheduling all required resources. + Install(ctx task.ConnectorContext[TaskDescriptor]) error + // Uninstall is used to uninstall the connector. It has to close all related resources opened by the connector. + Uninstall(ctx context.Context) error + // Resolve is used to recover state of a failed or restarted task + Resolve(descriptor TaskDescriptor) task.Task +} +``` + +When you made ```curl http://localhost:8080/connectors/example -X POST```, the framework called the ```Install()``` method. +When you made ```curl http://localhost:8080/connectors/example -X DELETE```, the framework called the ```Uninstall(ctx context.Context) error``` method. + +It's time to add some logic. We have to modify our loader but before let's add some property to our config : +```go +type ( + Config struct { + Directory string + } + ... +) + +func (cfg Config) Validate() error { + if cfg.Directory == "" { + return errors.New("missing directory to watch") + } + return nil +} +``` + +Here we defined only one property to our connector, "Directory", which indicates the directory when json files will be pushed. +Now, modify our loader : +```go +var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). + WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return integration.NewConnectorBuilder[TaskDescriptor](). + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return errors.New("not implemented") + }). + Build() + }). + Build() +``` + +Here we create a connector using a builtin builder, but you can implement the interface if you want. +We define a ```Install``` method which only returns an errors when installed. +You can retry to install your connector and see the error on the http response. + +The ```Install``` method take a ```task.ConnectorContext[TaskDescriptor]``` parameter : +```go +type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface { + Context() context.Context + Scheduler() Scheduler[TaskDescriptor] +} +``` + +Basically this context provides two things : +* a ```context.Context``` : If the connector make long-running processing, it should listen on this context to abort if necessary. +* a ```Scheduler[TaskDescriptor]```: A scheduler to run tasks + +But, what is a task ? + +A task is like a process than the framework will handle for you. It is basically a simple function. +When installed, a connector has the opportunity to schedule some tasks and let the system handle them for him. +A task has a descriptor. +The descriptor must be immutable and represents a specific task in the system. It can be anything. +A task also have a state. The state can change and the framework provides necessary apis to do that. We will come on that later. +As the descriptor, the state is freely defined by the connector. + +In our case, the main task seems evident as we have to list the target repository. +Secondary tasks will be defined to read each files present in the directory. +We can define our task descriptor to a string. The value will be the file name in case of secondary tasks and a hardcoded value of "directory" for the main task. + +Before add the logic, let's modify our previously introduced task descriptor : +```go +type ( + ... + TaskDescriptor string + ... +) + +``` + +Add some logic on our connector : +```go + ... + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule("directory") + }). + ... +``` + +Here we instruct the framework to create the task with the descriptor "directory". +Cool! The framework can handle the task, restart it, log/save errors etc... +But it doesn't know about the logic. + +To do that, we have to use the last method of the connector : ```Resolve(descriptor TaskDescriptor) task.Task``` +This method is in charge of providing a ```task.Task``` instance given a descriptor. + +So, when calling ```ctx.Scheduler().Schedule("directory")```, the framework will call the ```Resolve``` method with "directory" as parameter. + +Let's implement the resolve method : +```go + ... + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule("directory") + }). + WithResolve(func(descriptor TaskDescriptor) task.Task { + if descriptor == "directory" { + return func() { + // TODO + } + } + // Secondary tasks + return func() { + // TODO + } + }). + ... +``` + +Now, we have to implement the logic for each task. + +Let's start with the main task which read the directory : +```go + ... + WithResolve(func(descriptor TaskDescriptor) task.Task { + if descriptor == "directory" { + return func(ctx context.Context, logget sharedlogging.Logger, scheduler task.Scheduler) + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(10 * time.Second): // Could be configurable using Config object + logger.Infof("Opening directory '%s'...", config.Directory) + dir, err := os.ReadDir(config.Directory) + if err != nil { + logger.Errorf("Error opening directory '%s': %s", config.Directory, err) + continue + } + + logger.Infof("Found %d files", len(dir)) + for _, file := range dir { + err = scheduler.Schedule(TaskDescriptor(file.Name())) + if err != nil { + logger.Errorf("Error scheduling task '%s': %s", file.Name(), err) + continue + } + } + } + } + } + } + return func() error { + return errors.New("not implemented") + } + }). + ... +``` + +Let's test our implementation. + +Start the server as usual and issue a curl request to install the connector : +```bash +curl http://localhost:8080/connectors/example -X POST -d '{"directory": "/tmp/payments"}' +``` + +Here we instruct the connector to watch the directory /tmp/payments. Check the app logs, you should see something like this : +```bash +payments-payments-1 | time="2022-07-01T12:29:05Z" level=info msg="Install connector example" component=connector-manager config="{/tmp/payments}" provider=example +payments-payments-1 | time="2022-07-01T12:29:05Z" level=info msg="Starting task..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T12:29:05Z" level=info msg="Connector installed" component=connector-manager provider=example +payments-payments-1 | time="2022-07-01T13:26:51Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T13:26:51Z" level=error msg="Error opening directory '/tmp/payments': open /tmp/payments: no such file or directory" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +``` + +As expected, the task trigger an error because of non-existent /tmp/payments directory. + +You can see the tasks on api too : +```bash +# curl http://localhost:8080/connectors/example/tasks|jq +[ + { + "provider": "example", + "descriptor": "directory", + "createdAt": "2022-07-01T13:26:41.749Z", + "status": "active", + "error": "", + "state": {}, + "id": "ImRpcmVjdG9yeSI=" + } +] +``` + +As you can see, a task has an id. This id is simply the descriptor of the task encoded in canonical json and encoded as base 64. + +Let's create the missing directory: +```bash +docker compose exec payments mkdir /tmp/payments +``` + +After a few seconds, you should see thoses logs on app : +```bash +payments-payments-1 | time="2022-07-01T13:29:21Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T13:29:21Z" level=info msg="Found 0 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +``` + +Ok, create a payin file : +```bash +docker compose cp docs/samples-payin.json payments:/tmp/payments/001.json +``` + +You should see those lines on logs : +```bash +payments-payments-1 | time="2022-07-01T13:33:51Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T13:33:51Z" level=info msg="Found 1 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T13:33:52Z" level=info msg="Starting task..." component=scheduler provider=example task-id="IjAwMS5qc29uIg==" +payments-payments-1 | time="2022-07-01T13:33:52Z" level=error msg="Task terminated with error: not implemented" component=scheduler provider=example task-id="IjAwMS5qc29uIg==" +``` + +The log show our connector detect the file and trigger a new task for the file. +The task terminate with an error as the ```Resolve``` function does not handle the descriptor. We will do this later. + +Again, you can view the tasks on the api : +```bash +[ + { + "provider": "example", + "descriptor": "directory", + "createdAt": "2022-07-01T13:26:41.749Z", + "status": "active", + "error": "", + "state": "XXX", + "id": "ImRpcmVjdG9yeSI=" + }, + { + "provider": "example", + "descriptor": "001.json", + "createdAt": "2022-07-01T13:33:31.935Z", + "status": "failed", + "error": "not implemented", + "state": "XXX", + "id": "IjAwMS5qc29uIg==" + } +] +``` + +As you can see, as the first task is still active, the second is flagged as failed with an error message. + +It's time to implement the second task : +```go + ... + file, err := os.Open(filepath.Join(config.Directory, string(descriptor))) + if err != nil { + return err + } + + type JsonPayment struct { + payments.Data + Reference string `json:"reference"` + Type string `json:"type"` + } + + jsonPayment := &JsonPayment{} + err = json.NewDecoder(file).Decode(jsonPayment) + if err != nil { + return err + } + + return ingester.Ingest(ctx, ingestion.Batch{ + { + Referenced: payments.Referenced{ + Reference: jsonPayment.Reference, + Type: jsonPayment.Type, + }, + Payment: &jsonPayment.Data, + Forward: true, + }, + }, struct{}{}) + ... +``` + +Now restart the service, uninstall the connector, and reinstall it. + +Here the logs : +```bash +payments-payments-1 | time="2022-07-01T14:25:20Z" level=info msg="Install connector example" component=connector-manager config="{/tmp/payments}" provider=example +payments-payments-1 | time="2022-07-01T14:25:20Z" level=info msg="Starting task..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T14:25:20Z" level=info msg="Connector installed" component=connector-manager provider=example +payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Opening directory '/tmp/payments'..." component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Found 1 files" component=scheduler provider=example task-id="ImRpcmVjdG9yeSI=" +payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Starting task..." component=scheduler provider=example task-id="IjAwMS5qc29uIg==" +payments-payments-1 | time="2022-07-01T14:25:30Z" level=info msg="Task terminated with success" component=scheduler provider=example task-id="IjAwMS5qc29uIg==" +``` + +As you can see, this time the second task has been started and was terminated with success. + +It should have created a payment on database. Let's check : +```bash +# curl http://localhost:8080/connectors/payments|jq +{ + "data": [ + { + "id": "eyJwcm92aWRlciI6ImV4YW1wbGUiLCJyZWZlcmVuY2UiOiIwMDEiLCJ0eXBlIjoicGF5aW4ifQ==", + "reference": "001", + "type": "payin", + "provider": "example", + "status": "succeeded", + "initialAmount": 100, + "scheme": "", + "asset": "USD", + "createdAt": "0001-01-01T00:00:00Z", + "raw": null, + "adjustments": [ + { + "status": "succeeded", + "amount": 100, + "date": "0001-01-01T00:00:00Z", + "raw": null, + "absolute": false + } + ] + } + ] +} +``` + +The last important part is the ```Ingester```. + +In the code of the second task, you should have seen this part : +```go +return ingester.Ingest(ctx.Context(), ingestion.Batch{ + { + Referenced: payments.Referenced{ + Reference: jsonPayment.Reference, + Type: jsonPayment.Type, + }, + Payment: &jsonPayment.Data, + Forward: true, + }, +}, struct{}{}) +``` +The ingester is in charge of accepting payments from a task and an eventual state to be persisted. + +In our case, we don't alter the state, but we could if we want (we passed an empty struct). + +If the connector is restarted, the task will be restarted with the previously state. + +The complete code : +```go +package example + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "time" + + "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/bridge/ingestion" + "github.com/numary/payments/pkg/bridge/integration" + "github.com/numary/payments/pkg/bridge/task" +) + +type ( + Config struct { + Directory string + } + TaskDescriptor string +) + +func (cfg Config) Validate() error { + if cfg.Directory == "" { + return errors.New("missing directory to watch") + } + return nil +} + +var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). + WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return integration.NewConnectorBuilder[TaskDescriptor](). + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule("directory", false) + }). + WithResolve(func(descriptor TaskDescriptor) task.Task { + if descriptor == "directory" { + return func(ctx context.Context, logger sharedlogging.Logger, scheduler task.Scheduler[TaskDescriptor]) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): // Could be configurable using Config object + logger.Infof("Opening directory '%s'...", config.Directory) + dir, err := os.ReadDir(config.Directory) + if err != nil { + logger.Errorf("Error opening directory '%s': %s", config.Directory, err) + continue + } + + logger.Infof("Found %d files", len(dir)) + for _, file := range dir { + err = scheduler.Schedule(TaskDescriptor(file.Name()), false) + if err != nil { + logger.Errorf("Error scheduling task '%s': %s", file.Name(), err) + continue + } + } + } + } + } + } + return func(ctx context.Context, ingester ingestion.Ingester, resolver task.StateResolver) error { + file, err := os.Open(filepath.Join(config.Directory, string(descriptor))) + if err != nil { + return err + } + + type JsonPayment struct { + payments.Data + Reference string `json:"reference"` + Type string `json:"type"` + } + + jsonPayment := &JsonPayment{} + err = json.NewDecoder(file).Decode(jsonPayment) + if err != nil { + return err + } + + return ingester.Ingest(ctx, ingestion.Batch{ + { + Referenced: payments.Referenced{ + Reference: jsonPayment.Reference, + Type: jsonPayment.Type, + }, + Payment: &jsonPayment.Data, + Forward: true, + }, + }, struct{}{}) + } + }). + Build() + }). + Build() +``` diff --git a/pkg/bridge/connectors/example/example.go b/pkg/bridge/connectors/example/example.go new file mode 100644 index 00000000..1329279a --- /dev/null +++ b/pkg/bridge/connectors/example/example.go @@ -0,0 +1,97 @@ +package example + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "time" + + "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/bridge/ingestion" + "github.com/numary/payments/pkg/bridge/integration" + "github.com/numary/payments/pkg/bridge/task" +) + +type ( + Config struct { + Directory string + } + TaskDescriptor string +) + +func (cfg Config) Validate() error { + if cfg.Directory == "" { + return errors.New("missing directory to watch") + } + return nil +} + +var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). + WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return integration.NewConnectorBuilder[TaskDescriptor](). + WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule("directory", false) + }). + WithResolve(func(descriptor TaskDescriptor) task.Task { + if descriptor == "directory" { + return func(ctx context.Context, logger sharedlogging.Logger, scheduler task.Scheduler[TaskDescriptor]) error { + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(10 * time.Second): // Could be configurable using Config object + logger.Infof("Opening directory '%s'...", config.Directory) + dir, err := os.ReadDir(config.Directory) + if err != nil { + logger.Errorf("Error opening directory '%s': %s", config.Directory, err) + continue + } + + logger.Infof("Found %d files", len(dir)) + for _, file := range dir { + err = scheduler.Schedule(TaskDescriptor(file.Name()), false) + if err != nil { + logger.Errorf("Error scheduling task '%s': %s", file.Name(), err) + continue + } + } + } + } + } + } + return func(ctx context.Context, ingester ingestion.Ingester, resolver task.StateResolver) error { + file, err := os.Open(filepath.Join(config.Directory, string(descriptor))) + if err != nil { + return err + } + + type JsonPayment struct { + payments.Data + Reference string `json:"reference"` + Type string `json:"type"` + } + + jsonPayment := &JsonPayment{} + err = json.NewDecoder(file).Decode(jsonPayment) + if err != nil { + return err + } + + return ingester.Ingest(ctx, ingestion.Batch{ + { + Referenced: payments.Referenced{ + Reference: jsonPayment.Reference, + Type: jsonPayment.Type, + }, + Payment: &jsonPayment.Data, + Forward: true, + }, + }, struct{}{}) + } + }). + Build() + }). + Build() diff --git a/pkg/bridge/http/api.go b/pkg/bridge/http/api.go index 843e85ae..28136dfc 100644 --- a/pkg/bridge/http/api.go +++ b/pkg/bridge/http/api.go @@ -103,17 +103,16 @@ func Install[Config payments.ConnectorConfigObject, Descriptor payments.TaskDesc return } - var config *Config + var config Config if r.ContentLength > 0 { - config = new(Config) - err := json.NewDecoder(r.Body).Decode(config) + err := json.NewDecoder(r.Body).Decode(&config) if err != nil { handleError(w, r, err) return } } - err = cm.Install(r.Context(), *config) + err = cm.Install(r.Context(), config) if err != nil { handleError(w, r, err) return diff --git a/pkg/bridge/integration/connector.go b/pkg/bridge/integration/connector.go index 50233682..bc093100 100644 --- a/pkg/bridge/integration/connector.go +++ b/pkg/bridge/integration/connector.go @@ -8,8 +8,6 @@ import ( ) // Connector provide entry point to a payment provider -// It requires a payments.ConnectorConfigObject representing the configuration of the specific payment provider -// as well as a payments.ConnectorState object which represents the state of the connector type Connector[TaskDescriptor payments.TaskDescriptor] interface { // Install is used to start the connector. The implementation if in charge of scheduling all required resources. Install(ctx task.ConnectorContext[TaskDescriptor]) error diff --git a/pkg/bridge/integration/loader.go b/pkg/bridge/integration/loader.go index 7155089c..7da2d8ee 100644 --- a/pkg/bridge/integration/loader.go +++ b/pkg/bridge/integration/loader.go @@ -10,6 +10,9 @@ type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payme Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] // ApplyDefaults is used to fill default values of the provided configuration object ApplyDefaults(t ConnectorConfig) ConnectorConfig + // AllowTasks define how many task the connector can run + // If too many tasks are scheduled by the connector, + // those will be set to pending state and restarted later when some other tasks will be terminated AllowTasks() int } @@ -66,6 +69,9 @@ func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) Name() string { } func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] { + if b.loadFunction != nil { + return b.loadFunction(logger, config) + } return b.loadFunction(logger, config) } diff --git a/pkg/bridge/integration/manager.go b/pkg/bridge/integration/manager.go index 4b5c74f7..3a809b40 100644 --- a/pkg/bridge/integration/manager.go +++ b/pkg/bridge/integration/manager.go @@ -84,6 +84,9 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context. } config = l.loader.ApplyDefaults(config) + if err := config.Validate(); err != nil { + return err + } l.load(config) diff --git a/pkg/bridge/task/scheduler.go b/pkg/bridge/task/scheduler.go index d68496f0..fa6f36f8 100644 --- a/pkg/bridge/task/scheduler.go +++ b/pkg/bridge/task/scheduler.go @@ -89,7 +89,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescripto } } - if len(s.tasks) >= s.maxTasks || s.stopped { + if s.maxTasks != 0 && len(s.tasks) >= s.maxTasks || s.stopped { err := s.stackTask(descriptor) if err != nil { return errors.Wrap(err, "stacking task") @@ -277,7 +277,6 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript debug.PrintStack() return } - logger.Infof("Task terminated with success") }() err := container.Invoke(task) @@ -285,6 +284,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript s.registerTaskError(ctx, holder, err) return } + logger.Infof("Task terminated with success") err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, payments.TaskStatusTerminated, "") if err != nil {