From f9e64ea6d4a4a17c8307e0ce7ce1268fc9b2cb81 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Thu, 1 Dec 2022 10:06:22 +0100 Subject: [PATCH] fix: Rework API responses (#39) --- pkg/config.go | 24 +++- pkg/server/activate_one.go | 37 ------ pkg/server/activation.go | 63 +++++++++ pkg/server/deactivate_one.go | 37 ------ pkg/server/{delete_one.go => delete.go} | 9 +- pkg/server/{get_many.go => get.go} | 8 +- pkg/server/{health_check.go => health.go} | 0 pkg/server/{insert_one.go => insert.go} | 26 ++-- pkg/server/openapi.yaml | 105 +++++++-------- pkg/server/{change_secret.go => secret.go} | 11 +- pkg/server/service.go | 92 ------------- pkg/storage/mongo/mongo.go | 148 ++++++++++++--------- pkg/storage/store.go | 25 ++-- pkg/worker/messages/worker.go | 6 +- pkg/worker/retries/worker.go | 15 ++- test/main_test.go | 15 +++ test/server_test.go | 32 ++--- 17 files changed, 304 insertions(+), 349 deletions(-) delete mode 100644 pkg/server/activate_one.go create mode 100644 pkg/server/activation.go delete mode 100644 pkg/server/deactivate_one.go rename pkg/server/{delete_one.go => delete.go} (75%) rename pkg/server/{get_many.go => get.go} (91%) rename pkg/server/{health_check.go => health.go} (100%) rename pkg/server/{insert_one.go => insert.go} (66%) rename pkg/server/{change_secret.go => secret.go} (86%) delete mode 100644 pkg/server/service.go diff --git a/pkg/config.go b/pkg/config.go index d1d2a4c..cbf58eb 100644 --- a/pkg/config.go +++ b/pkg/config.go @@ -7,13 +7,9 @@ import ( "net/url" "strings" "time" -) -type ConfigUser struct { - Endpoint string `json:"endpoint" bson:"endpoint"` - Secret string `json:"secret" bson:"secret"` - EventTypes []string `json:"eventTypes" bson:"eventTypes"` -} + "github.com/google/uuid" +) type Config struct { ConfigUser `bson:"inline"` @@ -23,6 +19,22 @@ type Config struct { UpdatedAt time.Time `json:"updatedAt" bson:"updatedAt"` } +type ConfigUser struct { + Endpoint string `json:"endpoint" bson:"endpoint"` + Secret string `json:"secret" bson:"secret"` + EventTypes []string `json:"eventTypes" bson:"eventTypes"` +} + +func NewConfig(cfgUser ConfigUser) Config { + return Config{ + ConfigUser: cfgUser, + ID: uuid.NewString(), + Active: true, + CreatedAt: time.Now().UTC(), + UpdatedAt: time.Now().UTC(), + } +} + const ( KeySecret = "secret" KeyEventTypes = "eventTypes" diff --git a/pkg/server/activate_one.go b/pkg/server/activate_one.go deleted file mode 100644 index ba836f1..0000000 --- a/pkg/server/activate_one.go +++ /dev/null @@ -1,37 +0,0 @@ -package server - -import ( - "encoding/json" - "errors" - "net/http" - - "github.com/formancehq/go-libs/sharedapi" - "github.com/formancehq/go-libs/sharedlogging" - webhooks "github.com/formancehq/webhooks/pkg" - "github.com/go-chi/chi/v5" -) - -func (h *serverHandler) activateOneConfigHandle(w http.ResponseWriter, r *http.Request) { - id := chi.URLParam(r, PathParamId) - cursor, err := updateOneConfigActivation(r.Context(), true, id, h.store) - if err == nil { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s", PathConfigs, id, PathActivate) - resp := sharedapi.BaseResponse[webhooks.Config]{ - Cursor: &cursor, - } - if err := json.NewEncoder(w).Encode(resp); err != nil { - sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - } else if errors.Is(err, ErrConfigNotFound) { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, ErrConfigNotFound) - http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) - } else if errors.Is(err, ErrConfigNotModified) { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, ErrConfigNotModified) - w.WriteHeader(http.StatusNotModified) - } else { - sharedlogging.GetLogger(r.Context()).Errorf("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - } -} diff --git a/pkg/server/activation.go b/pkg/server/activation.go new file mode 100644 index 0000000..4b0a5b9 --- /dev/null +++ b/pkg/server/activation.go @@ -0,0 +1,63 @@ +package server + +import ( + "encoding/json" + "net/http" + + "github.com/formancehq/go-libs/sharedapi" + "github.com/formancehq/go-libs/sharedlogging" + webhooks "github.com/formancehq/webhooks/pkg" + "github.com/formancehq/webhooks/pkg/storage" + "github.com/go-chi/chi/v5" + "github.com/pkg/errors" +) + +func (h *serverHandler) activateOneConfigHandle(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, PathParamId) + c, err := h.store.UpdateOneConfigActivation(r.Context(), id, true) + if err == nil { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s", PathConfigs, id, PathActivate) + resp := sharedapi.BaseResponse[webhooks.Config]{ + Data: &c, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + } else if errors.Is(err, storage.ErrConfigNotFound) { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, storage.ErrConfigNotFound) + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + } else if errors.Is(err, storage.ErrConfigNotModified) { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, storage.ErrConfigNotModified) + w.WriteHeader(http.StatusNotModified) + } else { + sharedlogging.GetLogger(r.Context()).Errorf("PUT %s/%s%s: %s", PathConfigs, id, PathActivate, err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + } +} + +func (h *serverHandler) deactivateOneConfigHandle(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, PathParamId) + c, err := h.store.UpdateOneConfigActivation(r.Context(), id, false) + if err == nil { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s", PathConfigs, id, PathDeactivate) + resp := sharedapi.BaseResponse[webhooks.Config]{ + Data: &c, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + } else if errors.Is(err, storage.ErrConfigNotFound) { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, storage.ErrConfigNotFound) + http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) + } else if errors.Is(err, storage.ErrConfigNotModified) { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, storage.ErrConfigNotModified) + w.WriteHeader(http.StatusNotModified) + } else { + sharedlogging.GetLogger(r.Context()).Errorf("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + } +} diff --git a/pkg/server/deactivate_one.go b/pkg/server/deactivate_one.go deleted file mode 100644 index 8b07c35..0000000 --- a/pkg/server/deactivate_one.go +++ /dev/null @@ -1,37 +0,0 @@ -package server - -import ( - "encoding/json" - "errors" - "net/http" - - "github.com/formancehq/go-libs/sharedapi" - "github.com/formancehq/go-libs/sharedlogging" - webhooks "github.com/formancehq/webhooks/pkg" - "github.com/go-chi/chi/v5" -) - -func (h *serverHandler) deactivateOneConfigHandle(w http.ResponseWriter, r *http.Request) { - id := chi.URLParam(r, PathParamId) - cursor, err := updateOneConfigActivation(r.Context(), false, id, h.store) - if err == nil { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s", PathConfigs, id, PathDeactivate) - resp := sharedapi.BaseResponse[webhooks.Config]{ - Cursor: &cursor, - } - if err := json.NewEncoder(w).Encode(resp); err != nil { - sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } - } else if errors.Is(err, ErrConfigNotFound) { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, ErrConfigNotFound) - http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) - } else if errors.Is(err, ErrConfigNotModified) { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, ErrConfigNotModified) - w.WriteHeader(http.StatusNotModified) - } else { - sharedlogging.GetLogger(r.Context()).Errorf("PUT %s/%s%s: %s", PathConfigs, id, PathDeactivate, err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - } -} diff --git a/pkg/server/delete_one.go b/pkg/server/delete.go similarity index 75% rename from pkg/server/delete_one.go rename to pkg/server/delete.go index db14a65..deeede4 100644 --- a/pkg/server/delete_one.go +++ b/pkg/server/delete.go @@ -1,20 +1,21 @@ package server import ( - "errors" "net/http" "github.com/formancehq/go-libs/sharedlogging" + "github.com/formancehq/webhooks/pkg/storage" "github.com/go-chi/chi/v5" + "github.com/pkg/errors" ) func (h *serverHandler) deleteOneConfigHandle(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, PathParamId) - err := deleteOneConfig(r.Context(), id, h.store) + err := h.store.DeleteOneConfig(r.Context(), id) if err == nil { sharedlogging.GetLogger(r.Context()).Infof("DELETE %s/%s", PathConfigs, id) - } else if errors.Is(err, ErrConfigNotFound) { - sharedlogging.GetLogger(r.Context()).Infof("DELETE %s/%s: %s", PathConfigs, id, ErrConfigNotFound) + } else if errors.Is(err, storage.ErrConfigNotFound) { + sharedlogging.GetLogger(r.Context()).Infof("DELETE %s/%s: %s", PathConfigs, id, storage.ErrConfigNotFound) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) } else { sharedlogging.GetLogger(r.Context()).Errorf("DELETE %s/%s: %s", PathConfigs, id, err) diff --git a/pkg/server/get_many.go b/pkg/server/get.go similarity index 91% rename from pkg/server/get_many.go rename to pkg/server/get.go index 4b347ac..2e2896c 100644 --- a/pkg/server/get_many.go +++ b/pkg/server/get.go @@ -18,7 +18,7 @@ func (h *serverHandler) getManyConfigsHandle(w http.ResponseWriter, r *http.Requ return } - cursor, err := h.store.FindManyConfigs(r.Context(), filter) + cfgs, err := h.store.FindManyConfigs(r.Context(), filter) if err != nil { sharedlogging.GetLogger(r.Context()).Errorf("storage.store.FindManyConfigs: %s", err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) @@ -26,7 +26,9 @@ func (h *serverHandler) getManyConfigsHandle(w http.ResponseWriter, r *http.Requ } resp := sharedapi.BaseResponse[webhooks.Config]{ - Cursor: &cursor, + Cursor: &sharedapi.Cursor[webhooks.Config]{ + Data: cfgs, + }, } if err := json.NewEncoder(w).Encode(resp); err != nil { @@ -35,7 +37,7 @@ func (h *serverHandler) getManyConfigsHandle(w http.ResponseWriter, r *http.Requ return } - sharedlogging.GetLogger(r.Context()).Infof("GET /configs: %d results", len(cursor.Data)) + sharedlogging.GetLogger(r.Context()).Infof("GET /configs: %d results", len(resp.Cursor.Data)) } var ErrInvalidParams = errors.New("invalid params: only 'id' and 'endpoint' with a valid URL are accepted") diff --git a/pkg/server/health_check.go b/pkg/server/health.go similarity index 100% rename from pkg/server/health_check.go rename to pkg/server/health.go diff --git a/pkg/server/insert_one.go b/pkg/server/insert.go similarity index 66% rename from pkg/server/insert_one.go rename to pkg/server/insert.go index a0f5c52..84f6f19 100644 --- a/pkg/server/insert_one.go +++ b/pkg/server/insert.go @@ -2,12 +2,12 @@ package server import ( "encoding/json" - "errors" - "fmt" "net/http" + "github.com/formancehq/go-libs/sharedapi" "github.com/formancehq/go-libs/sharedlogging" webhooks "github.com/formancehq/webhooks/pkg" + "github.com/pkg/errors" ) func (h *serverHandler) insertOneConfigHandle(w http.ResponseWriter, r *http.Request) { @@ -24,21 +24,25 @@ func (h *serverHandler) insertOneConfigHandle(w http.ResponseWriter, r *http.Req } if err := cfg.Validate(); err != nil { - err := fmt.Errorf("invalid config: %w", err) + err := errors.Wrap(err, "invalid config") sharedlogging.GetLogger(r.Context()).Errorf(err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } - if id, err := insertOneConfig(r.Context(), cfg, h.store); err != nil { + c, err := h.store.InsertOneConfig(r.Context(), cfg) + if err == nil { + sharedlogging.GetLogger(r.Context()).Infof("POST %s: inserted id %s", PathConfigs, c.ID) + resp := sharedapi.BaseResponse[webhooks.Config]{ + Data: &c, + } + if err := json.NewEncoder(w).Encode(resp); err != nil { + sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + } else { sharedlogging.GetLogger(r.Context()).Errorf("POST %s: %s", PathConfigs, err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } else if err := json.NewEncoder(w).Encode(id); err != nil { - sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) - http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) - return - } else { - sharedlogging.GetLogger(r.Context()).Infof("POST %s: inserted id %s", PathConfigs, id) } } diff --git a/pkg/server/openapi.yaml b/pkg/server/openapi.yaml index ed6165c..2bee94a 100644 --- a/pkg/server/openapi.yaml +++ b/pkg/server/openapi.yaml @@ -55,7 +55,7 @@ paths: data: type: array items: - $ref: '#/components/schemas/Config' + $ref: '#/components/schemas/ConfigActivated' required: - data @@ -83,13 +83,11 @@ paths: required: true responses: "200": - description: Config created successfully, returns its ID. + description: Config created successfully. content: - text/plain: - schema: - type: string - description: Config ID - example: 4997257d-dfb6-445b-929c-cbe2ab182818 + application/json: + schema: + $ref: '#/components/schemas/ConfigActivatedResponse' "400": description: Bad Request @@ -115,7 +113,7 @@ paths: example: 4997257d-dfb6-445b-929c-cbe2ab182818 responses: "200": - description: OK + description: Config successfully deleted. content: {} /configs/{id}/activate: @@ -138,21 +136,7 @@ paths: content: application/json: schema: - type: object - required: - - cursor - properties: - cursor: - allOf: - - $ref: '#/components/schemas/Cursor' - - type: object - properties: - data: - type: array - items: - $ref: '#/components/schemas/Config' - required: - - data + $ref: '#/components/schemas/ConfigActivatedResponse' "304": description: Config not modified, was already activated. content: {} @@ -177,21 +161,7 @@ paths: content: application/json: schema: - type: object - required: - - cursor - properties: - cursor: - allOf: - - $ref: '#/components/schemas/Cursor' - - type: object - properties: - data: - type: array - items: - $ref: '#/components/schemas/Config' - required: - - data + $ref: '#/components/schemas/ConfigDeactivatedResponse' "304": description: Config not modified, was already deactivated. content: {} @@ -218,7 +188,6 @@ paths: example: "V0bivxRWveaoz08afqjU6Ko/jwO0Cb+3" required: - secret - parameters: - name: id in: path @@ -229,25 +198,11 @@ paths: example: 4997257d-dfb6-445b-929c-cbe2ab182818 responses: "200": - description: OK + description: Secret successfully changed. content: application/json: schema: - type: object - required: - - cursor - properties: - cursor: - allOf: - - $ref: '#/components/schemas/Cursor' - - type: object - properties: - data: - type: array - items: - $ref: '#/components/schemas/Config' - required: - - data + $ref: '#/components/schemas/ConfigActivatedResponse' components: schemas: @@ -266,7 +221,19 @@ components: example: "TYPE1" example: ["TYPE1", "TYPE2"] - Config: + ConfigActivatedResponse: + type: object + properties: + data: + $ref: '#/components/schemas/ConfigActivated' + + ConfigDeactivatedResponse: + type: object + properties: + data: + $ref: '#/components/schemas/ConfigDeactivated' + + ConfigActivated: properties: endpoint: type: string @@ -292,6 +259,32 @@ components: format: date-time example: "2022-07-20T08:32:59Z" + ConfigDeactivated: + properties: + endpoint: + type: string + example: "https://example.com" + secret: + type: string + example: "V0bivxRWveaoz08afqjU6Ko/jwO0Cb+3" + eventTypes: + type: array + items: + type: string + example: "TYPE1" + example: ["TYPE1", "TYPE2"] + active: + type: boolean + example: false + createdAt: + type: string + format: date-time + example: "2022-07-20T07:31:40Z" + modifiedAt: + type: string + format: date-time + example: "2022-07-20T08:32:59Z" + Cursor: type: object properties: diff --git a/pkg/server/change_secret.go b/pkg/server/secret.go similarity index 86% rename from pkg/server/change_secret.go rename to pkg/server/secret.go index b66951a..31e9ef9 100644 --- a/pkg/server/change_secret.go +++ b/pkg/server/secret.go @@ -2,13 +2,14 @@ package server import ( "encoding/json" - "errors" "net/http" "github.com/formancehq/go-libs/sharedapi" "github.com/formancehq/go-libs/sharedlogging" webhooks "github.com/formancehq/webhooks/pkg" + "github.com/formancehq/webhooks/pkg/storage" "github.com/go-chi/chi/v5" + "github.com/pkg/errors" ) func (h *serverHandler) changeSecretHandle(w http.ResponseWriter, r *http.Request) { @@ -31,19 +32,19 @@ func (h *serverHandler) changeSecretHandle(w http.ResponseWriter, r *http.Reques return } - cursor, err := changeOneConfigSecret(r.Context(), id, sec.Secret, h.store) + c, err := h.store.UpdateOneConfigSecret(r.Context(), id, sec.Secret) if err == nil { sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s", PathConfigs, id, PathChangeSecret) resp := sharedapi.BaseResponse[webhooks.Config]{ - Cursor: &cursor, + Data: &c, } if err := json.NewEncoder(w).Encode(resp); err != nil { sharedlogging.GetLogger(r.Context()).Errorf("json.Encoder.Encode: %s", err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } - } else if errors.Is(err, ErrConfigNotFound) { - sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathChangeSecret, ErrConfigNotFound) + } else if errors.Is(err, storage.ErrConfigNotFound) { + sharedlogging.GetLogger(r.Context()).Infof("PUT %s/%s%s: %s", PathConfigs, id, PathChangeSecret, storage.ErrConfigNotFound) http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) } else { sharedlogging.GetLogger(r.Context()).Errorf("PUT %s/%s%s: %s", PathConfigs, id, PathChangeSecret, err) diff --git a/pkg/server/service.go b/pkg/server/service.go deleted file mode 100644 index 74b48cf..0000000 --- a/pkg/server/service.go +++ /dev/null @@ -1,92 +0,0 @@ -package server - -import ( - "context" - - "github.com/formancehq/go-libs/sharedapi" - "github.com/formancehq/go-libs/sharedlogging" - webhooks "github.com/formancehq/webhooks/pkg" - "github.com/formancehq/webhooks/pkg/storage" - "github.com/pkg/errors" -) - -var ( - ErrConfigNotFound = errors.New("config not found") - ErrConfigNotModified = errors.New("config not modified") - ErrConfigNotDeleted = errors.New("config not deleted") -) - -func insertOneConfig(ctx context.Context, cfg webhooks.ConfigUser, store storage.Store) (string, error) { - id, err := store.InsertOneConfig(ctx, cfg) - if err != nil { - return "", errors.Wrap(err, "store.insertOneConfig") - } - - sharedlogging.GetLogger(ctx).Debug("insertOneConfig: id: ", id) - return id, nil -} - -func deleteOneConfig(ctx context.Context, id string, store storage.Store) error { - if _, err := findConfig(ctx, store, id); err != nil { - return errors.Wrap(err, "findConfig") - } - - if deletedCount, err := store.DeleteOneConfig(ctx, id); err != nil { - return errors.Wrap(err, "store.deleteOneConfig") - } else if deletedCount == 0 { - return ErrConfigNotDeleted - } - - sharedlogging.GetLogger(ctx).Debug("deleteOneConfig: id: ", id) - return nil -} - -func updateOneConfigActivation(ctx context.Context, active bool, id string, store storage.Store) (sharedapi.Cursor[webhooks.Config], error) { - if _, err := findConfig(ctx, store, id); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "findConfig") - } - - if _, modifiedCount, _, _, err := store.UpdateOneConfigActivation(ctx, id, active); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "store.updateOneConfigActivation") - } else if modifiedCount == 0 { - return sharedapi.Cursor[webhooks.Config]{}, ErrConfigNotModified - } - - cursor, err := findConfig(ctx, store, id) - if err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "findConfig") - } - - sharedlogging.GetLogger(ctx).Debugf("updateOneConfigActivation (%v): id: %s", active, id) - return cursor, nil -} - -func changeOneConfigSecret(ctx context.Context, id, secret string, store storage.Store) (sharedapi.Cursor[webhooks.Config], error) { - if _, err := findConfig(ctx, store, id); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "findConfig") - } - - if _, modifiedCount, _, _, err := store.UpdateOneConfigSecret(ctx, id, secret); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "store.UpdateOneConfigSecret") - } else if modifiedCount == 0 { - return sharedapi.Cursor[webhooks.Config]{}, ErrConfigNotModified - } - - cursor, err := findConfig(ctx, store, id) - if err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "findConfig") - } - - sharedlogging.GetLogger(ctx).Debug("changeOneConfigSecret: id: ", id) - return cursor, nil -} - -func findConfig(ctx context.Context, store storage.Store, id string) (cur sharedapi.Cursor[webhooks.Config], err error) { - if cur, err = store.FindManyConfigs(ctx, map[string]any{webhooks.KeyID: id}); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, errors.Wrap(err, "store.FindManyConfigs") - } else if len(cur.Data) == 0 { - return sharedapi.Cursor[webhooks.Config]{}, ErrConfigNotFound - } - - return -} diff --git a/pkg/storage/mongo/mongo.go b/pkg/storage/mongo/mongo.go index d9ebe2b..46070d5 100644 --- a/pkg/storage/mongo/mongo.go +++ b/pkg/storage/mongo/mongo.go @@ -2,19 +2,15 @@ package mongo import ( "context" - "fmt" "time" - "github.com/formancehq/go-libs/sharedapi" "github.com/formancehq/go-libs/sharedlogging" "github.com/formancehq/webhooks/cmd/flag" webhooks "github.com/formancehq/webhooks/pkg" "github.com/formancehq/webhooks/pkg/storage" - "github.com/google/uuid" "github.com/pkg/errors" "github.com/spf13/viper" "go.mongodb.org/mongo-driver/bson" - "go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" @@ -39,10 +35,10 @@ func NewStore() (storage.Store, error) { client, err := mongo.Connect(ctx, options.Client().ApplyURI(mongoDBUri).SetMonitor(otelmongo.NewMonitor())) if err != nil { - return Store{}, fmt.Errorf("mongo.Connect: %w", err) + return Store{}, errors.Wrap(err, "mongo.Connect") } if err := client.Ping(ctx, readpref.Primary()); err != nil { - return Store{}, fmt.Errorf("mongo.Client.Ping: %w", err) + return Store{}, errors.Wrap(err, "mongo.Client.Ping") } return Store{ @@ -57,77 +53,90 @@ func NewStore() (storage.Store, error) { }, nil } -func (s Store) FindManyConfigs(ctx context.Context, filter map[string]any) (sharedapi.Cursor[webhooks.Config], error) { - res := sharedapi.Cursor[webhooks.Config]{ - Data: []webhooks.Config{}, - } +func (s Store) FindManyConfigs(ctx context.Context, filter map[string]any) ([]webhooks.Config, error) { + res := []webhooks.Config{} opts := options.Find().SetSort(bson.M{webhooks.KeyUpdatedAt: -1}) cur, err := s.configsCollection.Find(ctx, filter, opts) if err != nil { - return res, fmt.Errorf("mongo.Collection.Find: %w", err) + return res, errors.Wrap(err, "mongo.Collection.Find") } defer cur.Close(ctx) - if err := cur.All(ctx, &res.Data); err != nil { - return sharedapi.Cursor[webhooks.Config]{}, - fmt.Errorf("mongo.Cursor.All: %w", err) + if err := cur.All(ctx, &res); err != nil { + return []webhooks.Config{}, errors.Wrap(err, "mongo.Cursor.All") } return res, nil } -func (s Store) InsertOneConfig(ctx context.Context, cfgUser webhooks.ConfigUser) (insertedID string, err error) { - cfg := webhooks.Config{ - ConfigUser: cfgUser, - ID: uuid.NewString(), - Active: true, - CreatedAt: time.Now().UTC(), - UpdatedAt: time.Now().UTC(), - } - - res, err := s.configsCollection.InsertOne(ctx, cfg) +func (s Store) InsertOneConfig(ctx context.Context, cfgUser webhooks.ConfigUser) (webhooks.Config, error) { + cfg := webhooks.NewConfig(cfgUser) + _, err := s.configsCollection.InsertOne(ctx, cfg) if err != nil { - return "", fmt.Errorf("store.Collection.InsertOne: %w", err) + return webhooks.Config{}, errors.Wrap(err, "store.Collection.InsertOne") } - return res.InsertedID.(string), nil + return cfg, nil } -func (s Store) DeleteOneConfig(ctx context.Context, id string) (deletedCount int64, err error) { +func (s Store) DeleteOneConfig(ctx context.Context, id string) error { res, err := s.configsCollection.DeleteOne(ctx, bson.D{ {Key: webhooks.KeyID, Value: id}, }) if err != nil { - return 0, fmt.Errorf("momgo.Collection.DeleteOne: %w", err) + return errors.Wrap(err, "momgo.Collection.DeleteOne") } - return res.DeletedCount, nil + if res.DeletedCount == 0 { + return storage.ErrConfigNotFound + } + + return nil } -func (s Store) UpdateOneConfigActivation(ctx context.Context, id string, active bool, -) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) { - filter := bson.D{ +func (s Store) UpdateOneConfigActivation(ctx context.Context, id string, active bool) (webhooks.Config, error) { + cfg := webhooks.Config{} + filter := bson.D{{Key: webhooks.KeyID, Value: id}} + if err := s.configsCollection.FindOne(ctx, filter).Decode(&cfg); err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return webhooks.Config{}, storage.ErrConfigNotFound + } + return webhooks.Config{}, errors.Wrap(err, "decode config") + } + if cfg.Active == active { + return webhooks.Config{}, storage.ErrConfigNotModified + } + + filter = bson.D{ {Key: webhooks.KeyID, Value: id}, {Key: webhooks.KeyActive, Value: !active}, } - update := bson.D{{Key: "$set", Value: bson.D{ {Key: webhooks.KeyActive, Value: active}, {Key: webhooks.KeyUpdatedAt, Value: time.Now().UTC()}, }}} - - res, err := s.configsCollection.UpdateOne(ctx, filter, update) - if err != nil { - return 0, 0, 0, nil, - errors.Wrap(err, "mongo.Collection.UpdateOne") + if _, err := s.configsCollection.UpdateOne(ctx, filter, update); err != nil { + return webhooks.Config{}, errors.Wrap(err, "mongo.Collection.UpdateOne") } - return res.MatchedCount, res.ModifiedCount, res.UpsertedCount, res.UpsertedID, nil + cfg.Active = active + return cfg, nil } -func (s Store) UpdateOneConfigSecret(ctx context.Context, id, secret string, -) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) { - filter := bson.D{ +func (s Store) UpdateOneConfigSecret(ctx context.Context, id, secret string) (webhooks.Config, error) { + cfg := webhooks.Config{} + filter := bson.D{{Key: webhooks.KeyID, Value: id}} + if err := s.configsCollection.FindOne(ctx, filter).Decode(&cfg); err != nil { + if errors.Is(err, mongo.ErrNoDocuments) { + return webhooks.Config{}, storage.ErrConfigNotFound + } + return webhooks.Config{}, errors.Wrap(err, "decode updated config") + } + if cfg.Secret == secret { + return webhooks.Config{}, storage.ErrConfigNotModified + } + + filter = bson.D{ {Key: webhooks.KeyID, Value: id}, {Key: webhooks.KeySecret, Value: bson.D{ {Key: "$ne", Value: secret}, @@ -137,29 +146,25 @@ func (s Store) UpdateOneConfigSecret(ctx context.Context, id, secret string, {Key: webhooks.KeySecret, Value: secret}, {Key: webhooks.KeyUpdatedAt, Value: time.Now().UTC()}, }}} - - res, err := s.configsCollection.UpdateOne(ctx, filter, update) - if err != nil { - return 0, 0, 0, nil, - fmt.Errorf("mongo.Collection.UpdateOne: %w", err) + if _, err := s.configsCollection.UpdateOne(ctx, filter, update); err != nil { + return webhooks.Config{}, errors.Wrap(err, "mongo.Collection.UpdateOne") } - return res.MatchedCount, res.ModifiedCount, res.UpsertedCount, res.UpsertedID, nil + cfg.Secret = secret + return cfg, nil } -func (s Store) FindManyAttempts(ctx context.Context, filter map[string]any) (sharedapi.Cursor[webhooks.Attempt], error) { - res := sharedapi.Cursor[webhooks.Attempt]{ - Data: []webhooks.Attempt{}, - } +func (s Store) FindManyAttempts(ctx context.Context, filter map[string]any) ([]webhooks.Attempt, error) { + res := []webhooks.Attempt{} opts := options.Find().SetSort(bson.M{webhooks.KeyID: -1}) cur, err := s.attemptsCollection.Find(ctx, filter, opts) if err != nil { - return res, fmt.Errorf("mongo.Collection.Find: %w", err) + return res, errors.Wrap(err, "mongo.Collection.Find") } defer cur.Close(ctx) - if err := cur.All(ctx, &res.Data); err != nil { - return res, fmt.Errorf("mongo.Cursor.All: %w", err) + if err := cur.All(ctx, &res); err != nil { + return res, errors.Wrap(err, "mongo.Cursor.All") } return res, nil @@ -179,8 +184,15 @@ func (s Store) FindDistinctWebhookIDs(ctx context.Context, filter map[string]any return res, nil } -func (s Store) UpdateManyAttemptsStatus(ctx context.Context, webhookID string, status string, -) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) { +func (s Store) UpdateManyAttemptsStatus(ctx context.Context, webhookID, status string) ([]webhooks.Attempt, error) { + atts, err := s.FindManyAttempts(ctx, map[string]any{webhooks.KeyWebhookID: webhookID}) + if err != nil { + return []webhooks.Attempt{}, errors.Wrap(err, "mongo.Collection.UpdateMany") + } + if len(atts) == 0 { + return []webhooks.Attempt{}, storage.ErrAttemptIDNotFound + } + filter := bson.D{ {Key: webhooks.KeyWebhookID, Value: webhookID}, {Key: webhooks.KeyStatus, Value: bson.D{ @@ -193,20 +205,25 @@ func (s Store) UpdateManyAttemptsStatus(ctx context.Context, webhookID string, s res, err := s.attemptsCollection.UpdateMany(ctx, filter, update) if err != nil { - return 0, 0, 0, nil, - fmt.Errorf("mongo.Collection.UpdateMany: %w", err) + return []webhooks.Attempt{}, errors.Wrap(err, "mongo.Collection.UpdateMany") + } + if res.ModifiedCount == 0 { + return []webhooks.Attempt{}, storage.ErrAttemptNotModified } - return res.MatchedCount, res.ModifiedCount, res.UpsertedCount, res.UpsertedID, nil + for i := range atts { + atts[i].Status = status + } + return atts, nil } -func (s Store) InsertOneAttempt(ctx context.Context, att webhooks.Attempt) (insertedID string, err error) { - res, err := s.attemptsCollection.InsertOne(ctx, att) +func (s Store) InsertOneAttempt(ctx context.Context, att webhooks.Attempt) error { + _, err := s.attemptsCollection.InsertOne(ctx, att) if err != nil { - return "", fmt.Errorf("store.Collection.InsertOne: %w", err) + return errors.Wrap(err, "store.Collection.InsertOne") } - return res.InsertedID.(primitive.ObjectID).String(), nil + return nil } func (s Store) Close(ctx context.Context) error { @@ -215,7 +232,8 @@ func (s Store) Close(ctx context.Context) error { } if err := s.client.Disconnect(ctx); err != nil { - return fmt.Errorf("mongo.Client.Disconnect: %w", err) + return errors.Wrap(err, "mongo.Client.Disconnect") } + return nil } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 9f68cbe..a19bfef 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -3,8 +3,8 @@ package storage import ( "context" - "github.com/formancehq/go-libs/sharedapi" webhooks "github.com/formancehq/webhooks/pkg" + "github.com/pkg/errors" ) const ( @@ -12,15 +12,22 @@ const ( CollectionAttempts = "attempts" ) +var ( + ErrConfigNotFound = errors.New("config not found") + ErrConfigNotModified = errors.New("config not modified") + ErrAttemptIDNotFound = errors.New("attempt webhookID not found") + ErrAttemptNotModified = errors.New("attempt not modified") +) + type Store interface { - FindManyConfigs(ctx context.Context, filter map[string]any) (sharedapi.Cursor[webhooks.Config], error) - InsertOneConfig(ctx context.Context, cfg webhooks.ConfigUser) (string, error) - DeleteOneConfig(ctx context.Context, id string) (int64, error) - UpdateOneConfigActivation(ctx context.Context, id string, active bool) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) - UpdateOneConfigSecret(ctx context.Context, id, secret string) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) - FindManyAttempts(ctx context.Context, filter map[string]any) (sharedapi.Cursor[webhooks.Attempt], error) + FindManyConfigs(ctx context.Context, filter map[string]any) ([]webhooks.Config, error) + InsertOneConfig(ctx context.Context, cfg webhooks.ConfigUser) (webhooks.Config, error) + DeleteOneConfig(ctx context.Context, id string) error + UpdateOneConfigActivation(ctx context.Context, id string, active bool) (webhooks.Config, error) + UpdateOneConfigSecret(ctx context.Context, id, secret string) (webhooks.Config, error) + FindManyAttempts(ctx context.Context, filter map[string]any) ([]webhooks.Attempt, error) FindDistinctWebhookIDs(ctx context.Context, filter map[string]any) ([]string, error) - UpdateManyAttemptsStatus(ctx context.Context, webhookID string, status string) (matchedCount, modifiedCount, upsertedCount int64, upsertedID any, err error) - InsertOneAttempt(ctx context.Context, att webhooks.Attempt) (insertedID string, err error) + UpdateManyAttemptsStatus(ctx context.Context, webhookID string, status string) ([]webhooks.Attempt, error) + InsertOneAttempt(ctx context.Context, att webhooks.Attempt) error Close(ctx context.Context) error } diff --git a/pkg/worker/messages/worker.go b/pkg/worker/messages/worker.go index 802c38e..917e7e3 100644 --- a/pkg/worker/messages/worker.go +++ b/pkg/worker/messages/worker.go @@ -150,12 +150,12 @@ func (w *WorkerMessages) processMessage(ctx context.Context, msgValue []byte) er filter := map[string]any{webhooks.KeyEventTypes: ev.Type} sharedlogging.GetLogger(ctx).Debugf("searching configs with filter: %+v", filter) - cur, err := w.store.FindManyConfigs(ctx, filter) + cfgs, err := w.store.FindManyConfigs(ctx, filter) if err != nil { return errors.Wrap(err, "storage.store.FindManyConfigs") } - for _, cfg := range cur.Data { + for _, cfg := range cfgs { sharedlogging.GetLogger(ctx).Debugf("found one config: %+v", cfg) data, err := json.Marshal(ev) if err != nil { @@ -174,7 +174,7 @@ func (w *WorkerMessages) processMessage(ctx context.Context, msgValue []byte) er attempt.WebhookID, cfg.Endpoint, ev.Type) } - if _, err := w.store.InsertOneAttempt(ctx, attempt); err != nil { + if err := w.store.InsertOneAttempt(ctx, attempt); err != nil { return errors.Wrap(err, "storage.store.InsertOneAttempt") } } diff --git a/pkg/worker/retries/worker.go b/pkg/worker/retries/worker.go index 1fd84b3..5392724 100644 --- a/pkg/worker/retries/worker.go +++ b/pkg/worker/retries/worker.go @@ -94,30 +94,33 @@ func (w *WorkerRetries) attemptRetries(ctx context.Context, errChan chan error) for _, id := range ids { filter[webhooks.KeyWebhookID] = id - res, err := w.store.FindManyAttempts(ctx, filter) + atts, err := w.store.FindManyAttempts(ctx, filter) if err != nil { errChan <- errors.Wrap(err, "storage.Store.FindManyAttempts") continue } - if len(res.Data) == 0 { + if len(atts) == 0 { errChan <- fmt.Errorf("%w for webhookID: %s", ErrNoAttemptsFound, id) continue } - newAttemptNb := res.Data[0].RetryAttempt + 1 + newAttemptNb := atts[0].RetryAttempt + 1 attempt, err := webhooks.MakeAttempt(ctx, w.httpClient, w.retriesSchedule, - id, newAttemptNb, res.Data[0].Config, []byte(res.Data[0].Payload)) + id, newAttemptNb, atts[0].Config, []byte(atts[0].Payload)) if err != nil { errChan <- errors.Wrap(err, "webhooks.MakeAttempt") continue } - if _, err := w.store.InsertOneAttempt(ctx, attempt); err != nil { + if err := w.store.InsertOneAttempt(ctx, attempt); err != nil { errChan <- errors.Wrap(err, "storage.Store.InsertOneAttempt retried") continue } - if _, _, _, _, err := w.store.UpdateManyAttemptsStatus(ctx, id, attempt.Status); err != nil { + if _, err := w.store.UpdateManyAttemptsStatus(ctx, id, attempt.Status); err != nil { + if errors.Is(err, storage.ErrAttemptNotModified) { + continue + } errChan <- errors.Wrap(err, "storage.Store.UpdateManyAttemptsStatus") continue } diff --git a/test/main_test.go b/test/main_test.go index de1a6cc..e40bae7 100644 --- a/test/main_test.go +++ b/test/main_test.go @@ -17,6 +17,7 @@ import ( webhooks "github.com/formancehq/webhooks/pkg" "github.com/spf13/pflag" "github.com/spf13/viper" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -95,3 +96,17 @@ func decodeCursorResponse[T any](t *testing.T, reader io.Reader) *sharedapi.Curs require.NoError(t, err) return res.Cursor } + +func decodeSingleResponse[T any](t *testing.T, reader io.Reader) (T, bool) { + res := sharedapi.BaseResponse[T]{} + if !decode(t, reader, &res) { + var zero T + return zero, false + } + return *res.Data, true +} + +func decode(t *testing.T, reader io.Reader, v interface{}) bool { + err := json.NewDecoder(reader).Decode(v) + return assert.NoError(t, err) +} diff --git a/test/server_test.go b/test/server_test.go index 3070b21..5a0d9ba 100644 --- a/test/server_test.go +++ b/test/server_test.go @@ -2,7 +2,6 @@ package test_test import ( "context" - "encoding/json" "net/http" "net/url" "strings" @@ -66,7 +65,9 @@ func TestServer(t *testing.T) { t.Run("valid", func(t *testing.T) { for i, cfg := range validConfigs { resBody := requestServer(t, http.MethodPost, server.PathConfigs, http.StatusOK, cfg) - assert.NoError(t, json.NewDecoder(resBody).Decode(&insertedIds[i])) + c, ok := decodeSingleResponse[webhooks.Config](t, resBody) + assert.Equal(t, true, ok) + insertedIds[i] = c.ID require.NoError(t, resBody.Close()) } }) @@ -133,13 +134,13 @@ func TestServer(t *testing.T) { t.Run("PUT "+server.PathConfigs, func(t *testing.T) { t.Run(server.PathDeactivate, func(t *testing.T) { resBody := requestServer(t, http.MethodPut, server.PathConfigs+"/"+insertedIds[0]+server.PathDeactivate, http.StatusOK) - cur := decodeCursorResponse[webhooks.Config](t, resBody) - assert.Equal(t, 1, len(cur.Data)) - assert.Equal(t, false, cur.Data[0].Active) + c, ok := decodeSingleResponse[webhooks.Config](t, resBody) + assert.Equal(t, true, ok) + assert.Equal(t, false, c.Active) require.NoError(t, resBody.Close()) resBody = requestServer(t, http.MethodGet, server.PathConfigs, http.StatusOK) - cur = decodeCursorResponse[webhooks.Config](t, resBody) + cur := decodeCursorResponse[webhooks.Config](t, resBody) assert.Equal(t, len(validConfigs), len(cur.Data)) assert.Equal(t, false, cur.Data[0].Active) require.NoError(t, resBody.Close()) @@ -149,13 +150,13 @@ func TestServer(t *testing.T) { t.Run(server.PathActivate, func(t *testing.T) { resBody := requestServer(t, http.MethodPut, server.PathConfigs+"/"+insertedIds[0]+server.PathActivate, http.StatusOK) - cur := decodeCursorResponse[webhooks.Config](t, resBody) - assert.Equal(t, 1, len(cur.Data)) - assert.Equal(t, true, cur.Data[0].Active) + c, ok := decodeSingleResponse[webhooks.Config](t, resBody) + assert.Equal(t, true, ok) + assert.Equal(t, true, c.Active) require.NoError(t, resBody.Close()) resBody = requestServer(t, http.MethodGet, server.PathConfigs, http.StatusOK) - cur = decodeCursorResponse[webhooks.Config](t, resBody) + cur := decodeCursorResponse[webhooks.Config](t, resBody) assert.Equal(t, len(validConfigs), len(cur.Data)) assert.Equal(t, true, cur.Data[len(cur.Data)-1].Active) require.NoError(t, resBody.Close()) @@ -165,15 +166,16 @@ func TestServer(t *testing.T) { t.Run(server.PathChangeSecret, func(t *testing.T) { resBody := requestServer(t, http.MethodPut, server.PathConfigs+"/"+insertedIds[0]+server.PathChangeSecret, http.StatusOK) - cur := decodeCursorResponse[webhooks.Config](t, resBody) - assert.Equal(t, 1, len(cur.Data)) + c, ok := decodeSingleResponse[webhooks.Config](t, resBody) + assert.Equal(t, true, ok) + assert.NotEqual(t, "", c.Secret) require.NoError(t, resBody.Close()) validSecret := webhooks.Secret{Secret: webhooks.NewSecret()} resBody = requestServer(t, http.MethodPut, server.PathConfigs+"/"+insertedIds[0]+server.PathChangeSecret, http.StatusOK, validSecret) - cur = decodeCursorResponse[webhooks.Config](t, resBody) - assert.Equal(t, 1, len(cur.Data)) - assert.Equal(t, validSecret.Secret, cur.Data[0].Secret) + c, ok = decodeSingleResponse[webhooks.Config](t, resBody) + assert.Equal(t, true, ok) + assert.Equal(t, validSecret.Secret, c.Secret) require.NoError(t, resBody.Close()) invalidSecret := webhooks.Secret{Secret: "invalid"}