diff --git a/internal/app/api/connector.go b/internal/app/api/connector.go index 38b95ec0..0fd6796d 100644 --- a/internal/app/api/connector.go +++ b/internal/app/api/connector.go @@ -7,6 +7,8 @@ import ( "net/http" "time" + "github.com/formancehq/payments/internal/app/storage" + "github.com/google/uuid" "github.com/formancehq/payments/internal/app/models" @@ -81,7 +83,25 @@ func listTasks[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], ) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - tasks, err := connectorManager.ListTasksStates(r.Context()) + skip, err := integerWithDefault(r, "skip", 0) + if err != nil { + handleValidationError(w, r, err) + + return + } + + limit, err := integerWithDefault(r, "limit", maxPerPage) + if err != nil { + handleValidationError(w, r, err) + + return + } + + if limit > maxPerPage { + limit = maxPerPage + } + + tasks, err := connectorManager.ListTasksStates(r.Context(), storage.Paginate(skip, limit)) if err != nil { handleError(w, r, err) diff --git a/internal/app/integration/manager.go b/internal/app/integration/manager.go index 97624f9c..46792308 100644 --- a/internal/app/integration/manager.go +++ b/internal/app/integration/manager.go @@ -3,6 +3,8 @@ package integration import ( "context" + "github.com/formancehq/payments/internal/app/storage" + "github.com/google/uuid" "github.com/formancehq/payments/internal/app/models" @@ -220,9 +222,9 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) IsInstalled(ctx cont } func (l *ConnectorManager[ConnectorConfig, - TaskDescriptor]) ListTasksStates(ctx context.Context, + TaskDescriptor]) ListTasksStates(ctx context.Context, pagination storage.Paginator, ) ([]models.Task, error) { - return l.scheduler.ListTasks(ctx) + return l.scheduler.ListTasks(ctx, pagination) } func (l *ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, taskID uuid.UUID) (*models.Task, error) { diff --git a/internal/app/storage/task.go b/internal/app/storage/task.go index 24a692c3..f4fabcde 100644 --- a/internal/app/storage/task.go +++ b/internal/app/storage/task.go @@ -106,7 +106,7 @@ func (s *Storage) ListTasksByStatus(ctx context.Context, provider models.Connect return tasks, nil } -func (s *Storage) ListTasks(ctx context.Context, provider models.ConnectorProvider) ([]models.Task, error) { +func (s *Storage) ListTasks(ctx context.Context, provider models.ConnectorProvider, pagination Paginator) ([]models.Task, error) { connector, err := s.GetConnector(ctx, provider) if err != nil { return nil, e("failed to get connector", err) @@ -114,9 +114,12 @@ func (s *Storage) ListTasks(ctx context.Context, provider models.ConnectorProvid var tasks []models.Task - err = s.db.NewSelect().Model(&tasks). - Where("connector_id = ?", connector.ID). - Scan(ctx) + q := s.db.NewSelect().Model(&tasks). + Where("connector_id = ?", connector.ID) + + pagination.apply(q) + + err = q.Scan(ctx) if err != nil { return nil, e("failed to get tasks", err) } diff --git a/internal/app/task/scheduler.go b/internal/app/task/scheduler.go index 69b623b6..1d5c687a 100644 --- a/internal/app/task/scheduler.go +++ b/internal/app/task/scheduler.go @@ -51,8 +51,9 @@ type DefaultTaskScheduler[TaskDescriptor payments.TaskDescriptor] struct { } func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context, + pagination storage.Paginator, ) ([]models.Task, error) { - return s.store.ListTasks(ctx, s.provider) + return s.store.ListTasks(ctx, s.provider, pagination) } func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, taskID uuid.UUID) (*models.Task, error) { diff --git a/internal/app/task/store.go b/internal/app/task/store.go index 3d788ee4..9328a3b7 100644 --- a/internal/app/task/store.go +++ b/internal/app/task/store.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" + "github.com/formancehq/payments/internal/app/storage" + "github.com/google/uuid" "github.com/formancehq/payments/internal/app/models" @@ -13,7 +15,7 @@ type Repository interface { UpdateTaskStatus(ctx context.Context, provider models.ConnectorProvider, descriptor json.RawMessage, status models.TaskStatus, err string) error FindAndUpsertTask(ctx context.Context, provider models.ConnectorProvider, descriptor json.RawMessage, status models.TaskStatus, err string) (*models.Task, error) ListTasksByStatus(ctx context.Context, provider models.ConnectorProvider, status models.TaskStatus) ([]models.Task, error) - ListTasks(ctx context.Context, provider models.ConnectorProvider) ([]models.Task, error) + ListTasks(ctx context.Context, provider models.ConnectorProvider, pagination storage.Paginator) ([]models.Task, error) ReadOldestPendingTask(ctx context.Context, provider models.ConnectorProvider) (*models.Task, error) GetTask(ctx context.Context, id uuid.UUID) (*models.Task, error) GetTaskByDescriptor(ctx context.Context, provider models.ConnectorProvider, descriptor json.RawMessage) (*models.Task, error) diff --git a/internal/app/task/storememory.go b/internal/app/task/storememory.go index 27124a3f..b16b82ea 100644 --- a/internal/app/task/storememory.go +++ b/internal/app/task/storememory.go @@ -53,6 +53,7 @@ func (s *InMemoryStore) GetTaskByDescriptor(ctx context.Context, provider models func (s *InMemoryStore) ListTasks(ctx context.Context, provider models.ConnectorProvider, + pagination storage.Paginator, ) ([]models.Task, error) { ret := make([]models.Task, 0) @@ -115,7 +116,7 @@ func (s *InMemoryStore) ReadOldestPendingTask(ctx context.Context, func (s *InMemoryStore) ListTasksByStatus(ctx context.Context, provider models.ConnectorProvider, taskStatus models.TaskStatus, ) ([]models.Task, error) { - all, err := s.ListTasks(ctx, provider) + all, err := s.ListTasks(ctx, provider, storage.Paginate(0, 0)) if err != nil { return nil, err }