diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 00000000..f7757cdf --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,27 @@ +--- +name: Bug report +about: Create a report to help us improve +title: '' +labels: bug +assignees: '' + +--- + +**Describe the bug** +A clear and concise description of what the bug is. + +**To Reproduce** +Steps to reproduce the behavior: + +**Expected behavior** +A clear and concise description of what you expected to happen. + +**Logs** +If applicable, add logs to help explain your problem. + +**Environment (please complete the following information):** + - OS: [e.g. ubuntu 20.04] + - Numary Version [e.g. 1.0.0-beta.4] + +**Additional context** +Add any other context about the problem here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 00000000..b36319ba --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,22 @@ +--- +name: Feature request +about: Suggest an idea for this project +title: '' +labels: enhancement, rfc +assignees: '' + +--- + +**Is your feature request related to a problem? Please describe.** +A clear and concise description of what the problem is. Ex. I'm always frustrated when [...] + +**Summary** + +**Solution proposal** +A clear and concise description of what you want to happen. + +**Describe alternatives you've considered** +A clear and concise description of any alternative solutions or features you've considered. + +**Additional context** +Add any other context or screenshots about the feature request here. diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 00000000..99602049 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,7 @@ +version: 2 +updates: + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml new file mode 100644 index 00000000..773d9fba --- /dev/null +++ b/.github/workflows/codeql.yml @@ -0,0 +1,33 @@ +name: "CodeQL" + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + schedule: + - cron: '35 21 * * 5' + +jobs: + analyze: + name: Analyze + runs-on: ubuntu-latest + permissions: + actions: read + contents: read + security-events: write + strategy: + fail-fast: false + matrix: + language: [ 'go' ] + steps: + - name: Checkout repository + uses: actions/checkout@v3 + - name: Initialize CodeQL + uses: github/codeql-action/init@v2 + with: + languages: ${{ matrix.language }} + - name: Autobuild + uses: github/codeql-action/autobuild@v2 + - name: Perform CodeQL Analysis + uses: github/codeql-action/analyze@v2 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml new file mode 100644 index 00000000..3416ff0a --- /dev/null +++ b/.github/workflows/main.yml @@ -0,0 +1,92 @@ +on: + push: + branches: + - 'main' + - 'features/**' + - 'feature/**' + - 'feat/**' + - 'fix/**' + - 'hotfix/**' + pull_request: + types: [ assigned, opened, synchronize, reopened ] + release: + types: [ prereleased, released ] + +name: Main +jobs: + pr-style: + if: github.event_name == 'pull_request' + uses: numary/gh-workflows/.github/workflows/pr-style.yml@main + + lint: + uses: numary/gh-workflows/.github/workflows/golang-lint.yml@main + + test: + uses: numary/gh-workflows/.github/workflows/golang-test.yml@main + + goreleaser-build: + if: github.event_name != 'release' + uses: numary/gh-workflows/.github/workflows/goreleaser-build.yml@main + needs: + - lint + - test + + goreleaser-release: + if: github.event_name == 'release' + uses: numary/gh-workflows/.github/workflows/goreleaser-release.yml@main + secrets: + FURY_TOKEN: ${{ secrets.FURY_TOKEN }} + NUMARY_GITHUB_TOKEN: ${{ secrets.NUMARY_GITHUB_TOKEN }} + needs: + - lint + - test + + docker-build-push: + runs-on: ubuntu-latest + needs: + - lint + - test + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version-file: 'go.mod' + cache: true + - run: go mod vendor + - uses: docker/setup-qemu-action@v2 + - uses: docker/setup-buildx-action@v2 + - uses: docker/login-action@v2 + with: + registry: ghcr.io + username: "NumaryBot" + password: ${{ secrets.NUMARY_GITHUB_TOKEN }} + - if: github.event.action == 'released' + uses: docker/build-push-action@v3 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + tags: ghcr.io/formancehq/payments:latest,ghcr.io/formancehq/payments:${{ github.event.release.tag_name }} + build-args: | + APP_SHA=${{ github.sha }} + VERSION=${{ github.event.release.tag_name }} + - if: github.event.action == 'prereleased' + uses: docker/build-push-action@v3 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + tags: ghcr.io/formancehq/payments:${{ github.event.release.tag_name }} + build-args: | + APP_SHA=${{ github.sha }} + VERSION=${{ github.event.release.tag_name }} + - if: github.event.action != 'released' || github.event.action != 'prereleased' + uses: docker/build-push-action@v3 + with: + context: . + platforms: linux/amd64,linux/arm64 + push: true + tags: ghcr.io/formancehq/payments:${{ github.sha }} + build-args: | + APP_SHA=${{ github.sha }} + VERSION=develop diff --git a/.gitrepo b/.gitrepo index cb7951fb..c765f0b6 100644 --- a/.gitrepo +++ b/.gitrepo @@ -6,7 +6,7 @@ [subrepo] remote = payments branch = main - commit = e4c0f219fda7416c2a3d30fe7ec4184870134999 - parent = b2827a506b6324812752e3689b2a4845480a397e + commit = 5e3b64477ac01e4101e043d7615f6eb7e89770cd + parent = dbe6ffd24f70f31891411c03c2377ad30d32160c method = merge cmdver = 0.4.5 diff --git a/.golangci.yml b/.golangci.yml index e1845e02..f72d4d05 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,4 +1,6 @@ linters-settings: + interfacebloat: + max: 20 dupl: threshold: 30 funlen: diff --git a/.goreleaser.yml b/.goreleaser.yml index 8f25add6..56fe0d0e 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -1,35 +1,44 @@ project_name: payments - -monorepo: - tag_prefix: components/payments/ - dir: ./components/payments/ +env: + - GO111MODULE=on + - GOPROXY=https://proxy.golang.org +before: + hooks: + - go mod download builds: - binary: payments id: payments ldflags: - - -X github.com/numary/payments/cmd.BuildDate={{ .Date }} - - -X github.com/numary/payments/cmd.Version={{ .Version }} - - -X github.com/numary/payments/cmd.Commit={{ .ShortCommit }} + - -X github.com/formancehq/payments/cmd.BuildDate={{ .Date }} + - -X github.com/formancehq/payments/cmd.Version={{ .Version }} + - -X github.com/formancehq/payments/cmd.Commit={{ .ShortCommit }} - -extldflags "-static" env: - CGO_ENABLED=0 goos: + - windows - linux + - darwin goarch: - amd64 - arm64 + flags: + - -tags=json1 archives: - - id: "{{.ProjectName}}" + - id: "payments" builds: - payments format: tar.gz + format_overrides: + - goos: windows + format: zip name_template: "{{.ProjectName}}_{{.Os}}-{{.Arch}}" checksum: - name_template: '{{.ProjectName}}_checksums.txt' + name_template: 'checksums.txt' snapshot: name_template: "{{ .Tag }}" @@ -58,47 +67,40 @@ changelog: release: prerelease: auto - draft: true footer: | + **Full Changelog**: https://github.com/formancehq/payments/compare/{{ .PreviousTag }}...{{ .Tag }} ## What to do next? - - Read the [documentation](https://docs.formance.com/) - - Join our [Slack server](https://formance.com/slack) + - Read the [documentation](https://docs.formance.com/oss/payments/get-started/installation) + - Join our [Discord server](https://discord.gg/xyHvcbzk4w) + +brews: + - tap: + owner: numary + name: homebrew-tap + name: payments + folder: Formula + homepage: https://formance.com + skip_upload: 'false' + test: | + system "#{bin}/payments version" + install: | + bin.install "payments" +nfpms: + - id: packages + package_name: payments + file_name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}" + builds: + - payments + homepage: https://formance.com + maintainer: Maxence Maireaux + formats: + - deb + - rpm -dockers: - - image_templates: ["ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-amd64"] - dockerfile: build.Dockerfile - use: buildx - build_flag_templates: - - --platform=linux/amd64 - - --label=org.opencontainers.image.title={{ .ProjectName }} - - --label=org.opencontainers.image.description={{ .ProjectName }} - - --label=org.opencontainers.image.url=https://github.com/formancehq/stack - - --label=org.opencontainers.image.source=https://github.com/formancehq/stack - - --label=org.opencontainers.image.version={{ .Version }} - - --label=org.opencontainers.image.created={{ time "2006-01-02T15:04:05Z07:00" }} - - --label=org.opencontainers.image.revision={{ .FullCommit }} - - --label=org.opencontainers.image.licenses=MIT - - image_templates: [ "ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-arm64" ] - goarch: arm64 - dockerfile: build.Dockerfile - use: buildx - build_flag_templates: - - --platform=linux/arm64/v8 - - --label=org.opencontainers.image.title={{ .ProjectName }} - - --label=org.opencontainers.image.description={{ .ProjectName }} - - --label=org.opencontainers.image.url=https://github.com/formancehq/stack - - --label=org.opencontainers.image.source=https://github.com/formancehq/stack - - --label=org.opencontainers.image.version={{ .Version }} - - --label=org.opencontainers.image.created={{ time "2006-01-02T15:04:05Z07:00" }} - - --label=org.opencontainers.image.revision={{ .FullCommit }} - - --label=org.opencontainers.image.licenses=MIT -docker_manifests: - - name_template: ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }} - image_templates: - - ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-amd64 - - ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-arm64 - - name_template: ghcr.io/formancehq/{{ .ProjectName }}:latest - image_templates: - - ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-amd64 - - ghcr.io/formancehq/{{ .ProjectName }}:v{{ .Version }}-arm64 +publishers: + - name: fury.io + ids: + - packages + dir: "{{ dir .ArtifactPath }}" + cmd: curl -F package=@{{ .ArtifactName }} https://{{ .Env.FURY_TOKEN }}@push.fury.io/numary/ diff --git a/Dockerfile b/Dockerfile index 795ae6b6..ce2b6610 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,19 +1,35 @@ -FROM golang:1.18 AS builder +FROM golang:1.19.3-bullseye AS builder + +RUN apt-get update && \ + apt-get install -y gcc-aarch64-linux-gnu gcc-x86-64-linux-gnu && \ + ln -s /usr/bin/aarch64-linux-gnu-gcc /usr/bin/arm64-linux-gnu-gcc && \ + ln -s /usr/bin/x86_64-linux-gnu-gcc /usr/bin/amd64-linux-gnu-gcc + +ARG TARGETARCH ARG APP_SHA ARG VERSION -WORKDIR /src + +WORKDIR /go/src/github.com/formancehq/payments + +# get deps first so it's cached COPY . . -WORKDIR /src/components/payments -RUN go mod download -RUN GOOS=linux go build -o payments \ - -ldflags="-X $(cat go.mod |head -1|cut -d \ -f2)/cmd.Version=${VERSION} \ - -X $(cat go.mod |head -1|cut -d \ -f2)/cmd.BuildDate=$(date +%s) \ - -X $(cat go.mod |head -1|cut -d \ -f2)/cmd.Commit=${APP_SHA}" ./ - -FROM ubuntu:jammy -RUN apt update && apt install -y ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY --from=builder /src/components/payments/payments /payments -EXPOSE 3068 -ENV OTEL_SERVICE_NAME payments -ENTRYPOINT ["/payments"] -CMD ["serve"] + +RUN go mod vendor + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH \ + CC=$TARGETARCH-linux-gnu-gcc \ + go build -o bin/payments \ + -ldflags="-X github.com/formancehq/payments/cmd.Version=${VERSION} \ + -X github.com/formancehq/payments/cmd.BuildDate=$(date +%s) \ + -X github.com/formancehq/payments/cmd.Commit=${APP_SHA}" ./ + +FROM scratch + +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /go/src/github.com/formancehq/payments/bin/payments /usr/local/bin/payments + +EXPOSE 8080 + +ENTRYPOINT ["payments"] +ENV OTEL_SERVICE_NAME=payments +CMD ["server"] diff --git a/build.Dockerfile b/build.Dockerfile deleted file mode 100644 index a3e341d4..00000000 --- a/build.Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM ubuntu:jammy -RUN apt update && apt install -y ca-certificates curl && rm -rf /var/lib/apt/lists/* -COPY payments /usr/bin/payments -ENV OTEL_SERVICE_NAME payments -ENTRYPOINT ["/usr/bin/payments"] -CMD ["serve"] diff --git a/cmd/migrate.go b/cmd/migrate.go index 0d12d6b6..01a97e54 100644 --- a/cmd/migrate.go +++ b/cmd/migrate.go @@ -4,10 +4,11 @@ import ( "fmt" "log" + "github.com/formancehq/payments/internal/app/migrations" + "github.com/spf13/viper" - // allow blank import to initiate migrations. - _ "github.com/formancehq/payments/internal/app/migrations" + // Import the postgres driver. _ "github.com/lib/pq" "github.com/pressly/goose/v3" @@ -48,6 +49,15 @@ func runMigrate(cmd *cobra.Command, args []string) error { return fmt.Errorf("postgres uri is not set") } + cfgEncryptionKey := viper.GetString(configEncryptionKeyFlag) + if cfgEncryptionKey == "" { + cfgEncryptionKey = cmd.Flag(configEncryptionKeyFlag).Value.String() + } + + if cfgEncryptionKey != "" { + migrations.EncryptionKey = cfgEncryptionKey + } + database, err := goose.OpenDBWithDriver("postgres", postgresURI) if err != nil { return fmt.Errorf("failed to open database: %w", err) diff --git a/cmd/root.go b/cmd/root.go index 4f3facb8..f60b270a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -43,9 +43,11 @@ func rootCommand() *cobra.Command { root.PersistentFlags().Bool(debugFlag, false, "Debug mode") migrate.Flags().String(postgresURIFlag, "postgres://localhost/payments", "PostgreSQL DB address") + migrate.Flags().String(configEncryptionKeyFlag, "", "Config encryption key") server.Flags().BoolP("toggle", "t", false, "Help message for toggle") server.Flags().String(postgresURIFlag, "postgres://localhost/payments", "PostgreSQL DB address") + server.Flags().String(configEncryptionKeyFlag, "", "Config encryption key") server.Flags().String(envFlag, "local", "Environment") server.Flags().Bool(publisherKafkaEnabledFlag, false, "Publish write events to kafka") server.Flags().StringSlice(publisherKafkaBrokerFlag, []string{}, "Kafka address is kafka enabled") diff --git a/cmd/server.go b/cmd/server.go index b872227f..53759785 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -30,6 +30,7 @@ import ( //nolint:gosec // false positive const ( postgresURIFlag = "postgres-uri" + configEncryptionKeyFlag = "config-encryption-key" otelTracesFlag = "otel-traces" envFlag = "env" publisherKafkaEnabledFlag = "publisher-kafka-enabled" @@ -157,7 +158,12 @@ func prepareDatabaseOptions() (fx.Option, error) { return nil, errors.New("missing postgres uri") } - return storage.Module(postgresURI), nil + configEncryptionKey := viper.GetString(configEncryptionKeyFlag) + if configEncryptionKey == "" { + return nil, errors.New("missing config encryption key") + } + + return storage.Module(postgresURI, configEncryptionKey), nil } func topicsMapping() map[string]string { diff --git a/internal/app/api/connector.go b/internal/app/api/connector.go index 7cbf60e2..6f2256cd 100644 --- a/internal/app/api/connector.go +++ b/internal/app/api/connector.go @@ -1,12 +1,12 @@ package api import ( - "context" "encoding/json" - "errors" "net/http" "time" + "github.com/pkg/errors" + "github.com/formancehq/payments/internal/app/storage" "github.com/google/uuid" @@ -51,6 +51,10 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { func readConfig[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if connectorNotInstalled(connectorManager, w, r) { + return + } + config, err := connectorManager.ReadConfig(r.Context()) if err != nil { handleError(w, r, err) @@ -81,6 +85,10 @@ type listTasksResponseElement struct { func listTasks[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if connectorNotInstalled(connectorManager, w, r) { + return + } + pageSize, err := pageSizeQueryParam(r) if err != nil { handleValidationError(w, r, err) @@ -134,6 +142,10 @@ func listTasks[Config models.ConnectorConfigObject](connectorManager *integratio func readTask[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if connectorNotInstalled(connectorManager, w, r) { + return + } + taskID, err := uuid.Parse(mux.Vars(r)["taskID"]) if err != nil { handleErrorBadRequest(w, r, err) @@ -171,6 +183,10 @@ func readTask[Config models.ConnectorConfigObject](connectorManager *integration func uninstall[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if connectorNotInstalled(connectorManager, w, r) { + return + } + err := connectorManager.Uninstall(r.Context()) if err != nil { handleError(w, r, err) @@ -185,7 +201,7 @@ func uninstall[Config models.ConnectorConfigObject](connectorManager *integratio func install[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - installed, err := connectorManager.IsInstalled(context.Background()) + installed, err := connectorManager.IsInstalled(r.Context()) if err != nil { handleError(w, r, err) @@ -222,7 +238,127 @@ func install[Config models.ConnectorConfigObject](connectorManager *integration. func reset[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - installed, err := connectorManager.IsInstalled(context.Background()) + if connectorNotInstalled(connectorManager, w, r) { + return + } + + err := connectorManager.Reset(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + w.WriteHeader(http.StatusNoContent) + } +} + +type transferRequest struct { + Amount int64 `json:"amount"` + Source string `json:"source"` + Destination string `json:"destination"` + Asset string `json:"asset"` + + currency string +} + +func (req *transferRequest) validate() error { + if req.Amount <= 0 { + return errors.New("amount must be greater than 0") + } + + if req.Asset == "" { + return errors.New("asset is required") + } + + if len(req.Asset) < 3 { //nolint:gomnd // allow length 3 for POC + return errors.New("asset is invalid") + } + + req.currency = req.Asset[:3] + + if req.Destination == "" { + return errors.New("destination is required") + } + + return nil +} + +type initiateTransferResponse struct { + ID string `json:"id"` +} + +func initiateTransfer[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req transferRequest + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + handleError(w, r, err) + + return + } + + err = req.validate() + if err != nil { + handleErrorBadRequest(w, r, err) + + return + } + + installed, err := connectorManager.IsInstalled(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + if !installed { + handleError(w, r, errors.New("connector not installed")) + + return + } + + transfer := integration.Transfer{ + Source: req.Source, + Destination: req.Destination, + Currency: req.currency, + Amount: req.Amount, + } + + transferID, err := connectorManager.InitiateTransfer(r.Context(), transfer) + if err != nil { + handleError(w, r, err) + + return + } + + err = json.NewEncoder(w).Encode(api.BaseResponse[initiateTransferResponse]{ + Data: &initiateTransferResponse{ + ID: transferID.String(), + }, + }) + if err != nil { + panic(err) + } + } +} + +type listTransfersResponseElement struct { + ID string `json:"id"` + Source string `json:"source"` + Destination string `json:"destination"` + Amount int64 `json:"amount"` + Currency string `json:"asset"` + Status string `json:"status"` + Error *string `json:"error"` +} + +func listTransfers[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + installed, err := connectorManager.IsInstalled(r.Context()) if err != nil { handleError(w, r, err) @@ -235,13 +371,51 @@ func reset[Config models.ConnectorConfigObject](connectorManager *integration.Co return } - err = connectorManager.Reset(r.Context()) + transfers, err := connectorManager.ListTransfers(r.Context()) if err != nil { handleError(w, r, err) return } - w.WriteHeader(http.StatusNoContent) + response := make([]listTransfersResponseElement, len(transfers)) + + for transferIdx := range transfers { + response[transferIdx] = listTransfersResponseElement{ + ID: transfers[transferIdx].ID.String(), + Source: transfers[transferIdx].Source, + Destination: transfers[transferIdx].Destination, + Amount: transfers[transferIdx].Amount, + Currency: transfers[transferIdx].Currency, + Status: transfers[transferIdx].Status.String(), + Error: transfers[transferIdx].Error, + } + } + + err = json.NewEncoder(w).Encode(api.BaseResponse[[]listTransfersResponseElement]{ + Data: &response, + }) + if err != nil { + panic(err) + } } } + +func connectorNotInstalled[Config models.ConnectorConfigObject](connectorManager *integration.ConnectorManager[Config], + w http.ResponseWriter, r *http.Request, +) bool { + installed, err := connectorManager.IsInstalled(r.Context()) + if err != nil { + handleError(w, r, err) + + return true + } + + if !installed { + handleErrorBadRequest(w, r, integration.ErrNotInstalled) + + return true + } + + return false +} diff --git a/internal/app/api/connectormodule.go b/internal/app/api/connectormodule.go index be8f0d40..5498429e 100644 --- a/internal/app/api/connectormodule.go +++ b/internal/app/api/connectormodule.go @@ -32,7 +32,7 @@ func addConnector[ConnectorConfig models.ConnectorConfigObject](loader integrati fx.Provide(func(store *storage.Storage, publisher publish.Publisher, ) *integration.ConnectorManager[ConnectorConfig] { - logger := logging.GetLogger(context.Background()) + logger := logging.GetLogger(context.TODO()) schedulerFactory := integration.TaskSchedulerFactoryFn(func( resolver task.Resolver, maxTasks int, diff --git a/internal/app/api/router.go b/internal/app/api/router.go index 0c807539..2a074640 100644 --- a/internal/app/api/router.go +++ b/internal/app/api/router.go @@ -46,8 +46,9 @@ func httpRouter(store *storage.Storage, serviceInfo api.ServiceInfo, connectorHa connectorGroup.Path("/configs").Handler(connectorConfigsHandler()) - // TODO: It's not ideal to define it explicitly here - // Refactor it when refactoring the HTTP lib. + // Deprecated + // TODO: Remove this endpoint + // Use /connectors/stripe/transfers instead connectorGroup.Path("/stripe/transfers").Methods(http.MethodPost). Handler(handleStripeTransfers(store)) @@ -74,6 +75,8 @@ func connectorRouter[Config models.ConnectorConfigObject]( addRoute(r, provider, "/reset", http.MethodPost, reset(manager)) addRoute(r, provider, "/tasks", http.MethodGet, listTasks(manager)) addRoute(r, provider, "/tasks/{taskID}", http.MethodGet, readTask(manager)) + addRoute(r, provider, "/transfers", http.MethodPost, initiateTransfer(manager)) + addRoute(r, provider, "/transfers", http.MethodGet, listTransfers(manager)) return r } diff --git a/internal/app/api/stripe.go b/internal/app/api/stripe.go index 1f17272b..e0507a7a 100644 --- a/internal/app/api/stripe.go +++ b/internal/app/api/stripe.go @@ -5,6 +5,8 @@ import ( "encoding/json" "net/http" + "github.com/formancehq/payments/internal/app/integration" + "github.com/formancehq/go-libs/api" "github.com/formancehq/payments/internal/app/models" @@ -49,13 +51,27 @@ func (req *stripeTransferRequest) validate() error { type stripeTransfersRepository interface { GetConfig(ctx context.Context, connectorName models.ConnectorProvider, cfg any) error + IsInstalled(ctx context.Context, provider models.ConnectorProvider) (bool, error) } func handleStripeTransfers(repo stripeTransfersRepository) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + installed, err := repo.IsInstalled(r.Context(), stripeConnector.Name) + if err != nil { + handleError(w, r, err) + + return + } + + if !installed { + handleErrorBadRequest(w, r, integration.ErrNotInstalled) + + return + } + var cfg stripeConnector.Config - if err := repo.GetConfig(r.Context(), stripeConnector.Name, &cfg); err != nil { + if err = repo.GetConfig(r.Context(), stripeConnector.Name, &cfg); err != nil { handleError(w, r, err) return @@ -65,7 +81,7 @@ func handleStripeTransfers(repo stripeTransfersRepository) http.HandlerFunc { var transferRequest stripeTransferRequest - err := json.NewDecoder(r.Body).Decode(&transferRequest) + err = json.NewDecoder(r.Body).Decode(&transferRequest) if err != nil { handleError(w, r, err) diff --git a/internal/app/connectors/bankingcircle/config.go b/internal/app/connectors/bankingcircle/config.go index 233e09c6..cc257a3d 100644 --- a/internal/app/connectors/bankingcircle/config.go +++ b/internal/app/connectors/bankingcircle/config.go @@ -2,6 +2,7 @@ package bankingcircle import ( "encoding/json" + "fmt" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) @@ -13,6 +14,12 @@ type Config struct { AuthorizationEndpoint string `json:"authorizationEndpoint" yaml:"authorizationEndpoint" bson:"authorizationEndpoint"` } +// String obfuscates sensitive fields and returns a string representation of the config. +// This is used for logging. +func (c Config) String() string { + return fmt.Sprintf("username=%s, password=****, endpoint=%s, authorizationEndpoint=%s", c.Username, c.Endpoint, c.AuthorizationEndpoint) +} + func (c Config) Validate() error { if c.Username == "" { return ErrMissingUsername diff --git a/internal/app/connectors/currencycloud/config.go b/internal/app/connectors/currencycloud/config.go index ea9c3673..3015566c 100644 --- a/internal/app/connectors/currencycloud/config.go +++ b/internal/app/connectors/currencycloud/config.go @@ -2,6 +2,7 @@ package currencycloud import ( "encoding/json" + "fmt" "time" "github.com/formancehq/payments/internal/app/connectors/configtemplate" @@ -14,6 +15,12 @@ type Config struct { PollingPeriod Duration `json:"pollingPeriod" bson:"pollingPeriod"` } +// String obfuscates sensitive fields and returns a string representation of the config. +// This is used for logging. +func (c Config) String() string { + return fmt.Sprintf("loginID=%s, endpoint=%s, pollingPeriod=%s, apiKey=****", c.LoginID, c.Endpoint, c.PollingPeriod.String()) +} + func (c Config) Validate() error { if c.APIKey == "" { return ErrMissingAPIKey diff --git a/internal/app/connectors/currencycloud/connector.go b/internal/app/connectors/currencycloud/connector.go index 69105ba5..cb6e3876 100644 --- a/internal/app/connectors/currencycloud/connector.go +++ b/internal/app/connectors/currencycloud/connector.go @@ -17,6 +17,11 @@ type Connector struct { cfg Config } +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + // TODO implement me + panic("implement me") +} + func (c *Connector) Install(ctx task.ConnectorContext) error { taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{Name: taskNameFetchTransactions}) if err != nil { diff --git a/internal/app/connectors/dummypay/config.go b/internal/app/connectors/dummypay/config.go index f45ffe50..40cde2a2 100644 --- a/internal/app/connectors/dummypay/config.go +++ b/internal/app/connectors/dummypay/config.go @@ -23,7 +23,7 @@ type Config struct { // String returns a string representation of the configuration. func (c Config) String() string { - return fmt.Sprintf("directory: %s, filePollingPeriod: %s, fileGenerationPeriod: %s", + return fmt.Sprintf("directory=%s, filePollingPeriod=%s, fileGenerationPeriod=%s", c.Directory, c.FilePollingPeriod.String(), c.FileGenerationPeriod.String()) } diff --git a/internal/app/connectors/dummypay/config_test.go b/internal/app/connectors/dummypay/config_test.go index 3375c6da..8eba6239 100644 --- a/internal/app/connectors/dummypay/config_test.go +++ b/internal/app/connectors/dummypay/config_test.go @@ -19,7 +19,7 @@ func TestConfigString(t *testing.T) { FileGenerationPeriod: connectors.Duration{Duration: time.Minute}, } - assert.Equal(t, "directory: test, filePollingPeriod: 1s, fileGenerationPeriod: 1m0s", config.String()) + assert.Equal(t, "directory=test, filePollingPeriod=1s, fileGenerationPeriod=1m0s", config.String()) } // TestConfigValidate tests the validation of the config. diff --git a/internal/app/connectors/dummypay/connector.go b/internal/app/connectors/dummypay/connector.go index ca394866..9cfef268 100644 --- a/internal/app/connectors/dummypay/connector.go +++ b/internal/app/connectors/dummypay/connector.go @@ -4,6 +4,8 @@ import ( "context" "fmt" + "github.com/formancehq/payments/internal/app/integration" + "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/payments/internal/app/task" @@ -21,6 +23,11 @@ type Connector struct { fs fs } +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + // TODO implement me + panic("implement me") +} + // Install executes post-installation steps to read and generate files. // It is called after the connector is installed. func (c *Connector) Install(ctx task.ConnectorContext) error { @@ -70,6 +77,8 @@ func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task { return handleResolve(c.cfg, taskDescriptor, c.fs) } +var _ integration.Connector = &Connector{} + func newConnector(logger logging.Logger, cfg Config, fs fs) *Connector { return &Connector{ logger: logger.WithFields(map[string]any{ diff --git a/internal/app/connectors/dummypay/connector_test.go b/internal/app/connectors/dummypay/connector_test.go index 39576448..46ebe01d 100644 --- a/internal/app/connectors/dummypay/connector_test.go +++ b/internal/app/connectors/dummypay/connector_test.go @@ -37,7 +37,7 @@ func TestConnector(t *testing.T) { t.Parallel() config := Config{} - logger := logging.GetLogger(context.Background()) + logger := logging.GetLogger(context.TODO()) fileSystem := newTestFS() @@ -75,5 +75,5 @@ func TestConnector(t *testing.T) { reflect.ValueOf(connector.Resolve(taskDescriptor)).String(), ) - assert.NoError(t, connector.Uninstall(context.Background())) + assert.NoError(t, connector.Uninstall(context.TODO())) } diff --git a/internal/app/connectors/dummypay/loader_test.go b/internal/app/connectors/dummypay/loader_test.go index 3e0776e1..26a6c427 100644 --- a/internal/app/connectors/dummypay/loader_test.go +++ b/internal/app/connectors/dummypay/loader_test.go @@ -15,7 +15,7 @@ func TestLoader(t *testing.T) { t.Parallel() config := Config{} - logger := logging.GetLogger(context.Background()) + logger := logging.GetLogger(context.TODO()) loader := NewLoader() diff --git a/internal/app/connectors/modulr/config.go b/internal/app/connectors/modulr/config.go index da0ba040..b2ca2636 100644 --- a/internal/app/connectors/modulr/config.go +++ b/internal/app/connectors/modulr/config.go @@ -2,6 +2,7 @@ package modulr import ( "encoding/json" + "fmt" "github.com/formancehq/payments/internal/app/connectors/configtemplate" ) @@ -12,6 +13,12 @@ type Config struct { Endpoint string `json:"endpoint" bson:"endpoint"` } +// String obfuscates sensitive fields and returns a string representation of the config. +// This is used for logging. +func (c Config) String() string { + return fmt.Sprintf("endpoint=%s, apiSecret=***, apiKey=****", c.Endpoint) +} + func (c Config) Validate() error { if c.APIKey == "" { return ErrMissingAPIKey diff --git a/internal/app/connectors/modulr/connector.go b/internal/app/connectors/modulr/connector.go index f660d132..4b3b3a49 100644 --- a/internal/app/connectors/modulr/connector.go +++ b/internal/app/connectors/modulr/connector.go @@ -17,6 +17,11 @@ type Connector struct { cfg Config } +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + // TODO implement me + panic("implement me") +} + func (c *Connector) Install(ctx task.ConnectorContext) error { taskDescriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch accounts from client", diff --git a/internal/app/connectors/stripe/config.go b/internal/app/connectors/stripe/config.go index 2edb0634..ec61d930 100644 --- a/internal/app/connectors/stripe/config.go +++ b/internal/app/connectors/stripe/config.go @@ -16,8 +16,10 @@ type Config struct { TimelineConfig `bson:",inline"` } +// String obfuscates sensitive fields and returns a string representation of the config. +// This is used for logging. func (c Config) String() string { - return fmt.Sprintf("pollingPeriod=%d, pageSize=%d, apiKey=%s", c.PollingPeriod, c.PageSize, c.APIKey) + return fmt.Sprintf("pollingPeriod=%d, pageSize=%d, apiKey=****", c.PollingPeriod, c.PageSize) } func (c Config) Validate() error { diff --git a/internal/app/connectors/stripe/connector.go b/internal/app/connectors/stripe/connector.go index 06596c0b..da3a943b 100644 --- a/internal/app/connectors/stripe/connector.go +++ b/internal/app/connectors/stripe/connector.go @@ -3,6 +3,8 @@ package stripe import ( "context" + "github.com/google/uuid" + "github.com/formancehq/payments/internal/app/models" "github.com/formancehq/payments/internal/app/integration" @@ -44,9 +46,25 @@ func (c *Connector) Resolve(descriptor models.TaskDescriptor) task.Task { return MainTask(c.cfg) } + if taskDescriptor.TransferID != uuid.Nil { + return TransferTask(c.cfg, taskDescriptor.TransferID) + } + return ConnectedAccountTask(c.cfg, taskDescriptor.Account) } +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + descriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Task to initiate transfer", + TransferID: transfer.ID, + }) + if err != nil { + return err + } + + return ctx.Scheduler().Schedule(descriptor, false) +} + var _ integration.Connector = &Connector{} func newConnector(logger logging.Logger, cfg Config) *Connector { diff --git a/internal/app/connectors/stripe/descriptor.go b/internal/app/connectors/stripe/descriptor.go index dd460de7..b6e79db1 100644 --- a/internal/app/connectors/stripe/descriptor.go +++ b/internal/app/connectors/stripe/descriptor.go @@ -1,7 +1,10 @@ package stripe +import "github.com/google/uuid" + type TaskDescriptor struct { - Name string `json:"name" yaml:"name" bson:"name"` - Main bool `json:"main,omitempty" yaml:"main" bson:"main"` - Account string `json:"account,omitempty" yaml:"account" bson:"account"` + Name string `json:"name" yaml:"name" bson:"name"` + Main bool `json:"main,omitempty" yaml:"main" bson:"main"` + Account string `json:"account,omitempty" yaml:"account" bson:"account"` + TransferID uuid.UUID `json:"transferID,omitempty" yaml:"transferID" bson:"transferID"` } diff --git a/internal/app/connectors/stripe/runner_test.go b/internal/app/connectors/stripe/runner_test.go index 13a556fd..690b3a48 100644 --- a/internal/app/connectors/stripe/runner_test.go +++ b/internal/app/connectors/stripe/runner_test.go @@ -28,16 +28,16 @@ func TestStopTailing(t *testing.T) { MoreRecentID: "tx2", }) - logger := logging.GetLogger(context.Background()) + logger := logging.GetLogger(context.TODO()) trigger := NewTimelineTrigger(logger, NoOpIngester, timeline) r := NewRunner(logger, trigger, time.Second) go func() { - _ = r.Run(context.Background()) + _ = r.Run(context.TODO()) }() defer func() { - _ = r.Stop(context.Background()) + _ = r.Stop(context.TODO()) }() require.False(t, timeline.state.NoMoreHistory) diff --git a/internal/app/connectors/stripe/task_transfer.go b/internal/app/connectors/stripe/task_transfer.go new file mode 100644 index 00000000..c7e35cf4 --- /dev/null +++ b/internal/app/connectors/stripe/task_transfer.go @@ -0,0 +1,59 @@ +package stripe + +import ( + "context" + "fmt" + + "github.com/formancehq/payments/internal/app/models" + + "github.com/stripe/stripe-go/v72/transfer" + + "github.com/formancehq/go-libs/logging" + + "github.com/google/uuid" + "github.com/stripe/stripe-go/v72" + + "github.com/formancehq/payments/internal/app/ingestion" +) + +func TransferTask(config Config, transferID uuid.UUID) func(ctx context.Context, ingester ingestion.Ingester, logger logging.Logger) error { + return func(ctx context.Context, ingester ingestion.Ingester, logger logging.Logger) error { + transferToExecute, err := ingester.GetTransfer(ctx, transferID) + if err != nil { + return fmt.Errorf("failed to get transfer: %w", err) + } + + stripe.Key = config.APIKey + + params := &stripe.TransferParams{ + Params: stripe.Params{ + Context: ctx, + }, + Amount: stripe.Int64(transferToExecute.Amount), + Currency: stripe.String(transferToExecute.Currency), + Destination: stripe.String(transferToExecute.Destination), + } + + var ( + transferError string + transferStatus = models.TransferStatusSucceeded + ) + + transferResponse, err := transfer.New(params) + if err != nil { + transferError = err.Error() + logger.Errorf("failed to create transfer (%s): %v", transferID, err) + } + + if transferError != "" { + transferStatus = models.TransferStatusFailed + } + + err = ingester.UpdateTransferStatus(ctx, transferID, transferStatus, transferResponse.BalanceTransaction.ID, transferError) + if err != nil { + return fmt.Errorf("failed to update transfer status: %w", err) + } + + return nil + } +} diff --git a/internal/app/connectors/stripe/timeline_test.go b/internal/app/connectors/stripe/timeline_test.go index f576717d..c5a1af62 100644 --- a/internal/app/connectors/stripe/timeline_test.go +++ b/internal/app/connectors/stripe/timeline_test.go @@ -34,7 +34,7 @@ func TestTimeline(t *testing.T) { RespondsWith(true, tx1, tx2) ret := make([]*stripe.BalanceTransaction, 0) - hasMore, state, commit, err := timeline.Tail(context.Background(), &ret) + hasMore, state, commit, err := timeline.Tail(context.TODO(), &ret) require.NoError(t, err) require.True(t, hasMore) require.Equal(t, TimelineState{ @@ -54,7 +54,7 @@ func TestTimeline(t *testing.T) { mock.Expect().Limit(2).StartingAfter(tx2.ID).RespondsWith(false, tx3) - hasMore, state, _, err = timeline.Tail(context.Background(), &ret) + hasMore, state, _, err = timeline.Tail(context.TODO(), &ret) require.NoError(t, err) require.False(t, hasMore) require.Equal(t, TimelineState{ diff --git a/internal/app/connectors/stripe/timeline_trigger_test.go b/internal/app/connectors/stripe/timeline_trigger_test.go index 9292272f..b6c3ff59 100644 --- a/internal/app/connectors/stripe/timeline_trigger_test.go +++ b/internal/app/connectors/stripe/timeline_trigger_test.go @@ -24,7 +24,7 @@ func TestTimelineTrigger(t *testing.T) { ingestedTx := make([]*stripe.BalanceTransaction, 0) trigger := NewTimelineTrigger( - logging.GetLogger(context.Background()), + logging.GetLogger(context.TODO()), IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { ingestedTx = append(ingestedTx, batch...) @@ -53,7 +53,7 @@ func TestTimelineTrigger(t *testing.T) { mock.Expect().Limit(2).RespondsWith(i < txCount/2-2, allTxs[txCount/2-i-2], allTxs[txCount/2-i-1]) } - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second)) + ctx, cancel := context.WithDeadline(context.TODO(), time.Now().Add(time.Second)) defer cancel() require.NoError(t, trigger.Fetch(ctx)) @@ -73,7 +73,7 @@ func TestCancelTimelineTrigger(t *testing.T) { waiting := make(chan struct{}) trigger := NewTimelineTrigger( - logging.GetLogger(context.Background()), + logging.GetLogger(context.TODO()), IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { close(waiting) // Instruct the test the trigger is in fetching state <-ctx.Done() @@ -93,13 +93,13 @@ func TestCancelTimelineTrigger(t *testing.T) { go func() { // TODO: Handle error - _ = trigger.Fetch(context.Background()) + _ = trigger.Fetch(context.TODO()) }() select { case <-time.After(time.Second): t.Fatalf("timeout") case <-waiting: - trigger.Cancel(context.Background()) + trigger.Cancel(context.TODO()) require.NotEmpty(t, mock.expectations) } } diff --git a/internal/app/connectors/stripe/translate.go b/internal/app/connectors/stripe/translate.go index 4249100c..a5b9a040 100644 --- a/internal/app/connectors/stripe/translate.go +++ b/internal/app/connectors/stripe/translate.go @@ -173,7 +173,7 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b switch balanceTransaction.Type { case stripe.BalanceTransactionTypeCharge: payment = models.Payment{ - Reference: balanceTransaction.Source.Charge.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayIn, Status: models.PaymentStatusSucceeded, Amount: balanceTransaction.Source.Charge.Amount, @@ -184,7 +184,7 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypePayout: payment = models.Payment{ - Reference: balanceTransaction.Source.Payout.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Status: convertPayoutStatus(balanceTransaction.Source.Payout.Status), Amount: balanceTransaction.Source.Payout.Amount, @@ -204,7 +204,7 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypeTransfer: payment = models.Payment{ - Reference: balanceTransaction.Source.Transfer.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Status: models.PaymentStatusSucceeded, Amount: balanceTransaction.Source.Transfer.Amount, @@ -215,11 +215,11 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypeRefund: payment = models.Payment{ - Reference: balanceTransaction.Source.Refund.Charge.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Adjustments: []*models.Adjustment{ { - Reference: balanceTransaction.Source.Refund.Charge.ID, + Reference: balanceTransaction.ID, Status: models.PaymentStatusSucceeded, Amount: balanceTransaction.Amount, CreatedAt: time.Unix(balanceTransaction.Created, 0), @@ -229,7 +229,7 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypePayment: payment = models.Payment{ - Reference: balanceTransaction.Source.Charge.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayIn, Status: models.PaymentStatusSucceeded, Amount: balanceTransaction.Source.Charge.Amount, @@ -240,12 +240,12 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypePayoutCancel: payment = models.Payment{ - Reference: balanceTransaction.Source.Payout.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Status: models.PaymentStatusFailed, Adjustments: []*models.Adjustment{ { - Reference: balanceTransaction.Source.Payout.ID, + Reference: balanceTransaction.ID, Status: convertPayoutStatus(balanceTransaction.Source.Payout.Status), CreatedAt: time.Unix(balanceTransaction.Created, 0), RawData: rawData, @@ -255,12 +255,12 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypePayoutFailure: payment = models.Payment{ - Reference: balanceTransaction.Source.Payout.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayIn, Status: models.PaymentStatusFailed, Adjustments: []*models.Adjustment{ { - Reference: balanceTransaction.Source.Payout.ID, + Reference: balanceTransaction.ID, Status: convertPayoutStatus(balanceTransaction.Source.Payout.Status), CreatedAt: time.Unix(balanceTransaction.Created, 0), RawData: rawData, @@ -270,12 +270,12 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypePaymentRefund: payment = models.Payment{ - Reference: balanceTransaction.Source.Refund.Charge.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Status: models.PaymentStatusSucceeded, Adjustments: []*models.Adjustment{ { - Reference: balanceTransaction.Source.Refund.Charge.ID, + Reference: balanceTransaction.ID, Status: models.PaymentStatusSucceeded, Amount: balanceTransaction.Amount, CreatedAt: time.Unix(balanceTransaction.Created, 0), @@ -285,11 +285,11 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b } case stripe.BalanceTransactionTypeAdjustment: payment = models.Payment{ - Reference: balanceTransaction.Source.Dispute.Charge.ID, + Reference: balanceTransaction.ID, Type: models.PaymentTypePayOut, Adjustments: []*models.Adjustment{ { - Reference: balanceTransaction.Source.Dispute.Charge.ID, + Reference: balanceTransaction.ID, Status: models.PaymentStatusCancelled, Amount: balanceTransaction.Amount, CreatedAt: time.Unix(balanceTransaction.Created, 0), diff --git a/internal/app/connectors/wise/client.go b/internal/app/connectors/wise/client.go index 754e3708..658208bd 100644 --- a/internal/app/connectors/wise/client.go +++ b/internal/app/connectors/wise/client.go @@ -1,6 +1,7 @@ package wise import ( + "bytes" "context" "encoding/json" "fmt" @@ -8,6 +9,8 @@ import ( "net/http" "time" + "github.com/google/uuid" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) @@ -161,6 +164,66 @@ func (w *client) getTransfers(ctx context.Context, profile *profile) ([]transfer return transfers, nil } +type quote struct { + ID uuid.UUID `json:"id"` +} + +func (w *client) createQuote(profileID uint64, currency string, amount int64) (quote, error) { + var response quote + + req, err := json.Marshal(map[string]interface{}{ + "sourceCurrency": currency, + "targetCurrency": currency, + "sourceAmount": amount, + }) + if err != nil { + return response, err + } + + res, err := w.httpClient.Post(w.endpoint("v3/profiles/"+fmt.Sprint(profileID)+"/quotes"), "application/json", bytes.NewBuffer(req)) + if err != nil { + return response, err + } + + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) + if err != nil { + return response, fmt.Errorf("failed to read response body: %w", err) + } + + err = json.Unmarshal(body, &response) + if err != nil { + return response, fmt.Errorf("failed to unmarshal profiles: %w", err) + } + + return response, nil +} + +func (w *client) createTransfer(quote quote, targetAccount uint64, transactionID string) error { + req, err := json.Marshal(map[string]interface{}{ + "targetAccount": targetAccount, + "quoteUuid": quote.ID.String(), + "customerTransactionId": transactionID, + }) + if err != nil { + return err + } + + res, err := w.httpClient.Post(w.endpoint("v1/transfers"), "application/json", bytes.NewBuffer(req)) + if err != nil { + return err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", res.StatusCode) + } + + return nil +} + func newClient(apiKey string) *client { httpClient := &http.Client{ Transport: &apiTransport{ diff --git a/internal/app/connectors/wise/config.go b/internal/app/connectors/wise/config.go index b1c01d51..31552dd6 100644 --- a/internal/app/connectors/wise/config.go +++ b/internal/app/connectors/wise/config.go @@ -10,6 +10,12 @@ type Config struct { APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` } +// String obfuscates sensitive fields and returns a string representation of the config. +// This is used for logging. +func (c Config) String() string { + return "apiKey=***" +} + func (c Config) Validate() error { if c.APIKey == "" { return ErrMissingAPIKey diff --git a/internal/app/connectors/wise/connector.go b/internal/app/connectors/wise/connector.go index ab71883b..066d8098 100644 --- a/internal/app/connectors/wise/connector.go +++ b/internal/app/connectors/wise/connector.go @@ -17,6 +17,25 @@ type Connector struct { cfg Config } +func (c *Connector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + descriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ + Name: "Initiate transfer", + Key: taskNameTransfer, + Transfer: Transfer{ + ID: transfer.ID, + Source: transfer.Source, + Destination: transfer.Destination, + Amount: transfer.Amount, + Currency: transfer.Currency, + }, + }) + if err != nil { + return err + } + + return ctx.Scheduler().Schedule(descriptor, true) +} + func (c *Connector) Install(ctx task.ConnectorContext) error { descriptor, err := models.EncodeTaskDescriptor(TaskDescriptor{ Name: "Fetch profiles from client", diff --git a/internal/app/connectors/wise/task_resolve.go b/internal/app/connectors/wise/task_resolve.go index 4d7b7e34..caee2ca5 100644 --- a/internal/app/connectors/wise/task_resolve.go +++ b/internal/app/connectors/wise/task_resolve.go @@ -3,6 +3,8 @@ package wise import ( "fmt" + "github.com/google/uuid" + "github.com/formancehq/payments/internal/app/task" "github.com/formancehq/go-libs/logging" @@ -11,13 +13,23 @@ import ( const ( taskNameFetchTransfers = "fetch-transfers" taskNameFetchProfiles = "fetch-profiles" + taskNameTransfer = "transfer" ) // TaskDescriptor is the definition of a task. type TaskDescriptor struct { - Name string `json:"name" yaml:"name" bson:"name"` - Key string `json:"key" yaml:"key" bson:"key"` - ProfileID uint64 `json:"profileID" yaml:"profileID" bson:"profileID"` + Name string `json:"name" yaml:"name" bson:"name"` + Key string `json:"key" yaml:"key" bson:"key"` + ProfileID uint64 `json:"profileID" yaml:"profileID" bson:"profileID"` + Transfer Transfer `json:"transfer" yaml:"transfer" bson:"transfer"` +} + +type Transfer struct { + ID uuid.UUID `json:"id" yaml:"id" bson:"id"` + Source string `json:"source" yaml:"source" bson:"source"` + Destination string `json:"destination" yaml:"destination" bson:"destination"` + Amount int64 `json:"amount" yaml:"amount" bson:"amount"` + Currency string `json:"currency" yaml:"currency" bson:"currency"` } func resolveTasks(logger logging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { @@ -29,6 +41,8 @@ func resolveTasks(logger logging.Logger, config Config) func(taskDefinition Task return taskFetchProfiles(logger, client) case taskNameFetchTransfers: return taskFetchTransfers(logger, client, taskDefinition.ProfileID) + case taskNameTransfer: + return taskTransfer(logger, client, taskDefinition.Transfer) } // This should never happen. diff --git a/internal/app/connectors/wise/task_transfer.go b/internal/app/connectors/wise/task_transfer.go new file mode 100644 index 00000000..32a9a4cd --- /dev/null +++ b/internal/app/connectors/wise/task_transfer.go @@ -0,0 +1,54 @@ +package wise + +import ( + "context" + "fmt" + "strconv" + + "github.com/google/uuid" + + "github.com/formancehq/payments/internal/app/task" + + "github.com/formancehq/go-libs/logging" +) + +func taskTransfer(logger logging.Logger, client *client, transfer Transfer) task.Task { + return func( + ctx context.Context, + scheduler task.Scheduler, + ) error { + profiles, err := client.getProfiles() + if err != nil { + return err + } + + var profileID uint64 + + for _, profile := range profiles { + if fmt.Sprint(profile.ID) == transfer.Source { + profileID = profile.ID + } + } + + quote, err := client.createQuote(profileID, transfer.Currency, transfer.Amount) + if err != nil { + return err + } + + destinationAccount, err := strconv.ParseUint(transfer.Destination, 10, 64) + if err != nil { + return err + } + + transactionID := uuid.New().String() + + err = client.createTransfer(quote, destinationAccount, transactionID) + if err != nil { + return err + } + + logger.Infof("transfer created: %s", transactionID) + + return nil + } +} diff --git a/internal/app/ingestion/ingester.go b/internal/app/ingestion/ingester.go index 1bf9ca0f..4fb61f77 100644 --- a/internal/app/ingestion/ingester.go +++ b/internal/app/ingestion/ingester.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" + "github.com/google/uuid" + "github.com/formancehq/go-libs/logging" "github.com/formancehq/go-libs/publish" "github.com/formancehq/payments/internal/app/models" @@ -12,6 +14,8 @@ import ( type Ingester interface { IngestPayments(ctx context.Context, batch PaymentBatch, commitState any) error IngestAccounts(ctx context.Context, batch AccountBatch) error + GetTransfer(ctx context.Context, transferID uuid.UUID) (models.Transfer, error) + UpdateTransferStatus(ctx context.Context, transferID uuid.UUID, status models.TransferStatus, reference, err string) error } type DefaultIngester struct { @@ -26,6 +30,9 @@ type Repository interface { UpsertAccounts(ctx context.Context, provider models.ConnectorProvider, accounts []models.Account) error UpsertPayments(ctx context.Context, provider models.ConnectorProvider, payments []*models.Payment) error UpdateTaskState(ctx context.Context, provider models.ConnectorProvider, descriptor models.TaskDescriptor, state json.RawMessage) error + + GetTransfer(ctx context.Context, transferID uuid.UUID) (models.Transfer, error) + UpdateTransferStatus(ctx context.Context, transferID uuid.UUID, status models.TransferStatus, reference, err string) error } func NewDefaultIngester( diff --git a/internal/app/ingestion/transfers.go b/internal/app/ingestion/transfers.go new file mode 100644 index 00000000..6b9ed514 --- /dev/null +++ b/internal/app/ingestion/transfers.go @@ -0,0 +1,18 @@ +package ingestion + +import ( + "context" + + "github.com/formancehq/payments/internal/app/models" + "github.com/google/uuid" +) + +func (i *DefaultIngester) GetTransfer(ctx context.Context, transferID uuid.UUID) (models.Transfer, error) { + return i.repo.GetTransfer(ctx, transferID) +} + +func (i *DefaultIngester) UpdateTransferStatus(ctx context.Context, transferID uuid.UUID, + status models.TransferStatus, reference, err string, +) error { + return i.repo.UpdateTransferStatus(ctx, transferID, status, reference, err) +} diff --git a/internal/app/integration/connector.go b/internal/app/integration/connector.go index 687099d1..a4c97a89 100644 --- a/internal/app/integration/connector.go +++ b/internal/app/integration/connector.go @@ -16,13 +16,16 @@ type Connector interface { Uninstall(ctx context.Context) error // Resolve is used to recover state of a failed or restarted task Resolve(descriptor models.TaskDescriptor) task.Task + // InitiateTransfer is used to initiate a transfer from the connector to a bank account. + InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error } type ConnectorBuilder struct { - name string - uninstall func(ctx context.Context) error - resolve func(descriptor models.TaskDescriptor) task.Task - install func(ctx task.ConnectorContext) error + name string + uninstall func(ctx context.Context) error + resolve func(descriptor models.TaskDescriptor) task.Task + install func(ctx task.ConnectorContext) error + initiateTransfer func(ctx task.ConnectorContext, transfer models.Transfer) error } func (b *ConnectorBuilder) WithUninstall( @@ -47,10 +50,11 @@ func (b *ConnectorBuilder) WithInstall(installFunction func(ctx task.ConnectorCo func (b *ConnectorBuilder) Build() Connector { return &BuiltConnector{ - name: b.name, - uninstall: b.uninstall, - resolve: b.resolve, - install: b.install, + name: b.name, + uninstall: b.uninstall, + resolve: b.resolve, + install: b.install, + initiateTransfer: b.initiateTransfer, } } @@ -59,10 +63,19 @@ func NewConnectorBuilder() *ConnectorBuilder { } type BuiltConnector struct { - name string - uninstall func(ctx context.Context) error - resolve func(name models.TaskDescriptor) task.Task - install func(ctx task.ConnectorContext) error + name string + uninstall func(ctx context.Context) error + resolve func(name models.TaskDescriptor) task.Task + install func(ctx task.ConnectorContext) error + initiateTransfer func(ctx task.ConnectorContext, transfer models.Transfer) error +} + +func (b *BuiltConnector) InitiateTransfer(ctx task.ConnectorContext, transfer models.Transfer) error { + if b.initiateTransfer != nil { + return b.initiateTransfer(ctx, transfer) + } + + return nil } func (b *BuiltConnector) Name() string { diff --git a/internal/app/integration/manager.go b/internal/app/integration/manager.go index b3c106fb..7f74ada3 100644 --- a/internal/app/integration/manager.go +++ b/internal/app/integration/manager.go @@ -2,6 +2,7 @@ package integration import ( "context" + "fmt" "github.com/formancehq/payments/internal/app/messages" @@ -107,7 +108,7 @@ func (l *ConnectorManager[ConnectorConfig]) Install(ctx context.Context, config return err } - err = l.connector.Install(task.NewConnectorContext(context.Background(), l.scheduler)) + err = l.connector.Install(task.NewConnectorContext(context.TODO(), l.scheduler)) if err != nil { l.logger.Errorf("Error starting connector: %s", err) @@ -213,8 +214,8 @@ func (l *ConnectorManager[ConnectorConfig]) IsEnabled(ctx context.Context) (bool return l.store.IsEnabled(ctx, l.loader.Name()) } -func (l *ConnectorManager[ConnectorConfig]) FindAll(ctx context.Context) ([]models.Connector, error) { - return l.store.FindAll(ctx) +func (l *ConnectorManager[ConnectorConfig]) FindAll(ctx context.Context) ([]*models.Connector, error) { + return l.store.ListConnectors(ctx) } func (l *ConnectorManager[ConnectorConfig]) IsInstalled(ctx context.Context) (bool, error) { @@ -255,6 +256,37 @@ func (l *ConnectorManager[ConnectorConfig]) Reset(ctx context.Context) error { return nil } +type Transfer struct { + Source string + Destination string + Currency string + Amount int64 +} + +func (l *ConnectorManager[ConnectorConfig]) InitiateTransfer(ctx context.Context, transfer Transfer) (uuid.UUID, error) { + newTransfer, err := l.store.CreateNewTransfer(ctx, l.loader.Name(), + transfer.Source, transfer.Destination, transfer.Currency, transfer.Amount) + if err != nil { + return uuid.Nil, fmt.Errorf("creating new transfer: %w", err) + } + + err = l.connector.InitiateTransfer(task.NewConnectorContext(ctx, l.scheduler), newTransfer) + if err != nil { + return uuid.Nil, fmt.Errorf("initiating transfer: %w", err) + } + + return newTransfer.ID, nil +} + +func (l *ConnectorManager[ConnectorConfig]) ListTransfers(ctx context.Context) ([]models.Transfer, error) { + transfers, err := l.store.ListTransfers(ctx, l.loader.Name()) + if err != nil { + return nil, fmt.Errorf("retrieving transfers: %w", err) + } + + return transfers, nil +} + func NewConnectorManager[ConnectorConfig models.ConnectorConfigObject]( logger logging.Logger, store Repository, diff --git a/internal/app/integration/manager_test.go b/internal/app/integration/manager_test.go index 08620215..1cc3d143 100644 --- a/internal/app/integration/manager_test.go +++ b/internal/app/integration/manager_test.go @@ -68,7 +68,7 @@ func withManager[ConnectorConfig models.ConnectorConfigObject](builder *Connecto schedulerFactory, nil) defer func() { - _ = manager.Uninstall(context.Background()) + _ = manager.Uninstall(context.TODO()) }() callback(&testContext[ConnectorConfig]{ @@ -91,11 +91,11 @@ func TestInstallConnector(t *testing.T) { return nil }) withManager(builder, func(tc *testContext[models.EmptyConnectorConfig]) { - err := tc.manager.Install(context.Background(), models.EmptyConnectorConfig{}) + err := tc.manager.Install(context.TODO(), models.EmptyConnectorConfig{}) require.NoError(t, err) require.True(t, ChanClosed(installed)) - err = tc.manager.Install(context.Background(), models.EmptyConnectorConfig{}) + err = tc.manager.Install(context.TODO(), models.EmptyConnectorConfig{}) require.Equal(t, ErrAlreadyInstalled, err) }) } @@ -127,15 +127,15 @@ func TestUninstallConnector(t *testing.T) { return nil }) withManager(builder, func(tc *testContext[models.EmptyConnectorConfig]) { - err := tc.manager.Install(context.Background(), models.EmptyConnectorConfig{}) + err := tc.manager.Install(context.TODO(), models.EmptyConnectorConfig{}) require.NoError(t, err) <-taskStarted - require.NoError(t, tc.manager.Uninstall(context.Background())) + require.NoError(t, tc.manager.Uninstall(context.TODO())) require.True(t, ChanClosed(uninstalled)) // TODO: We need to give a chance to the connector to properly stop execution require.True(t, ChanClosed(taskTerminated)) - isInstalled, err := tc.manager.IsInstalled(context.Background()) + isInstalled, err := tc.manager.IsInstalled(context.TODO()) require.NoError(t, err) require.False(t, isInstalled) }) @@ -152,15 +152,15 @@ func TestDisableConnector(t *testing.T) { return nil }) withManager[models.EmptyConnectorConfig](builder, func(tc *testContext[models.EmptyConnectorConfig]) { - err := tc.manager.Install(context.Background(), models.EmptyConnectorConfig{}) + err := tc.manager.Install(context.TODO(), models.EmptyConnectorConfig{}) require.NoError(t, err) - enabled, err := tc.manager.IsEnabled(context.Background()) + enabled, err := tc.manager.IsEnabled(context.TODO()) require.NoError(t, err) require.True(t, enabled) - require.NoError(t, tc.manager.Disable(context.Background())) - enabled, err = tc.manager.IsEnabled(context.Background()) + require.NoError(t, tc.manager.Disable(context.TODO())) + enabled, err = tc.manager.IsEnabled(context.TODO()) require.NoError(t, err) require.False(t, enabled) }) @@ -171,10 +171,10 @@ func TestEnableConnector(t *testing.T) { builder := NewConnectorBuilder() withManager[models.EmptyConnectorConfig](builder, func(tc *testContext[models.EmptyConnectorConfig]) { - err := tc.connectorStore.Enable(context.Background(), tc.loader.Name()) + err := tc.connectorStore.Enable(context.TODO(), tc.loader.Name()) require.NoError(t, err) - err = tc.manager.Install(context.Background(), models.EmptyConnectorConfig{}) + err = tc.manager.Install(context.TODO(), models.EmptyConnectorConfig{}) require.NoError(t, err) }) } @@ -187,10 +187,10 @@ func TestRestoreEnabledConnector(t *testing.T) { cfg, err := models.EmptyConnectorConfig{}.Marshal() require.NoError(t, err) - err = tc.connectorStore.Install(context.Background(), tc.loader.Name(), cfg) + err = tc.connectorStore.Install(context.TODO(), tc.loader.Name(), cfg) require.NoError(t, err) - err = tc.manager.Restore(context.Background()) + err = tc.manager.Restore(context.TODO()) require.NoError(t, err) require.NotNil(t, tc.manager.connector) }) @@ -201,7 +201,7 @@ func TestRestoreNotInstalledConnector(t *testing.T) { builder := NewConnectorBuilder() withManager(builder, func(tc *testContext[models.EmptyConnectorConfig]) { - err := tc.manager.Restore(context.Background()) + err := tc.manager.Restore(context.TODO()) require.Equal(t, ErrNotInstalled, err) }) } diff --git a/internal/app/integration/store.go b/internal/app/integration/store.go index 8fe0ea8d..66f16e5e 100644 --- a/internal/app/integration/store.go +++ b/internal/app/integration/store.go @@ -8,7 +8,7 @@ import ( ) type Repository interface { - FindAll(ctx context.Context) ([]models.Connector, error) + ListConnectors(ctx context.Context) ([]*models.Connector, error) IsInstalled(ctx context.Context, name models.ConnectorProvider) (bool, error) Install(ctx context.Context, name models.ConnectorProvider, config json.RawMessage) error Uninstall(ctx context.Context, name models.ConnectorProvider) error @@ -17,4 +17,7 @@ type Repository interface { Disable(ctx context.Context, name models.ConnectorProvider) error IsEnabled(ctx context.Context, name models.ConnectorProvider) (bool, error) GetConnector(ctx context.Context, name models.ConnectorProvider) (*models.Connector, error) + CreateNewTransfer(ctx context.Context, name models.ConnectorProvider, + source, destination, currency string, amount int64) (models.Transfer, error) + ListTransfers(ctx context.Context, name models.ConnectorProvider) ([]models.Transfer, error) } diff --git a/internal/app/integration/storememory.go b/internal/app/integration/storememory.go index 3208b602..0411d396 100644 --- a/internal/app/integration/storememory.go +++ b/internal/app/integration/storememory.go @@ -21,8 +21,8 @@ func (i *InMemoryConnectorStore) Uninstall(ctx context.Context, name models.Conn return nil } -func (i *InMemoryConnectorStore) FindAll(_ context.Context) ([]models.Connector, error) { - return []models.Connector{}, nil +func (i *InMemoryConnectorStore) ListConnectors(_ context.Context) ([]*models.Connector, error) { + return []*models.Connector{}, nil } func (i *InMemoryConnectorStore) IsInstalled(ctx context.Context, name models.ConnectorProvider) (bool, error) { @@ -88,6 +88,14 @@ func (i *InMemoryConnectorStore) ReadConfig(ctx context.Context, name models.Con return nil } +func (i *InMemoryConnectorStore) CreateNewTransfer(ctx context.Context, name models.ConnectorProvider, source, destination, currency string, amount int64) (models.Transfer, error) { + return models.Transfer{}, nil +} + +func (i *InMemoryConnectorStore) ListTransfers(ctx context.Context, name models.ConnectorProvider) ([]models.Transfer, error) { + return []models.Transfer{}, nil +} + var _ Repository = &InMemoryConnectorStore{} func NewInMemoryStore() *InMemoryConnectorStore { diff --git a/internal/app/migrations/006_conifg_encryption.go b/internal/app/migrations/006_conifg_encryption.go new file mode 100644 index 00000000..b33e15ee --- /dev/null +++ b/internal/app/migrations/006_conifg_encryption.go @@ -0,0 +1,95 @@ +package migrations + +import ( + "database/sql" + "fmt" + + "github.com/pkg/errors" + + "github.com/pressly/goose/v3" +) + +// EncryptionKey is set from the migration utility to specify default encryption key to migrate to. +// This can remain empty. Then the config will be removed. +// +//nolint:gochecknoglobals // This is a global variable by design. +var EncryptionKey string + +func init() { + up := func(tx *sql.Tx) error { + var exists bool + + err := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM connectors.connector)").Scan(&exists) + if err != nil { + return fmt.Errorf("failed to check if connectors table exists: %w", err) + } + + if exists && EncryptionKey == "" { + return errors.New("encryption key is not set") + } + + _, err = tx.Exec(` + CREATE EXTENSION IF NOT EXISTS pgcrypto; + ALTER TABLE connectors.connector RENAME COLUMN config TO config_unencrypted; + ALTER TABLE connectors.connector ADD COLUMN config bytea NULL; + `) + if err != nil { + return fmt.Errorf("failed to create config column: %w", err) + } + + _, err = tx.Exec(` + UPDATE connectors.connector SET config = pgp_sym_encrypt(config_unencrypted::TEXT, $1, 'compress-algo=1, cipher-algo=aes256'); + `, EncryptionKey) + if err != nil { + return fmt.Errorf("failed to encrypt config: %w", err) + } + + _, err = tx.Exec(` + ALTER TABLE connectors.connector DROP COLUMN config_unencrypted; + `) + if err != nil { + return fmt.Errorf("failed to drop config_unencrypted column: %w", err) + } + + return nil + } + + down := func(tx *sql.Tx) error { + var exists bool + + err := tx.QueryRow("SELECT EXISTS(SELECT 1 FROM connectors.connector)").Scan(&exists) + if err != nil { + return fmt.Errorf("failed to check if connectors table exists: %w", err) + } + + if exists && EncryptionKey == "" { + return errors.New("encryption key is not set") + } + + _, err = tx.Exec(` + ALTER TABLE connectors.connector RENAME COLUMN config TO config_encrypted; + ALTER TABLE connectors.connector ADD COLUMN config JSON NULL; + `) + if err != nil { + return fmt.Errorf("failed to create config column: %w", err) + } + + _, err = tx.Exec(` + UPDATE connectors.connector SET config = pgp_sym_decrypt(config_encrypted, $1, 'compress-algo=1, cipher-algo=aes256')::JSON; + `, EncryptionKey) + if err != nil { + return fmt.Errorf("failed to decrypt config: %w", err) + } + + _, err = tx.Exec(` + ALTER TABLE connectors.connector DROP COLUMN config_encrypted; + `) + if err != nil { + return fmt.Errorf("failed to drop config_encrypted column: %w", err) + } + + return nil + } + + goose.AddMigration(up, down) +} diff --git a/internal/app/migrations/007_transfers.go b/internal/app/migrations/007_transfers.go new file mode 100644 index 00000000..f6d81b82 --- /dev/null +++ b/internal/app/migrations/007_transfers.go @@ -0,0 +1,65 @@ +package migrations + +import ( + "database/sql" + + "github.com/pressly/goose/v3" +) + +func init() { + up := func(tx *sql.Tx) error { + _, err := tx.Exec(` + CREATE TYPE transfer_status AS ENUM ('PENDING', 'SUCCEEDED', 'FAILED'); + + CREATE TABLE payments.transfers ( + id uuid NOT NULL DEFAULT gen_random_uuid(), + connector_id uuid NOT NULL, + payment_id uuid NULL, + reference text UNIQUE, + created_at timestamp with time zone NOT NULL DEFAULT NOW() CHECK (created_at<=NOW()), + amount bigint NOT NULL DEFAULT 0, + currency text NOT NULL, + source text NOT NULL, + destination text NOT NULL, + status transfer_status NOT NULL DEFAULT 'PENDING', + error text NULL, + CONSTRAINT transfer_pk PRIMARY KEY (id) + ); + + ALTER TABLE payments.transfers ADD CONSTRAINT transfer_connector + FOREIGN KEY (connector_id) + REFERENCES connectors.connector (id) + ON DELETE CASCADE + NOT DEFERRABLE + INITIALLY IMMEDIATE + ; + + ALTER TABLE payments.transfers ADD CONSTRAINT transfer_payment + FOREIGN KEY (payment_id) + REFERENCES payments.payment (id) + ON DELETE CASCADE + NOT DEFERRABLE + INITIALLY IMMEDIATE + ; + `) + if err != nil { + return err + } + + return nil + } + + down := func(tx *sql.Tx) error { + _, err := tx.Exec(` + DROP TABLE payments.transfers; + DROP TYPE transfer_status; + `) + if err != nil { + return err + } + + return nil + } + + goose.AddMigration(up, down) +} diff --git a/internal/app/models/connector.go b/internal/app/models/connector.go index 892dd61e..51046398 100644 --- a/internal/app/models/connector.go +++ b/internal/app/models/connector.go @@ -19,13 +19,25 @@ type Connector struct { Provider ConnectorProvider Enabled bool - // TODO: Enable DB-level encryption - Config json.RawMessage + // EncryptedConfig is a PGP-encrypted JSON string. + EncryptedConfig string `bun:"config"` + + // Config is a decrypted config. It is not stored in the database. + Config json.RawMessage `bun:"decrypted_config,scanonly"` Tasks []*Task `bun:"rel:has-many,join:id=connector_id"` Payments []*Payment `bun:"rel:has-many,join:id=connector_id"` } +func (c Connector) String() string { + c.EncryptedConfig = "****" + c.Config = nil + + var t any = c + + return fmt.Sprintf("%+v", t) +} + type ConnectorProvider string const ( diff --git a/internal/app/models/transfer.go b/internal/app/models/transfer.go new file mode 100644 index 00000000..7580a351 --- /dev/null +++ b/internal/app/models/transfer.go @@ -0,0 +1,44 @@ +package models + +import ( + "time" + + "github.com/uptrace/bun" + + "github.com/google/uuid" +) + +type Transfer struct { + bun.BaseModel `bun:"payments.transfer"` + + ID uuid.UUID `bun:",pk,nullzero"` + ConnectorID uuid.UUID `bun:",nullzero"` + PaymentID *uuid.UUID + CreatedAt time.Time `bun:",nullzero"` + + Reference *string + Amount int64 + Status TransferStatus + Currency string + Source string + Destination string + + Error *string + + Payment *Payment `bun:"rel:has-one,join:payment_id=id"` + Connector *Connector `bun:"rel:has-one,join:connector_id=id"` +} + +type ( + TransferStatus string +) + +const ( + TransferStatusPending TransferStatus = "PENDING" + TransferStatusSucceeded TransferStatus = "SUCCEEDED" + TransferStatusFailed TransferStatus = "FAILED" +) + +func (t TransferStatus) String() string { + return string(t) +} diff --git a/internal/app/storage/accounts.go b/internal/app/storage/accounts.go index 2ea80a2c..d7dd207e 100644 --- a/internal/app/storage/accounts.go +++ b/internal/app/storage/accounts.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "sort" "time" "github.com/formancehq/payments/internal/app/models" @@ -51,19 +52,35 @@ func (s *Storage) ListAccounts(ctx context.Context, pagination Paginator) ([]*mo var ( hasMore = len(accounts) > pagination.pageSize + hasPrevious bool firstReference, lastReference string ) if hasMore { - accounts = accounts[:pagination.pageSize] + if pagination.cursor.Next || pagination.cursor.Reference == "" { + accounts = accounts[:pagination.pageSize] + } else { + accounts = accounts[1:] + } } + sort.Slice(accounts, func(i, j int) bool { + return accounts[i].CreatedAt.After(accounts[j].CreatedAt) + }) + if len(accounts) > 0 { firstReference = accounts[0].CreatedAt.Format(time.RFC3339Nano) lastReference = accounts[len(accounts)-1].CreatedAt.Format(time.RFC3339Nano) + + query = s.db.NewSelect().Model(&accounts) + + hasPrevious, err = pagination.hasPrevious(ctx, query, "account.created_at", firstReference) + if err != nil { + return nil, PaginationDetails{}, fmt.Errorf("failed to check if there is a previous page: %w", err) + } } - paginationDetails, err := pagination.paginationDetails(hasMore, firstReference, lastReference) + paginationDetails, err := pagination.paginationDetails(hasMore, hasPrevious, firstReference, lastReference) if err != nil { return nil, PaginationDetails{}, fmt.Errorf("failed to get pagination details: %w", err) } diff --git a/internal/app/storage/connectors.go b/internal/app/storage/connectors.go index 8ba0c154..1ce82a94 100644 --- a/internal/app/storage/connectors.go +++ b/internal/app/storage/connectors.go @@ -9,20 +9,25 @@ import ( ) func (s *Storage) ListConnectors(ctx context.Context) ([]*models.Connector, error) { - var res []*models.Connector - err := s.db.NewSelect().Model(&res).Scan(ctx) + var connectors []*models.Connector + + err := s.db.NewSelect(). + Model(&connectors). + ColumnExpr("*, pgp_sym_decrypt(config, ?, ?) AS decrypted_config", s.configEncryptionKey, encryptionOptions). + Scan(ctx) if err != nil { return nil, e("list connectors", err) } - return res, nil + return connectors, nil } func (s *Storage) GetConfig(ctx context.Context, connectorProvider models.ConnectorProvider, destination any) error { var connector models.Connector - err := s.db.NewSelect().Model(&connector). - Column("config"). + err := s.db.NewSelect(). + Model(&connector). + ColumnExpr("pgp_sym_decrypt(config, ?, ?) AS decrypted_config", s.configEncryptionKey, encryptionOptions). Where("provider = ?", connectorProvider). Scan(ctx) if err != nil { @@ -37,17 +42,6 @@ func (s *Storage) GetConfig(ctx context.Context, connectorProvider models.Connec return nil } -func (s *Storage) FindAll(ctx context.Context) ([]models.Connector, error) { - var connectors []models.Connector - - err := s.db.NewSelect().Model(&connectors).Scan(ctx) - if err != nil { - return nil, e("find all connectors", err) - } - - return connectors, err -} - func (s *Storage) IsInstalled(ctx context.Context, provider models.ConnectorProvider) (bool, error) { exists, err := s.db.NewSelect(). Model(&models.Connector{}). @@ -64,7 +58,6 @@ func (s *Storage) Install(ctx context.Context, provider models.ConnectorProvider connector := models.Connector{ Provider: provider, Enabled: true, - Config: config, } _, err := s.db.NewInsert().Model(&connector).Exec(ctx) @@ -72,7 +65,7 @@ func (s *Storage) Install(ctx context.Context, provider models.ConnectorProvider return e("install connector", err) } - return nil + return s.UpdateConfig(ctx, provider, config) } func (s *Storage) Uninstall(ctx context.Context, provider models.ConnectorProvider) error { @@ -90,7 +83,7 @@ func (s *Storage) Uninstall(ctx context.Context, provider models.ConnectorProvid func (s *Storage) UpdateConfig(ctx context.Context, provider models.ConnectorProvider, config json.RawMessage) error { _, err := s.db.NewUpdate(). Model(&models.Connector{}). - Set("config = ?", config). + Set("config = pgp_sym_encrypt(?::TEXT, ?, ?)", config, s.configEncryptionKey, encryptionOptions). Where("provider = ?", provider). Exec(ctx) if err != nil { @@ -131,6 +124,7 @@ func (s *Storage) IsEnabled(ctx context.Context, provider models.ConnectorProvid err := s.db.NewSelect(). Model(&connector). + Column("enabled"). Where("provider = ?", provider). Scan(ctx) if err != nil { @@ -145,6 +139,7 @@ func (s *Storage) GetConnector(ctx context.Context, provider models.ConnectorPro err := s.db.NewSelect(). Model(&connector). + ColumnExpr("*, pgp_sym_decrypt(config, ?, ?) AS decrypted_config", s.configEncryptionKey, encryptionOptions). Where("provider = ?", provider). Scan(ctx) if err != nil { diff --git a/internal/app/storage/module.go b/internal/app/storage/module.go index eb82307a..0ba80d52 100644 --- a/internal/app/storage/module.go +++ b/internal/app/storage/module.go @@ -20,7 +20,7 @@ import ( const dbName = "paymentsDB" -func Module(uri string) fx.Option { +func Module(uri, configEncryptionKey string) fx.Option { return fx.Options( fx.Provide(func() (*pgx.ConnConfig, error) { config, err := pgx.ParseConfig(uri) @@ -40,7 +40,7 @@ func Module(uri string) fx.Option { db.AddQueryHook(bunotel.NewQueryHook(bunotel.WithDBName(dbName))) - return newStorage(db) + return newStorage(db, configEncryptionKey) }), fx.Invoke(func(lc fx.Lifecycle, repo *Storage) { diff --git a/internal/app/storage/paginate.go b/internal/app/storage/paginate.go index 8cf1ff56..202de7fa 100644 --- a/internal/app/storage/paginate.go +++ b/internal/app/storage/paginate.go @@ -1,6 +1,7 @@ package storage import ( + "context" "encoding/base64" "encoding/json" "fmt" @@ -70,14 +71,14 @@ func Paginate(pageSize int, token string, sorter Sorter) (Paginator, error) { } func (p Paginator) apply(query *bun.SelectQuery, column string) *bun.SelectQuery { - query = query.Limit(p.pageSize + 1).Order(column + " DESC") + query = query.Limit(p.pageSize + 1) if p.cursor.Reference == "" { if p.sorter != nil { query = p.sorter.apply(query) } - return query + return query.Order(column + " DESC") } if p.cursor.Sorter != nil { @@ -85,20 +86,43 @@ func (p Paginator) apply(query *bun.SelectQuery, column string) *bun.SelectQuery } if p.cursor.Next { - return query.Where(fmt.Sprintf("%s < ?", column), p.cursor.Reference) + return query.Where(fmt.Sprintf("%s < ?", column), p.cursor.Reference).Order(column + " DESC") } - return query.Where(fmt.Sprintf("%s > ?", column), p.cursor.Reference) + return query.Where(fmt.Sprintf("%s >= ?", column), p.cursor.Reference).Order(column + " ASC") } -func (p Paginator) paginationDetails(hasMore bool, firstReference, lastReference string) (PaginationDetails, error) { +func (p Paginator) hasPrevious(ctx context.Context, query *bun.SelectQuery, column, reference string) (bool, error) { + query = query.Limit(1).Order(column + " DESC") + + if p.cursor.Reference == "" { + if p.sorter != nil { + query = p.sorter.apply(query) + } + } + + if p.cursor.Sorter != nil { + query = p.cursor.Sorter.apply(query) + } + + query = query.Where(fmt.Sprintf("%s > ?", column), reference) + + exists, err := query.Exists(ctx) + if err != nil { + return false, fmt.Errorf("error checking if previous page exists: %w", err) + } + + return exists, nil +} + +func (p Paginator) paginationDetails(hasMore, hasPrevious bool, firstReference, lastReference string) (PaginationDetails, error) { var ( previousPage string nextPage string err error ) - if p.cursor.Reference != "" { + if hasPrevious && firstReference != "" { previousPage, err = baseCursor{ Reference: firstReference, Sorter: p.sorter, @@ -109,7 +133,7 @@ func (p Paginator) paginationDetails(hasMore bool, firstReference, lastReference } } - if hasMore { + if hasMore && lastReference != "" { nextPage, err = baseCursor{ Reference: lastReference, Sorter: p.sorter, diff --git a/internal/app/storage/paginate_test.go b/internal/app/storage/paginate_test.go index 6418cf82..65261f5c 100644 --- a/internal/app/storage/paginate_test.go +++ b/internal/app/storage/paginate_test.go @@ -159,6 +159,7 @@ func TestPaginatorPaginationDetails(t *testing.T) { type args struct { hasMore bool + hasPrevious bool firstReference string lastReference string } @@ -175,7 +176,7 @@ func TestPaginatorPaginationDetails(t *testing.T) { } tokenNext, err := baseCursor{ - Reference: "", + Reference: "abc", Sorter: nil, Next: true, }.Encode() @@ -193,21 +194,21 @@ func TestPaginatorPaginationDetails(t *testing.T) { { name: "no cursor", fields: fields{pageSize: 10, token: "", cursor: baseCursor{}, sorter: nil}, - args: args{hasMore: false, firstReference: "", lastReference: ""}, + args: args{hasMore: false, hasPrevious: false, firstReference: "", lastReference: ""}, want: PaginationDetails{PageSize: 10, HasMore: false}, wantErr: false, }, { name: "with cursor", fields: fields{pageSize: 10, token: "", cursor: cursor, sorter: nil}, - args: args{hasMore: false, firstReference: "abc", lastReference: ""}, + args: args{hasMore: false, hasPrevious: true, firstReference: "abc", lastReference: ""}, want: PaginationDetails{PageSize: 10, HasMore: false, PreviousPage: token}, wantErr: false, }, { name: "has more", fields: fields{pageSize: 10, token: "", cursor: baseCursor{}, sorter: nil}, - args: args{hasMore: true, firstReference: "", lastReference: ""}, + args: args{hasMore: true, hasPrevious: false, firstReference: "", lastReference: "abc"}, want: PaginationDetails{PageSize: 10, HasMore: true, NextPage: tokenNext}, wantErr: false, }, @@ -226,7 +227,7 @@ func TestPaginatorPaginationDetails(t *testing.T) { sorter: tt.fields.sorter, } - got, err := p.paginationDetails(tt.args.hasMore, tt.args.firstReference, tt.args.lastReference) + got, err := p.paginationDetails(tt.args.hasMore, tt.args.hasPrevious, tt.args.firstReference, tt.args.lastReference) if (err != nil) != tt.wantErr { t.Errorf("paginationDetails() error = %v, wantErr %v", err, tt.wantErr) diff --git a/internal/app/storage/payments.go b/internal/app/storage/payments.go index 10f09662..2b3dbf1c 100644 --- a/internal/app/storage/payments.go +++ b/internal/app/storage/payments.go @@ -3,6 +3,7 @@ package storage import ( "context" "fmt" + "sort" "time" "github.com/uptrace/bun" @@ -29,19 +30,35 @@ func (s *Storage) ListPayments(ctx context.Context, pagination Paginator) ([]*mo var ( hasMore = len(payments) > pagination.pageSize + hasPrevious bool firstReference, lastReference string ) if hasMore { - payments = payments[:pagination.pageSize] + if pagination.cursor.Next || pagination.cursor.Reference == "" { + payments = payments[:pagination.pageSize] + } else { + payments = payments[1:] + } } + sort.Slice(payments, func(i, j int) bool { + return payments[i].CreatedAt.After(payments[j].CreatedAt) + }) + if len(payments) > 0 { firstReference = payments[0].CreatedAt.Format(time.RFC3339Nano) lastReference = payments[len(payments)-1].CreatedAt.Format(time.RFC3339Nano) + + query = s.db.NewSelect().Model(&payments) + + hasPrevious, err = pagination.hasPrevious(ctx, query, "payment.created_at", firstReference) + if err != nil { + return nil, PaginationDetails{}, fmt.Errorf("failed to check if there is a previous page: %w", err) + } } - paginationDetails, err := pagination.paginationDetails(hasMore, firstReference, lastReference) + paginationDetails, err := pagination.paginationDetails(hasMore, hasPrevious, firstReference, lastReference) if err != nil { return nil, PaginationDetails{}, fmt.Errorf("failed to get pagination details: %w", err) } @@ -163,5 +180,10 @@ func (s *Storage) UpsertPayments(ctx context.Context, provider models.ConnectorP } } + err = s.UpdateTransfersFromPayments(ctx, payments) + if err != nil { + return e("failed to update transfers", err) + } + return nil } diff --git a/internal/app/storage/repository.go b/internal/app/storage/repository.go index 1804800e..29fa970c 100644 --- a/internal/app/storage/repository.go +++ b/internal/app/storage/repository.go @@ -6,14 +6,17 @@ import ( ) type Storage struct { - db *bun.DB + db *bun.DB + configEncryptionKey string } -func newStorage(db *bun.DB) *Storage { - return &Storage{db: db} +const encryptionOptions = "compress-algo=1, cipher-algo=aes256" + +func newStorage(db *bun.DB, configEncryptionKey string) *Storage { + return &Storage{db: db, configEncryptionKey: configEncryptionKey} } -// nolint:unused // used for SQL debugging purposes +//nolint:unused // used in debug mode func (s *Storage) debug() { s.db.AddQueryHook(bundebug.NewQueryHook(bundebug.WithVerbose(true))) } diff --git a/internal/app/storage/task.go b/internal/app/storage/task.go index e1176925..73e4f6a3 100644 --- a/internal/app/storage/task.go +++ b/internal/app/storage/task.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sort" "time" "github.com/google/uuid" @@ -128,19 +129,36 @@ func (s *Storage) ListTasks(ctx context.Context, provider models.ConnectorProvid var ( hasMore = len(tasks) > pagination.pageSize + hasPrevious bool firstReference, lastReference string ) if hasMore { - tasks = tasks[:pagination.pageSize] + if pagination.cursor.Next || pagination.cursor.Reference == "" { + tasks = tasks[:pagination.pageSize] + } else { + tasks = tasks[1:] + } } + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].CreatedAt.After(tasks[j].CreatedAt) + }) + if len(tasks) > 0 { firstReference = tasks[0].CreatedAt.Format(time.RFC3339Nano) lastReference = tasks[len(tasks)-1].CreatedAt.Format(time.RFC3339Nano) + + query = s.db.NewSelect().Model(&tasks). + Where("connector_id = ?", connector.ID) + + hasPrevious, err = pagination.hasPrevious(ctx, query, "task.created_at", firstReference) + if err != nil { + return nil, PaginationDetails{}, fmt.Errorf("failed to check if there is a previous page: %w", err) + } } - paginationDetails, err := pagination.paginationDetails(hasMore, firstReference, lastReference) + paginationDetails, err := pagination.paginationDetails(hasMore, hasPrevious, firstReference, lastReference) if err != nil { return nil, PaginationDetails{}, fmt.Errorf("failed to get pagination details: %w", err) } diff --git a/internal/app/storage/transfer.go b/internal/app/storage/transfer.go new file mode 100644 index 00000000..96232546 --- /dev/null +++ b/internal/app/storage/transfer.go @@ -0,0 +1,122 @@ +package storage + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/uptrace/bun" + + "github.com/jackc/pgx/v5" + + "github.com/google/uuid" + + "github.com/formancehq/payments/internal/app/models" +) + +func (s *Storage) CreateNewTransfer(ctx context.Context, name models.ConnectorProvider, + source, destination, currency string, amount int64, +) (models.Transfer, error) { + connector, err := s.GetConnector(ctx, name) + if err != nil { + return models.Transfer{}, err + } + + newTransfer := models.Transfer{ + ConnectorID: connector.ID, + Amount: amount, + Status: models.TransferStatusPending, + Currency: currency, + Source: source, + Destination: destination, + } + + _, err = s.db.NewInsert().Model(&newTransfer).Exec(ctx) + if err != nil { + return models.Transfer{}, e("failed to create new transfer", err) + } + + return newTransfer, nil +} + +func (s *Storage) ListTransfers(ctx context.Context, name models.ConnectorProvider) ([]models.Transfer, error) { + connector, err := s.GetConnector(ctx, name) + if err != nil { + return nil, err + } + + var transfers []models.Transfer + err = s.db.NewSelect().Model(&transfers).Where("connector_id = ?", connector.ID).Scan(ctx) + if err != nil { + return nil, e("failed to list transfers", err) + } + + return transfers, nil +} + +func (s *Storage) GetTransfer(ctx context.Context, transferID uuid.UUID) (models.Transfer, error) { + var transfer models.Transfer + + err := s.db.NewSelect().Model(&transfer).Where("id = ?", transferID).Scan(ctx) + if err != nil { + return models.Transfer{}, e("failed to get transfer", err) + } + + return transfer, nil +} + +func (s *Storage) UpdateTransferStatus(ctx context.Context, transferID uuid.UUID, + status models.TransferStatus, reference, transferErr string, +) error { + _, err := s.db.NewUpdate().Model(&models.Transfer{}). + Set("status = ?", status). + Set("reference = ?", reference). + Set("error = ?", transferErr). + Where("id = ?", transferID). + Exec(ctx) + if err != nil { + return e("failed to update transfer status", err) + } + + return nil +} + +func (s *Storage) UpdateTransfersFromPayments(ctx context.Context, payments []*models.Payment) error { + var transfers []models.Transfer + + paymentReferences := make([]string, 0, len(payments)) + for paymentIdx := range payments { + paymentReferences[paymentIdx] = payments[paymentIdx].Reference + } + + err := s.db.NewSelect().Model(&transfers). + Where("reference IN (?)", bun.In(paymentReferences)). + Where("payment_id IS NULL"). + Scan(ctx) + if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil + } + + return e("failed to get transfer", err) + } + + for transferIdx := range transfers { + if transfers[transferIdx].Reference == nil { + continue + } + + for paymentIdx := range payments { + if payments[paymentIdx].Reference == *transfers[transferIdx].Reference { + transfers[transferIdx].PaymentID = &payments[paymentIdx].ID + } + } + } + + _, err = s.db.NewUpdate().Model(&transfers).Exec(ctx) + if err != nil { + return e("failed to update transfers", err) + } + + return nil +} diff --git a/internal/app/task/scheduler.go b/internal/app/task/scheduler.go index a51b6a9b..6eec5224 100644 --- a/internal/app/task/scheduler.go +++ b/internal/app/task/scheduler.go @@ -84,7 +84,7 @@ func (s *DefaultTaskScheduler) Schedule(descriptor models.TaskDescriptor, restar } if !restart { - _, err := s.ReadTaskByDescriptor(context.Background(), descriptor) + _, err := s.ReadTaskByDescriptor(context.TODO(), descriptor) if err == nil { return nil } @@ -186,7 +186,7 @@ func (s *DefaultTaskScheduler) deleteTask(holder *taskHolder) { return } - oldestPendingTask, err := s.store.ReadOldestPendingTask(context.Background(), s.provider) + oldestPendingTask, err := s.store.ReadOldestPendingTask(context.TODO(), s.provider) if err != nil { if errors.Is(err, storage.ErrNotFound) { return @@ -213,7 +213,7 @@ func (s *DefaultTaskScheduler) deleteTask(holder *taskHolder) { type StopChan chan chan struct{} func (s *DefaultTaskScheduler) startTask(descriptor models.TaskDescriptor) error { - task, err := s.store.FindAndUpsertTask(context.Background(), s.provider, descriptor, + task, err := s.store.FindAndUpsertTask(context.TODO(), s.provider, descriptor, models.TaskStatusActive, "") if err != nil { return errors.Wrap(err, "finding task and update") @@ -228,7 +228,7 @@ func (s *DefaultTaskScheduler) startTask(descriptor models.TaskDescriptor) error return ErrUnableToResolve } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.TODO()) ctx, span := otel.Tracer("com.formance.payments").Start(ctx, "Task", trace.WithAttributes( attribute.Stringer("id", task.ID), attribute.Stringer("connector", s.provider), @@ -339,7 +339,7 @@ func (s *DefaultTaskScheduler) stackTask(descriptor models.TaskDescriptor) error }).Infof("Stacking task") return s.store.UpdateTaskStatus( - context.Background(), s.provider, descriptor, models.TaskStatusPending, "") + context.TODO(), s.provider, descriptor, models.TaskStatusPending, "") } var _ Scheduler = &DefaultTaskScheduler{} diff --git a/internal/app/task/scheduler_test.go b/internal/app/task/scheduler_test.go index 4e92b011..5ea70269 100644 --- a/internal/app/task/scheduler_test.go +++ b/internal/app/task/scheduler_test.go @@ -210,7 +210,7 @@ func TestTaskScheduler(t *testing.T) { require.NoError(t, scheduler.Schedule(mainDescriptor, false)) require.Eventually(t, TaskActive(store, provider, mainDescriptor), time.Second, 100*time.Millisecond) - require.NoError(t, scheduler.Shutdown(context.Background())) + require.NoError(t, scheduler.Shutdown(context.TODO())) require.Eventually(t, TaskTerminated(store, provider, mainDescriptor), time.Second, 100*time.Millisecond) require.Eventually(t, TaskPending(store, provider, workerDescriptor), time.Second, 100*time.Millisecond) }) diff --git a/openapi.yaml b/openapi.yaml index 78247737..c1c7cda3 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -160,6 +160,31 @@ paths: responses: '200': $ref: '#/components/responses/Task' + /connectors/{connector}/transfers: + post: + summary: Transfer funds between Connector accounts + tags: + - Payments + operationId: connectorsTransfer + description: Execute a transfer between two accounts. + parameters: + - $ref: '#/components/parameters/Connector' + requestBody: + $ref: '#/components/requestBodies/Transfer' + responses: + '200': + $ref: '#/components/responses/Transfer' + get: + summary: List transfers and their statuses + tags: + - Payments + operationId: listConnectorsTransfers + description: List transfers + parameters: + - $ref: '#/components/parameters/Connector' + responses: + '200': + $ref: '#/components/responses/Transfers' /connectors/stripe/transfers: post: summary: Transfer funds between Stripe accounts @@ -297,6 +322,18 @@ components: application/json: schema: $ref: '#/components/schemas/StripeTransferResponse' + Transfer: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/TransferResponse' + Transfers: + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/TransfersResponse' # ---------------------- REQUESTS ---------------------- requestBodies: ConnectorConfig: @@ -317,6 +354,12 @@ components: application/json: schema: $ref: '#/components/schemas/PaymentMetadata' + Transfer: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/TransferRequest' # ---------------------- SCHEMAS ---------------------- schemas: @@ -466,7 +509,37 @@ components: - $ref: '#/components/schemas/TaskDummyPay' - $ref: '#/components/schemas/TaskModulr' - $ref: '#/components/schemas/TaskBankingCircle' - + TransferResponse: + type: object + properties: + id: + type: string + TransfersResponse: + type: object + properties: + data: + type: array + items: + type: object + properties: + id: + type: string + amount: + type: integer + format: int64 + minimum: 0 + asset: + type: string + destination: + type: string + source: + type: string + currency: + type: string + status: + type: string + error: + type: string # ---------------------- DATA MODELS ---------------------- Connector: type: string @@ -839,6 +912,27 @@ components: type: string key: type: string + TransferRequest: + type: object + required: + - asset + - amount + - destination + properties: + amount: + type: integer + format: int64 + minimum: 0 + example: 100 + asset: + type: string + example: USD + destination: + type: string + example: acct_1Gqj58KZcSIg2N2q + source: + type: string + example: acct_1Gqj58KZcSIg2N2q StripeTransferRequest: type: object properties: