Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: various agent pool and job bugs #659

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 44 additions & 42 deletions internal/agent/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,57 +221,59 @@ func (d *daemon) Start(ctx context.Context) error {
// fetch jobs allocated to this agent and launch workers to do jobs; also
// handle cancelation signals for jobs
for {
// block on waiting for jobs
var jobs []*Job
getJobs := func() (err error) {
processJobs := func() (err error) {
d.poolLogger.Info("waiting for next job")
jobs, err = d.getAgentJobs(ctx, agent.ID)
return err
// block on waiting for jobs
jobs, err := d.getAgentJobs(ctx, agent.ID)
if err != nil {
return err
}
for _, j := range jobs {
if j.Status == JobAllocated {
d.poolLogger.Info("received job", "job", j)
// start job and receive job token in return
token, err := d.startJob(ctx, j.Spec)
if err != nil {
if ctx.Err() != nil {
return nil
}
d.poolLogger.Error(err, "starting job")
continue
}
d.poolLogger.V(0).Info("started job")
op := newOperation(newOperationOptions{
logger: d.poolLogger.WithValues("job", j),
client: d.client,
config: d.config,
job: j,
downloader: d.downloader,
envs: d.envs,
token: token,
})
// check operation in with the terminator, so that if a cancelation signal
// arrives it can be handled accordingly for the duration of the operation.
terminator.checkIn(j.Spec, op)
op.V(0).Info("started job")
g.Go(func() error {
op.doAndFinish()
terminator.checkOut(op.job.Spec)
return nil
})
} else if j.Signaled != nil {
d.poolLogger.Info("received cancelation signal", "force", *j.Signaled, "job", j)
terminator.cancel(j.Spec, *j.Signaled, true)
}
}
return nil
}
policy := backoff.WithContext(backoff.NewExponentialBackOff(), ctx)
_ = backoff.RetryNotify(getJobs, policy, func(err error, next time.Duration) {
_ = backoff.RetryNotify(processJobs, policy, func(err error, next time.Duration) {
d.poolLogger.Error(err, "waiting for next job", "backoff", next)
})
// only stop retrying if context is canceled
if ctx.Err() != nil {
return nil
}
for _, j := range jobs {
if j.Status == JobAllocated {
d.poolLogger.Info("received job", "job", j)
// start job and receive job token in return
token, err := d.startJob(ctx, j.Spec)
if err != nil {
if ctx.Err() != nil {
return nil
}
d.poolLogger.Error(err, "starting job")
continue
}
d.poolLogger.V(0).Info("started job")
op := newOperation(newOperationOptions{
logger: d.poolLogger.WithValues("job", j),
client: d.client,
config: d.config,
job: j,
downloader: d.downloader,
envs: d.envs,
token: token,
})
// check operation in with the terminator, so that if a cancelation signal
// arrives it can be handled accordingly for the duration of the operation.
terminator.checkIn(j.Spec, op)
op.V(0).Info("started job")
g.Go(func() error {
op.doAndFinish()
terminator.checkOut(op.job.Spec)
return nil
})
} else if j.Signaled != nil {
d.poolLogger.Info("received cancelation signal", "force", *j.Signaled, "job", j)
terminator.cancel(j.Spec, *j.Signaled, true)
}
}
}
})
return g.Wait()
Expand Down
65 changes: 39 additions & 26 deletions internal/agent/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (j *Job) LogValue() slog.Value {

// Organizations returns nil: a job is not a member of any organization.
// NOTE(review): appears to satisfy an authorization-subject interface
// alongside IsSiteAdmin/IsOwner — confirm against the subject definition.
func (j *Job) Organizations() []string { return nil }

func (j *Job) IsSiteAdmin() bool { return true }
func (j *Job) IsOwner(string) bool { return true }
// IsSiteAdmin reports whether the subject has site-admin privileges; a job
// never does.
func (j *Job) IsSiteAdmin() bool { return false }
// IsOwner reports whether the subject is an owner of the named organization;
// a job never is.
func (j *Job) IsOwner(string) bool { return false }

func (j *Job) CanAccessSite(action rbac.Action) bool {
return false
Expand Down Expand Up @@ -176,10 +176,9 @@ func (j *Job) CanAccessTeam(rbac.Action, string) bool {
}

func (j *Job) allocate(agentID string) error {
if j.Status != JobUnallocated {
return errors.New("job can only be allocated when it is in the unallocated state")
if err := j.updateStatus(JobAllocated); err != nil {
return err
}
j.Status = JobAllocated
j.AgentID = &agentID
return nil
}
Expand All @@ -195,10 +194,8 @@ func (j *Job) reallocate(agentID string) error {
// cancel job based on current state of its parent run - depending on its state,
// the job is signaled and/or its state is updated too.
func (j *Job) cancel(run *otfrun.Run) (*bool, error) {
var (
// whether job be signaled
signal *bool
)
// whether job be signaled
var signal *bool
switch run.Status {
case otfrun.RunPlanning, otfrun.RunApplying:
if run.CancelSignaledAt != nil {
Expand All @@ -207,17 +204,16 @@ func (j *Job) cancel(run *otfrun.Run) (*bool, error) {
signal = internal.Bool(false)
}
case otfrun.RunCanceled:
// run has been canceled so immediately cancel job too unless already
// canceled
if j.Status != JobCanceled {
j.Status = JobCanceled
// run has been canceled so immediately cancel job too
if err := j.updateStatus(JobCanceled); err != nil {
return nil, err
}
case otfrun.RunForceCanceled:
// run has been forceably canceled, so both signal job to forcefully
// cancel current operation, and immediately cancel job.
signal = internal.Bool(true)
if j.Status != JobCanceled {
j.Status = JobCanceled
if err := j.updateStatus(JobCanceled); err != nil {
return nil, err
}
}
if signal != nil {
Expand All @@ -230,19 +226,36 @@ func (j *Job) cancel(run *otfrun.Run) (*bool, error) {
return nil, nil
}

// startJob transitions the job into the running state, returning an error if
// the job's current status does not permit that transition.
func (j *Job) startJob() error {
	return j.updateStatus(JobRunning)
}

// finishJob transitions the job into the given status. The transition is
// validated by updateStatus; callers are expected to pass a terminal status
// (finished, errored, or canceled) — confirm at call sites.
func (j *Job) finishJob(to JobStatus) error {
	return j.updateStatus(to)
}

func (j *Job) updateStatus(to JobStatus) error {
switch to {
case JobRunning:
if j.Status != JobAllocated {
return ErrInvalidJobStateTransition
var isValid bool
switch j.Status {
case JobUnallocated:
switch to {
case JobAllocated, JobCanceled:
isValid = true
}
case JobFinished, JobErrored, JobCanceled:
if j.Status != JobRunning {
return ErrInvalidJobStateTransition
case JobAllocated:
switch to {
case JobRunning, JobCanceled:
isValid = true
}
case JobRunning:
switch to {
case JobFinished, JobCanceled, JobErrored:
isValid = true
}
default:
return ErrInvalidJobStateTransition
}
j.Status = to
return nil
if isValid {
j.Status = to
return nil
}
return ErrInvalidJobStateTransition
}
25 changes: 25 additions & 0 deletions internal/agent/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,28 @@ func Test_jobSpecFromString(t *testing.T) {
})
}
}

// TestJob_updateStatus exercises the job status state machine: the permitted
// transitions succeed, and transitions out of terminal states are rejected
// with ErrInvalidJobStateTransition.
func TestJob_updateStatus(t *testing.T) {
	tests := []struct {
		name string
		from JobStatus
		to   JobStatus
		want error
	}{
		// permitted transitions
		{"allocate job", JobUnallocated, JobAllocated, nil},
		{"start job", JobAllocated, JobRunning, nil},
		{"finish job", JobRunning, JobFinished, nil},
		{"finish with error", JobRunning, JobErrored, nil},
		{"cancel unstarted job", JobAllocated, JobCanceled, nil},
		{"cancel running job", JobRunning, JobCanceled, nil},
		// forbidden transitions out of each terminal state. Previously all
		// three cases started from JobCanceled, leaving the finished and
		// errored states untested despite the case names.
		{"cannot allocate canceled job", JobCanceled, JobAllocated, ErrInvalidJobStateTransition},
		{"cannot allocate finished job", JobFinished, JobAllocated, ErrInvalidJobStateTransition},
		{"cannot allocate errored job", JobErrored, JobAllocated, ErrInvalidJobStateTransition},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			j := &Job{Status: tt.from}
			assert.Equal(t, tt.want, j.updateStatus(tt.to))
		})
	}
}
10 changes: 5 additions & 5 deletions internal/agent/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,8 @@ func (s *service) createJob(ctx context.Context, run *otfrun.Run) error {
}

// cancelJob is called when a user cancels a run - cancelJob determines whether
// the corresponding job is signaled and what type of signal, and/or whether the job
// should be canceled.
// the corresponding job is signaled and what type of signal, and/or whether the
// job should be canceled.
func (s *service) cancelJob(ctx context.Context, run *otfrun.Run) error {
var (
spec = JobSpec{RunID: run.ID, Phase: run.Phase()}
Expand All @@ -525,7 +525,7 @@ func (s *service) cancelJob(ctx context.Context, run *otfrun.Run) error {
})
if err != nil {
if errors.Is(err, internal.ErrResourceNotFound) {
// ignore when there is no job corresponding to a run phase yet.
// ignore when no job has yet been created for the run.
return nil
}
s.Error(err, "canceling job", "spec", spec)
Expand Down Expand Up @@ -640,7 +640,7 @@ func (s *service) startJob(ctx context.Context, spec JobSpec) ([]byte, error) {
if job.AgentID == nil || *job.AgentID != subject.String() {
return internal.ErrAccessNotPermitted
}
if err := job.updateStatus(JobRunning); err != nil {
if err := job.startJob(); err != nil {
return err
}
// start corresponding run phase too
Expand Down Expand Up @@ -692,7 +692,7 @@ func (s *service) finishJob(ctx context.Context, spec JobSpec, opts finishJobOpt
if err != nil {
return err
}
return job.updateStatus(opts.Status)
return job.finishJob(opts.Status)
})
if err != nil {
s.Error(err, "finishing job", "spec", spec)
Expand Down
9 changes: 7 additions & 2 deletions internal/agent/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ func (h *webHandlers) listAllowedPools(w http.ResponseWriter, r *http.Request) {
h.Error(w, err.Error(), http.StatusUnprocessableEntity)
return
}

ws, err := h.workspaceService.GetWorkspace(r.Context(), workspaceID)
if err != nil {
h.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -318,7 +317,13 @@ func (h *webHandlers) listAllowedPools(w http.ResponseWriter, r *http.Request) {
return
}

h.Render("agent_pools_list_allowed.tmpl", w, pools)
h.Render("agent_pools_list_allowed.tmpl", w, struct {
Pools []*Pool
CurrentPoolID string
}{
Pools: pools,
CurrentPoolID: r.URL.Query().Get("agent_pool_id"),
})
}

// agent token handlers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
</div>
<div>
{{ template "identifier" . }}
<form action="{{ deleteAgentPoolPath .ID }}" method="POST">
<button class="btn-danger" onclick="return confirm('Are you sure you want to delete?')">delete</button>
</form>
</div>
</div>
{{ end }}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{{ range . }}
<option value="{{ .ID }}">{{ .Name }}</option>
<select id="agent-pool-id" name="agent_pool_id">
{{ range .Pools }}
<option value="{{ .ID }}" {{ selected $.CurrentPoolID .ID }}>{{ .Name }}</option>
{{ end }}
</select>
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<div class="col-start-2 hidden peer-checked:flex flex-col mt-2 bg-gray-100 p-2 gap-2">
<div class="flex items-center gap-2">
<label class="text-md" for="agent-pool-id">Agent pool</label>
<select hx-get="{{ poolsWorkspacePath .Workspace.ID }}" hx-trigger="load" hx-swap="innerHTML" id="agent-pool-id" name="agent_pool_id"></select>
<div hx-get="{{ poolsWorkspacePath .Workspace.ID }}?agent_pool_id={{ default "" .Workspace.AgentPoolID }}" hx-trigger="load" hx-swap="innerHTML"></div>
</div>
<span class="description">Select an agent pool. If no pools are listed then you either need to create a pool or you need to configure at least one pool to grant access to your workspace. Manage agent pools <a id="agent-pools-link" class="underline" href="{{ agentPoolsPath .Workspace.Organization }}">here</a>.</span>
</div>
Expand Down
32 changes: 16 additions & 16 deletions internal/workspace/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,15 +427,15 @@ func (h *webHandlers) editWorkspace(w http.ResponseWriter, r *http.Request) {

func (h *webHandlers) updateWorkspace(w http.ResponseWriter, r *http.Request) {
var params struct {
AgentPoolID *string `schema:"agent_pool_id"`
AutoApply bool `schema:"auto_apply"`
Name *string
Description *string
ExecutionMode *ExecutionMode `schema:"execution_mode"`
TerraformVersion *string `schema:"terraform_version"`
WorkingDirectory *string `schema:"working_directory"`
WorkspaceID string `schema:"workspace_id,required"`
GlobalRemoteState bool `schema:"global_remote_state"`
AgentPoolID string `schema:"agent_pool_id"`
AutoApply bool `schema:"auto_apply"`
Name string
Description string
ExecutionMode ExecutionMode `schema:"execution_mode"`
TerraformVersion string `schema:"terraform_version"`
WorkingDirectory string `schema:"working_directory"`
WorkspaceID string `schema:"workspace_id,required"`
GlobalRemoteState bool `schema:"global_remote_state"`

// VCS connection
VCSTriggerStrategy string `schema:"vcs_trigger"`
Expand All @@ -459,11 +459,11 @@ func (h *webHandlers) updateWorkspace(w http.ResponseWriter, r *http.Request) {

opts := UpdateOptions{
AutoApply: &params.AutoApply,
Name: params.Name,
Description: params.Description,
ExecutionMode: params.ExecutionMode,
TerraformVersion: params.TerraformVersion,
WorkingDirectory: params.WorkingDirectory,
Name: &params.Name,
Description: &params.Description,
ExecutionMode: &params.ExecutionMode,
TerraformVersion: &params.TerraformVersion,
WorkingDirectory: &params.WorkingDirectory,
GlobalRemoteState: &params.GlobalRemoteState,
}
if ws.Connection != nil {
Expand All @@ -490,8 +490,8 @@ func (h *webHandlers) updateWorkspace(w http.ResponseWriter, r *http.Request) {
}
}
// only set agent pool ID if execution mode is set to agent
if opts.ExecutionMode != nil && *opts.ExecutionMode == AgentExecutionMode {
opts.AgentPoolID = params.AgentPoolID
if params.ExecutionMode == AgentExecutionMode {
opts.AgentPoolID = &params.AgentPoolID
}

ws, err = h.svc.UpdateWorkspace(r.Context(), params.WorkspaceID, opts)
Expand Down