diff --git a/foundation/database/dbmigrate/sql/migrate.sql b/foundation/database/dbmigrate/sql/migrate.sql index 40ee8c0..35f3f69 100644 --- a/foundation/database/dbmigrate/sql/migrate.sql +++ b/foundation/database/dbmigrate/sql/migrate.sql @@ -67,3 +67,7 @@ CREATE INDEX job_id_index ON job_executions (job_id); CREATE INDEX job_executions_start_time_index ON job_executions (start_time); +-- Version: 1.02 +-- Description: Add tags column to jobs table + +ALTER TABLE jobs ADD tags TEXT[]; \ No newline at end of file diff --git a/go.mod b/go.mod index 6a8189a..d104e28 100644 --- a/go.mod +++ b/go.mod @@ -3,78 +3,73 @@ module github.com/GLCharge/distributed-scheduler go 1.20 require ( - github.com/GLCharge/otelzap v0.0.0-20230904131944-57dc7c9994a9 - github.com/ardanlabs/darwin/v3 v3.3.1 + github.com/ardanlabs/darwin/v3 v3.3.0 github.com/cenkalti/backoff/v4 v4.2.1 - github.com/gin-contrib/zap v0.2.0 github.com/google/go-cmp v0.5.9 - github.com/samber/lo v1.39.0 - github.com/spf13/cobra v1.8.0 + github.com/lib/pq v1.2.0 + github.com/samber/lo v1.38.1 + github.com/spf13/cobra v1.7.0 github.com/swaggo/files v1.0.1 github.com/swaggo/gin-swagger v1.6.0 - github.com/swaggo/swag v1.16.2 - github.com/vearne/gin-timeout v0.1.7 + github.com/swaggo/swag v1.8.12 ) require ( github.com/KyleBanks/depth v1.2.1 // indirect - github.com/chenzhuoyu/iasm v0.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect - github.com/go-openapi/jsonpointer v0.19.6 // indirect - github.com/go-openapi/jsonreference v0.20.2 // indirect - github.com/go-openapi/spec v0.20.9 // indirect - github.com/go-openapi/swag v0.22.4 // indirect + github.com/go-openapi/jsonpointer v0.19.5 // indirect + github.com/go-openapi/jsonreference v0.20.0 // indirect + github.com/go-openapi/spec v0.20.8 // indirect + github.com/go-openapi/swag v0.22.3 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/mailru/easyjson v0.7.7 // indirect - github.com/rogpeppe/go-internal v1.11.0 // indirect + github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/uptrace/opentelemetry-go-extra/otelutil v0.2.0 // indirect - go.opentelemetry.io/otel v1.15.1 // indirect - go.opentelemetry.io/otel/trace v1.15.1 // indirect - golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect - golang.org/x/tools v0.17.0 // indirect + golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect + golang.org/x/tools v0.7.0 // indirect ) require ( - github.com/ardanlabs/conf/v3 v3.1.6 - github.com/bytedance/sonic v1.10.0 // indirect - github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect + github.com/ardanlabs/conf/v3 v3.1.5 + github.com/bytedance/sonic v1.9.1 // indirect + github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gin-contrib/sse v0.1.0 // indirect github.com/gin-gonic/gin v1.9.1 github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/go-playground/validator/v10 v10.15.3 // indirect + github.com/go-playground/validator/v10 v10.14.0 // indirect github.com/goccy/go-json v0.10.2 // indirect github.com/google/uuid v1.3.0 github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgx/v5 v5.4.1 + github.com/jackc/pgx/v5 v5.3.1 github.com/jmoiron/sqlx v1.3.5 github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/cpuid/v2 v2.2.5 // indirect + github.com/klauspost/cpuid/v2 v2.2.4 // indirect github.com/leodido/go-urn v1.2.4 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.1.0 // indirect + github.com/pelletier/go-toml/v2 v2.0.8 // indirect github.com/pkg/errors v0.9.1 github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rabbitmq/amqp091-go v1.9.0 + github.com/rabbitmq/amqp091-go v1.8.1 github.com/robfig/cron/v3 v3.0.1 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.8.3 github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect - go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.25.0 - golang.org/x/arch v0.5.0 // indirect - golang.org/x/crypto v0.18.0 // indirect - golang.org/x/net v0.20.0 // indirect - golang.org/x/sys v0.16.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.31.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.24.0 + golang.org/x/arch v0.3.0 // indirect + golang.org/x/crypto v0.9.0 // indirect + golang.org/x/net v0.10.0 // indirect + golang.org/x/sys v0.8.0 // indirect + golang.org/x/text v0.9.0 // indirect + google.golang.org/protobuf v1.30.0 // indirect gopkg.in/guregu/null.v4 v4.0.0 gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/handlers/jobs.go b/handlers/jobs.go index 8c5aea2..972be08 100644 --- a/handlers/jobs.go +++ b/handlers/jobs.go @@ -6,7 +6,7 @@ import ( "strconv" "github.com/GLCharge/distributed-scheduler/model" - "github.com/GLCharge/distributed-scheduler/service/job" + jobService "github.com/GLCharge/distributed-scheduler/service/job" "github.com/gin-gonic/gin" ) @@ -22,14 +22,14 @@ func JobsRoutesV1(router *gin.Engine, jobsHandler *Jobs) { } } -func NewJobsHandler(service *job.Service) *Jobs { +func NewJobsHandler(service *jobService.Service) *Jobs { return &Jobs{ service: service, } } type Jobs struct { - service *job.Service + service *jobService.Service } type ErrorResponse struct { @@ -181,6 +181,7 @@ func (j *Jobs) DeleteJob() gin.HandlerFunc { // @Produce json // @Param limit query int false "Limit" // @Param offset query int false "Offset" +// @Param tags query array false "Tags" // @Success 200 {object} []model.Job // @Failure 400 {object} ErrorResponse // @Failure 500 {object} ErrorResponse @@ -190,7 +191,9 @@ func (j *Jobs) ListJobs() gin.HandlerFunc { limit, offset := LimitAndOffset(ctx) - jobs, err := j.service.ListJobs(ctx.Request.Context(), limit, offset) + tags := ctx.QueryArray("tags") + + jobs, err := j.service.ListJobs(ctx.Request.Context(), limit, offset, tags) if err != nil { ctx.JSON(http.StatusInternalServerError, ErrorResponse{Error: err.Error()}) return diff --git a/model/job.go b/model/job.go index aed8265..36b6472 100644 --- a/model/job.go +++ b/model/job.go @@ -96,6 +96,8 @@ type Job struct { // when the job is scheduled to run next (can be null if the job is not scheduled to run again) NextRun null.Time `json:"next_run"` + + Tags []string `json:"tags"` } // swagger:model JobUpdate @@ -106,6 +108,8 @@ type JobUpdate struct { CronSchedule *string `json:"cron_schedule,omitempty"` ExecuteAt *time.Time `json:"execute_at,omitempty"` + + Tags *[]string `json:"tags,omitempty"` } func (j *Job) ApplyUpdate(update JobUpdate) { @@ -132,6 +136,10 @@ func (j *Job) ApplyUpdate(update JobUpdate) { j.ExecuteAt = null.TimeFromPtr(update.ExecuteAt) } + if update.Tags != nil { + j.Tags = *update.Tags + } + j.UpdatedAt = time.Now() j.SetInitialRunTime() @@ -332,6 +340,8 @@ type JobCreate struct { // HTTPJob and AMQPJob are mutually exclusive. HTTPJob *HTTPJob `json:"http_job,omitempty"` AMQPJob *AMQPJob `json:"amqp_job,omitempty"` + + Tags []string `json:"tags"` } func (j *JobCreate) ToJob() *Job { @@ -345,6 +355,7 @@ func (j *JobCreate) ToJob() *Job { AMQPJob: j.AMQPJob, CreatedAt: time.Now(), UpdatedAt: time.Now(), + Tags: j.Tags, } job.SetInitialRunTime() diff --git a/service/job/job.go b/service/job/job.go index fce039d..bad4be8 100644 --- a/service/job/job.go +++ b/service/job/job.go @@ -43,12 +43,11 @@ func (s *Service) CreateJob(ctx context.Context, jobCreate *model.JobCreate) (*m return nil, err } - return job, err + return job, nil } // GetJob returns the job with the given ID. func (s *Service) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) { - return s.store.GetJob(ctx, id) } @@ -85,10 +84,10 @@ func (s *Service) DeleteJob(ctx context.Context, id uuid.UUID) error { } // ListJobs returns a list of jobs with the given limit and offset. -func (s *Service) ListJobs(ctx context.Context, limit, offset uint64) ([]*model.Job, error) { +func (s *Service) ListJobs(ctx context.Context, limit, offset uint64, tags []string) ([]model.Job, error) { // Implement listing jobs using the store - return s.store.ListJobs(ctx, limit, offset) + return s.store.ListJobs(ctx, limit, offset, tags) } // GetJobsToRun returns a list of jobs that should be run at the given time. diff --git a/store/postgres/models.go b/store/postgres/models.go index 4179d97..acd9d44 100644 --- a/store/postgres/models.go +++ b/store/postgres/models.go @@ -6,27 +6,28 @@ import ( "github.com/GLCharge/distributed-scheduler/model" "github.com/google/uuid" + "github.com/lib/pq" "github.com/pkg/errors" "gopkg.in/guregu/null.v4" ) type jobDB struct { - ID uuid.UUID `db:"id"` - Type string `db:"type"` - Status string `db:"status"` - ExecuteAt null.Time `db:"execute_at"` - CronSchedule null.String `db:"cron_schedule"` - HTTPJob []byte `db:"http_job"` - AMQPJob []byte `db:"amqp_job"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` - NextRun null.Time `db:"next_run"` - LockedUntil null.Time `db:"locked_until"` - LockedBy null.String `db:"locked_by"` + ID uuid.UUID `db:"id"` + Type string `db:"type"` + Status string `db:"status"` + ExecuteAt null.Time `db:"execute_at"` + CronSchedule null.String `db:"cron_schedule"` + HTTPJob []byte `db:"http_job"` + AMQPJob []byte `db:"amqp_job"` + CreatedAt time.Time `db:"created_at"` + UpdatedAt time.Time `db:"updated_at"` + NextRun null.Time `db:"next_run"` + LockedUntil null.Time `db:"locked_until"` + LockedBy null.String `db:"locked_by"` + Tags pq.StringArray `db:"tags"` } func toJobDB(j *model.Job) (*jobDB, error) { - dbJ := &jobDB{ ID: j.ID, Type: string(j.Type), @@ -36,6 +37,7 @@ func toJobDB(j *model.Job) (*jobDB, error) { CreatedAt: j.CreatedAt, UpdatedAt: j.UpdatedAt, NextRun: j.NextRun, + Tags: j.Tags, } if j.HTTPJob != nil { @@ -68,6 +70,7 @@ func (j *jobDB) ToJob() (*model.Job, error) { CreatedAt: j.CreatedAt, UpdatedAt: j.UpdatedAt, NextRun: j.NextRun, + Tags: j.Tags, } if err := unmarshalNullableJSON(j.HTTPJob, &job.HTTPJob); err != nil { @@ -99,7 +102,6 @@ type executionDB struct { } func (e *executionDB) ToModel() *model.JobExecution { - return &model.JobExecution{ ID: e.ID, JobID: e.JobID, diff --git a/store/postgres/postgres.go b/store/postgres/postgres.go index 9fb5806..a973f02 100644 --- a/store/postgres/postgres.go +++ b/store/postgres/postgres.go @@ -3,6 +3,7 @@ package postgres import ( "context" "database/sql" + "errors" "fmt" "github.com/GLCharge/otelzap" "gopkg.in/guregu/null.v4" @@ -98,29 +99,31 @@ func (s *pgStore) CreateJob(ctx context.Context, job *model.Job) error { // insert job struct into database query := ` - INSERT INTO jobs ( - id, - type, - status, - execute_at, - cron_schedule, - http_job, - amqp_job, - created_at, - updated_at, - next_run - ) VALUES ( - :id, - :type, - :status, - :execute_at, - :cron_schedule, - :http_job, - :amqp_job, - :created_at, - :updated_at, - :next_run - ) + INSERT INTO jobs ( + id, + type, + status, + execute_at, + cron_schedule, + http_job, + amqp_job, + created_at, + updated_at, + next_run, + tags + ) VALUES ( + :id, + :type, + :status, + :execute_at, + :cron_schedule, + :http_job, + :amqp_job, + :created_at, + :updated_at, + :next_run, + :tags + ) ` _, err = s.db.NamedExecContext(ctx, query, dbJob) @@ -141,7 +144,7 @@ func (s *pgStore) GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) ` err := s.db.GetContext(ctx, &dbJob, query, id) if err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return nil, model.ErrJobNotFound } return nil, fmt.Errorf("failed to get job from database: %w", err) @@ -169,25 +172,33 @@ func (s *pgStore) DeleteJob(ctx context.Context, id uuid.UUID) error { return nil } -func (s *pgStore) ListJobs(ctx context.Context, limit, offset uint64) ([]*model.Job, error) { +func (s *pgStore) ListJobs(ctx context.Context, limit, offset uint64, tags []string) ([]model.Job, error) { // get all jobs from database + args := []interface{}{limit, offset} query := ` SELECT * FROM jobs ORDER BY id DESC LIMIT $1 OFFSET $2 ` - var dbJobs []*jobDB - err := s.db.SelectContext(ctx, &dbJobs, query, limit, offset) + if len(tags) > 0 { + args = append(args, tags) + query = ` + SELECT * FROM jobs WHERE tags @> $3 ORDER BY id DESC LIMIT $1 OFFSET $2 + ` + } + + var dbJobs []jobDB + err := s.db.SelectContext(ctx, &dbJobs, query, args...) if err != nil { return nil, fmt.Errorf("failed to get jobs from database: %w", err) } // convert JobDB structs to Job structs - var jobs []*model.Job + jobs := []model.Job{} for _, dbJob := range dbJobs { job, err := dbJob.ToJob() if err != nil { return nil, fmt.Errorf("failed to convert db job to job: %w", err) } - jobs = append(jobs, job) + jobs = append(jobs, *job) } return jobs, nil @@ -214,14 +225,14 @@ func (s *pgStore) GetJobsToRun(ctx context.Context, at time.Time, lockedUntil ti } defer rows.Close() - var dbjobs []*jobDB - err = sqlx.StructScan(rows, &dbjobs) + var dbJobs []*jobDB + err = sqlx.StructScan(rows, &dbJobs) if err != nil { return nil, fmt.Errorf("failed to scan job: %w", err) } var jobs []*model.Job - for _, dbJob := range dbjobs { + for _, dbJob := range dbJobs { job, err := dbJob.ToJob() if err != nil { diff --git a/store/store.go b/store/store.go index 626828c..5bfeb33 100644 --- a/store/store.go +++ b/store/store.go @@ -14,7 +14,7 @@ type Storer interface { CreateJob(ctx context.Context, job *model.Job) error GetJob(ctx context.Context, id uuid.UUID) (*model.Job, error) DeleteJob(ctx context.Context, id uuid.UUID) error - ListJobs(ctx context.Context, limit, offset uint64) ([]*model.Job, error) + ListJobs(ctx context.Context, limit, offset uint64, tags []string) ([]model.Job, error) UpdateJob(ctx context.Context, job *model.Job) error // Get jobs to run