Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
runabol committed Dec 16, 2024
1 parent 555aba0 commit 080ff08
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 6 deletions.
1 change: 1 addition & 0 deletions datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Datastore interface {
GetJobs(ctx context.Context, currentUser, q string, page, size int) (*Page[*tork.JobSummary], error)

CreateScheduledJob(ctx context.Context, s *tork.ScheduledJob) error
GetActiveScheduledJobs(ctx context.Context) ([]*tork.ScheduledJob, error)

CreateUser(ctx context.Context, u *tork.User) error
GetUser(ctx context.Context, username string) (*tork.User, error)
Expand Down
4 changes: 4 additions & 0 deletions datastore/inmemory/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,10 @@ func (ds *InMemoryDatastore) CreateScheduledJob(ctx context.Context, s *tork.Sch
return nil
}

func (ds *InMemoryDatastore) GetActiveScheduledJobs(ctx context.Context) ([]*tork.ScheduledJob, error) {
return nil, errors.New("not implemented")
}

func (ds *InMemoryDatastore) WithTx(ctx context.Context, f func(tx datastore.Datastore) error) error {
return f(ds)
}
Expand Down
50 changes: 50 additions & 0 deletions datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,6 +1217,56 @@ func (ds *PostgresDatastore) CreateScheduledJob(ctx context.Context, sj *tork.Sc
})
}

func (ds *PostgresDatastore) GetActiveScheduledJobs(ctx context.Context) ([]*tork.ScheduledJob, error) {
sjrs := []scheduledJobRecord{}
q := `SELECT * FROM scheduled_jobs where state = 'ACTIVE'`
if err := ds.select_(&sjrs, q); err != nil {
return nil, errors.Wrapf(err, "error getting active scheduled jobs from db")
}
sjs := make([]*tork.ScheduledJob, len(sjrs))
for i, sjr := range sjrs {
tasks := make([]*tork.Task, 0)
if err := json.Unmarshal(sjr.Tasks, &tasks); err != nil {
return nil, errors.Wrapf(err, "error desiralizing scheduled job tasks")
}
u, err := ds.GetUser(ctx, sjr.CreatedBy)
if err != nil {
return nil, err
}
rsp := make([]scheduledPermRecord, 0)
q = `SELECT *
FROM scheduled_jobs_perms
where scheduled_job_id = $1`
if err := ds.select_(&rsp, q, sjr.ID); err != nil {
return nil, errors.Wrapf(err, "error getting scheduled job permissions from db")
}
perms := make([]*tork.Permission, len(rsp))
for i, rp := range rsp {
p := &tork.Permission{}
if rp.RoleID != nil {
role, err := ds.GetRole(ctx, *rp.RoleID)
if err != nil {
return nil, err
}
p.Role = role
} else {
user, err := ds.GetUser(ctx, *rp.UserID)
if err != nil {
return nil, err
}
p.User = user
}
perms[i] = p
}
sj, err := sjr.toScheduledJob(tasks, u, perms)
if err != nil {
return nil, err
}
sjs[i] = sj
}
return sjs, nil
}

func (ds *PostgresDatastore) get(dest interface{}, query string, args ...interface{}) error {
if ds.tx != nil {
return ds.tx.Get(dest, query, args...)
Expand Down
75 changes: 75 additions & 0 deletions datastore/postgres/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,24 @@ type jobRecord struct {
ScheduledJobID *string `db:"scheduled_job_id"`
}

type scheduledJobRecord struct {
ID string `db:"id"`
Cron string `db:"cron_expr"`
Name string `db:"name"`
Description string `db:"description"`
Tags pq.StringArray `db:"tags"`
State string `db:"state"`
CreatedAt time.Time `db:"created_at"`
CreatedBy string `db:"created_by"`
Tasks []byte `db:"tasks"`
Inputs []byte `db:"inputs"`
Output string `db:"output_"`
Defaults []byte `db:"defaults"`
Webhooks []byte `db:"webhooks"`
AutoDelete []byte `db:"auto_delete"`
Secrets []byte `db:"secrets"`
}

type jobPermRecord struct {
ID string `db:"id"`
JobID string `db:"job_id"`
Expand All @@ -92,6 +110,14 @@ type jobPermRecord struct {
CreatedAt time.Time `db:"created_at"`
}

type scheduledPermRecord struct {
ID string `db:"id"`
ScheduledJobID string `db:"scheduled_job_id"`
UserID *string `db:"user_id"`
RoleID *string `db:"role_id"`
CreatedAt time.Time `db:"created_at"`
}

type nodeRecord struct {
ID string `db:"id"`
Name string `db:"name"`
Expand Down Expand Up @@ -358,6 +384,55 @@ func (r jobRecord) toJob(tasks, execution []*tork.Task, createdBy *tork.User, pe
}, nil
}

func (r scheduledJobRecord) toScheduledJob(tasks []*tork.Task, createdBy *tork.User, perms []*tork.Permission) (*tork.ScheduledJob, error) {
var inputs map[string]string
if err := json.Unmarshal(r.Inputs, &inputs); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.inputs")
}
var defaults *tork.JobDefaults
if r.Defaults != nil {
defaults = &tork.JobDefaults{}
if err := json.Unmarshal(r.Defaults, defaults); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.defaults")
}
}
var autoDelete *tork.AutoDelete
if r.AutoDelete != nil {
autoDelete = &tork.AutoDelete{}
if err := json.Unmarshal(r.AutoDelete, autoDelete); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.autoDelete")
}
}
var webhooks []*tork.Webhook
if err := json.Unmarshal(r.Webhooks, &webhooks); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.webhook")
}
var secrets map[string]string
if r.Secrets != nil {
if err := json.Unmarshal(r.Secrets, &secrets); err != nil {
return nil, errors.Wrapf(err, "error deserializing job.secrets")
}
}
return &tork.ScheduledJob{
ID: r.ID,
Cron: r.Cron,
Name: r.Name,
Tags: r.Tags,
State: tork.ScheduledJobState(r.State),
CreatedAt: r.CreatedAt,
CreatedBy: createdBy,
Tasks: tasks,
Inputs: inputs,
Description: r.Description,
Output: r.Output,
Defaults: defaults,
Webhooks: webhooks,
Permissions: perms,
AutoDelete: autoDelete,
Secrets: secrets,
}, nil
}

func (r userRecord) toUser() *tork.User {
n := tork.User{
ID: r.ID,
Expand Down
4 changes: 1 addition & 3 deletions db/postgres/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ CREATE UNIQUE INDEX idx_users_roles_uniq ON users_roles (user_id,role_id);
CREATE TABLE scheduled_jobs (
id varchar(32) not null primary key,
lock_id bigserial not null,
name varchar(64) not null,
description text not null,
tags text[] not null default '{}',
Expand All @@ -64,8 +63,7 @@ CREATE TABLE scheduled_jobs (
secrets jsonb,
created_at timestamp not null,
created_by varchar(32) not null references users(id),
state varchar(10) not null,
last_run_at timestamp
state varchar(10) not null
);
CREATE TABLE scheduled_jobs_perms (
Expand Down
2 changes: 1 addition & 1 deletion internal/coordinator/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (s *API) SubmitScheduledJob(ctx context.Context, ji *input.Job) (*tork.Sche
Cron: j.Schedule.Cron,
Inputs: j.Inputs,
Tasks: j.Tasks,
State: tork.ScheduledJobStateScheduled,
State: tork.ScheduledJobStateActive,
}
currentUser := ctx.Value(tork.USERNAME)
if currentUser != nil {
Expand Down
15 changes: 15 additions & 0 deletions internal/coordinator/handlers/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,27 @@ func NewJobSchedulerHandler(ds datastore.Datastore, b mq.Broker, l locker.Locker
return nil, err
}
sc.Start()

h := &jobSchedulerHandler{
ds: ds,
scheduler: sc,
broker: b,
m: make(map[string]gocron.Job),
}

ctx := context.Background()

activeJobs, err := ds.GetActiveScheduledJobs(ctx)
if err != nil {
return nil, err
}

for _, aj := range activeJobs {
if err := h.handle(ctx, aj); err != nil {
return nil, err
}
}

return h.handle, nil
}

Expand Down
4 changes: 2 additions & 2 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ const (
type ScheduledJobState string

const (
ScheduledJobStateScheduled ScheduledJobState = "SCHEDULED"
ScheduledJobStatePaused ScheduledJobState = "PAUSED"
ScheduledJobStateActive ScheduledJobState = "ACTIVE"
ScheduledJobStatePaused ScheduledJobState = "PAUSED"
)

type Job struct {
Expand Down

0 comments on commit 080ff08

Please sign in to comment.