diff --git a/cli/command/service/formatter.go b/cli/command/service/formatter.go index 311355618d16..7c2290d97a52 100644 --- a/cli/command/service/formatter.go +++ b/cli/command/service/formatter.go @@ -570,6 +570,10 @@ func (c *serviceContext) Mode() string { return "global" case c.service.Spec.Mode.Replicated != nil: return "replicated" + case c.service.Spec.Mode.ReplicatedJob != nil: + return "replicated job" + case c.service.Spec.Mode.GlobalJob != nil: + return "global job" default: return "" } @@ -578,10 +582,33 @@ func (c *serviceContext) Mode() string { func (c *serviceContext) Replicas() string { s := &c.service - var running, desired uint64 + var running, desired, completed uint64 if s.ServiceStatus != nil { running = c.service.ServiceStatus.RunningTasks desired = c.service.ServiceStatus.DesiredTasks + completed = c.service.ServiceStatus.CompletedTasks + } + // for jobs, we will not include the max per node, even if it is set. jobs + // include instead the progress of the job as a whole, in addition to the + // current running state. the system respects max per node, but if we + // included it in the list output, the lines for jobs would be entirely too + // long and make the UI look bad. + if s.Spec.Mode.ReplicatedJob != nil { + return fmt.Sprintf( + "%d/%d (%d/%d completed)", + running, desired, completed, *s.Spec.Mode.ReplicatedJob.TotalCompletions, + ) + } + if s.Spec.Mode.GlobalJob != nil { + // for global jobs, we need to do a little math. desired tasks are only + // the tasks that have not yet actually reached the Completed state. + // Completed tasks have reached the completed state. the TOTAL number + // of tasks to run is the sum of the tasks desired to still complete, + // and the tasks actually completed. + return fmt.Sprintf( + "%d/%d (%d/%d completed)", + running, desired, completed, desired+completed, + ) } if r := c.maxReplicas(); r > 0 { return fmt.Sprintf("%d/%d (max %d per node)", running, desired, r) diff --git a/cli/command/service/formatter_test.go b/cli/command/service/formatter_test.go index 10b7923c71e0..f36f36d01f45 100644 --- a/cli/command/service/formatter_test.go +++ b/cli/command/service/formatter_test.go @@ -15,6 +15,13 @@ import ( ) func TestServiceContextWrite(t *testing.T) { + var ( + // we need a pair of variables for setting the job parameters, because + // those parameters take pointers to uint64, which we can't make as a + // literal + varThree uint64 = 3 + varTen uint64 = 10 + ) cases := []struct { context formatter.Context expected string @@ -38,6 +45,8 @@ func TestServiceContextWrite(t *testing.T) { 01_baz baz global 1/3 *:80->8080/tcp 04_qux2 qux2 replicated 3/3 (max 2 per node) 03_qux10 qux10 replicated 2/3 (max 1 per node) +05_job1 zarp1 replicated job 2/3 (5/10 completed) +06_job2 zarp2 global job 1/1 (3/4 completed) `, }, { @@ -46,6 +55,8 @@ func TestServiceContextWrite(t *testing.T) { 01_baz 04_qux2 03_qux10 +05_job1 +06_job2 `, }, { @@ -55,6 +66,8 @@ bar replicated baz global qux2 replicated qux10 replicated +zarp1 replicated job +zarp2 global job `, }, { @@ -64,6 +77,8 @@ bar baz qux2 qux10 +zarp1 +zarp2 `, }, // Raw Format @@ -77,6 +92,8 @@ qux10 id: 01_baz id: 04_qux2 id: 03_qux10 +id: 05_job1 +id: 06_job2 `, }, // Custom Format @@ -86,6 +103,8 @@ id: 03_qux10 baz qux2 qux10 +zarp1 +zarp2 `, }, } @@ -170,6 +189,37 @@ qux10 DesiredTasks: 3, }, }, + { + ID: "05_job1", + Spec: swarm.ServiceSpec{ + Annotations: swarm.Annotations{Name: "zarp1"}, + Mode: swarm.ServiceMode{ + ReplicatedJob: &swarm.ReplicatedJob{ + MaxConcurrent: &varThree, + TotalCompletions: &varTen, + }, + }, + }, + ServiceStatus: &swarm.ServiceStatus{ + RunningTasks: 2, + DesiredTasks: 3, + CompletedTasks: 5, + }, + }, + { + ID: "06_job2", + Spec: swarm.ServiceSpec{ + Annotations: swarm.Annotations{Name: "zarp2"}, + Mode: swarm.ServiceMode{ + GlobalJob: &swarm.GlobalJob{}, + }, + }, + ServiceStatus: &swarm.ServiceStatus{ + RunningTasks: 1, + DesiredTasks: 1, + CompletedTasks: 3, + }, + }, } out := bytes.NewBufferString("") testcase.context.Output = out diff --git a/cli/command/service/list.go b/cli/command/service/list.go index 61e414c6b72e..56cb3a23bab9 100644 --- a/cli/command/service/list.go +++ b/cli/command/service/list.go @@ -111,6 +111,8 @@ func AppendServiceStatus(ctx context.Context, c client.APIClient, services []swa status := map[string]*swarm.ServiceStatus{} taskFilter := filters.NewArgs() for i, s := range services { + // there is no need in this switch to check for job modes. jobs are not + // supported until after ServiceStatus was introduced. switch { case s.ServiceStatus != nil: // Server already returned service-status, so we don't diff --git a/cli/command/service/opts.go b/cli/command/service/opts.go index e0beeba00e91..61f0c88e99b5 100644 --- a/cli/command/service/opts.go +++ b/cli/command/service/opts.go @@ -508,8 +508,9 @@ type serviceOptions struct { resources resourceOptions stopGrace opts.DurationOpt - replicas Uint64Opt - mode string + replicas Uint64Opt + mode string + maxConcurrent Uint64Opt restartPolicy restartPolicyOptions constraints opts.ListOpts @@ -554,18 +555,45 @@ func (options *serviceOptions) ToServiceMode() (swarm.ServiceMode, error) { switch options.mode { case "global": if options.replicas.Value() != nil { - return serviceMode, errors.Errorf("replicas can only be used with replicated mode") + return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode") } if options.maxReplicas > 0 { - return serviceMode, errors.New("replicas-max-per-node can only be used with replicated mode") + return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode") + } + if options.maxConcurrent.Value() != nil { + return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode") } serviceMode.Global = &swarm.GlobalService{} case "replicated": + if options.maxConcurrent.Value() != nil { + return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode") + } + serviceMode.Replicated = &swarm.ReplicatedService{ Replicas: options.replicas.Value(), } + case "replicated-job": + concurrent := options.maxConcurrent.Value() + if concurrent == nil { + concurrent = options.replicas.Value() + } + serviceMode.ReplicatedJob = &swarm.ReplicatedJob{ + MaxConcurrent: concurrent, + TotalCompletions: options.replicas.Value(), + } + case "global-job": + if options.maxReplicas > 0 { + return serviceMode, errors.New("replicas-max-per-node can only be used with replicated or replicated-job mode") + } + if options.maxConcurrent.Value() != nil { + return serviceMode, errors.New("max-concurrent can only be used with replicated-job mode") + } + if options.replicas.Value() != nil { + return serviceMode, errors.Errorf("replicas can only be used with replicated or replicated-job mode") + } + serviceMode.GlobalJob = &swarm.GlobalJob{} default: return serviceMode, errors.Errorf("Unknown mode: %s, only replicated and global supported", options.mode) } @@ -611,6 +639,16 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N return service, err } + updateConfig := options.update.updateConfig(flags) + rollbackConfig := options.rollback.rollbackConfig(flags) + + // update and rollback configuration is not supported for jobs. If these + // flags are not set, then the values will be nil. If they are non-nil, + // then return an error. + if (serviceMode.ReplicatedJob != nil || serviceMode.GlobalJob != nil) && (updateConfig != nil || rollbackConfig != nil) { + return service, errors.Errorf("update and rollback configuration is not supported for jobs") + } + networks := convertNetworks(options.networks) for i, net := range networks { nwID, err := resolveNetworkID(ctx, apiClient, net.Target) @@ -671,8 +709,8 @@ func (options *serviceOptions) ToService(ctx context.Context, apiClient client.N LogDriver: options.logDriver.toLogDriver(), }, Mode: serviceMode, - UpdateConfig: options.update.updateConfig(flags), - RollbackConfig: options.rollback.rollbackConfig(flags), + UpdateConfig: updateConfig, + RollbackConfig: rollbackConfig, EndpointSpec: options.endpoint.ToEndpointSpec(), } @@ -769,6 +807,8 @@ func addServiceFlags(flags *pflag.FlagSet, opts *serviceOptions, defaultFlagValu flags.Var(&opts.stopGrace, flagStopGracePeriod, flagDesc(flagStopGracePeriod, "Time to wait before force killing a container (ns|us|ms|s|m|h)")) flags.Var(&opts.replicas, flagReplicas, "Number of tasks") + flags.Var(&opts.maxConcurrent, flagConcurrent, "Number of job tasks to run at once (default equal to --replicas)") + flags.SetAnnotation(flagConcurrent, "version", []string{"1.41"}) flags.Uint64Var(&opts.maxReplicas, flagMaxReplicas, defaultFlagValues.getUint64(flagMaxReplicas), "Maximum number of tasks per node (default 0 = unlimited)") flags.SetAnnotation(flagMaxReplicas, "version", []string{"1.40"}) @@ -878,6 +918,7 @@ const ( flagLimitCPU = "limit-cpu" flagLimitMemory = "limit-memory" flagMaxReplicas = "replicas-max-per-node" + flagConcurrent = "max-concurrent" flagMode = "mode" flagMount = "mount" flagMountRemove = "mount-rm" diff --git a/cli/command/service/opts_test.go b/cli/command/service/opts_test.go index 8c27e9f02637..058313d5cf5c 100644 --- a/cli/command/service/opts_test.go +++ b/cli/command/service/opts_test.go @@ -285,7 +285,7 @@ func TestToServiceMaxReplicasGlobalModeConflict(t *testing.T) { maxReplicas: 1, } _, err := opt.ToServiceMode() - assert.Error(t, err, "replicas-max-per-node can only be used with replicated mode") + assert.Error(t, err, "replicas-max-per-node can only be used with replicated or replicated-job mode") } func TestToServiceSysCtls(t *testing.T) { diff --git a/cli/command/service/progress/progress.go b/cli/command/service/progress/progress.go index 4b9cdd733769..bf69765ced95 100644 --- a/cli/command/service/progress/progress.go +++ b/cli/command/service/progress/progress.go @@ -7,6 +7,7 @@ import ( "io" "os" "os/signal" + "strconv" "strings" "time" @@ -45,6 +46,7 @@ var ( const ( maxProgress = 9 maxProgressBars = 20 + maxJobProgress = 10 ) type progressUpdater interface { @@ -53,7 +55,9 @@ type progressUpdater interface { func init() { for state := range numberedStates { - if !terminalState(state) && len(state) > longestState { + // for jobs, we use the "complete" state, and so it should be factored + // in to the computation of the longest state. + if (!terminalState(state) || state == swarm.TaskStateComplete) && len(state) > longestState { longestState = len(state) } } @@ -164,6 +168,18 @@ func ServiceProgress(ctx context.Context, client client.APIClient, serviceID str return err } if converged { + // if the service is a job, there's no need to verify it. jobs are + // stay done once they're done. skip the verification and just end + // the progress monitoring. + // + // only job services have a non-nil job status, which means we can + // use the presence of this field to check if the service is a job + // here. + if service.JobStatus != nil { + progress.Message(progressOut, "", "job complete") + return nil + } + if convergedAt.IsZero() { convergedAt = time.Now() } @@ -230,6 +246,14 @@ func initializeUpdater(service swarm.Service, progressOut progress.Output) (prog progressOut: progressOut, }, nil } + if service.Spec.Mode.ReplicatedJob != nil { + return newReplicatedJobProgressUpdater(service, progressOut), nil + } + if service.Spec.Mode.GlobalJob != nil { + return &globalJobProgressUpdater{ + progressOut: progressOut, + }, nil + } return nil, errors.New("unrecognized service mode") } @@ -502,3 +526,323 @@ func (u *globalProgressUpdater) writeTaskProgress(task swarm.Task, nodeCount int }) } } + +// replicatedJobProgressUpdater outputs the progress of a replicated job. This +// progress consists of a few main elements. +// +// The first is the progress bar for the job as a whole. This shows the number +// of completed out of total tasks for the job. Tasks that are currently +// running are not counted. +// +// The second is the status of the "active" tasks for the job. We count a task +// as "active" if it has any non-terminal state, not just running. This is +// shown as a fraction of the maximum concurrent tasks that can be running, +// which is the less of MaxConcurrent or TotalCompletions - completed tasks. +type replicatedJobProgressUpdater struct { + progressOut progress.Output + + // jobIteration is the service's job iteration, used to exclude tasks + // belonging to earlier iterations. + jobIteration uint64 + + // concurrent is the value of MaxConcurrent as an int. That is, the maximum + // number of tasks allowed to be run simultaneously. + concurrent int + + // total is the value of TotalCompletions, the number of complete tasks + // desired. + total int + + // initialized is set to true after the first time update is called. the + // first time update is called, the components of the progress UI are all + // written out in an initial pass. this ensure that they will subsequently + // be in order, no matter how they are updated. + initialized bool + + // progressDigits is the number digits in total, so that we know how much + // to pad the job progress field with. + // + // when we're writing the number of completed over total tasks, we need to + // pad the numerator with spaces, so that the bar doesn't jump around. + // we'll compute that once on init, and then reuse it over and over. + // + // we compute this in the least clever way possible: convert to string + // with strconv.Itoa, then take the len. + progressDigits int + + // activeDigits is the same, but for active tasks, and it applies to both + // the numerator and denominator. + activeDigits int +} + +func newReplicatedJobProgressUpdater(service swarm.Service, progressOut progress.Output) *replicatedJobProgressUpdater { + u := &replicatedJobProgressUpdater{ + progressOut: progressOut, + concurrent: int(*service.Spec.Mode.ReplicatedJob.MaxConcurrent), + total: int(*service.Spec.Mode.ReplicatedJob.TotalCompletions), + jobIteration: service.JobStatus.JobIteration.Index, + } + u.progressDigits = len(strconv.Itoa(u.total)) + u.activeDigits = len(strconv.Itoa(u.concurrent)) + + return u +} + +// update writes out the progress of the replicated job. +func (u *replicatedJobProgressUpdater) update(_ swarm.Service, tasks []swarm.Task, _ map[string]struct{}, _ bool) (bool, error) { + if !u.initialized { + u.writeOverallProgress(0, 0) + + // only write out progress bars if there will be less than the maximum + if u.total <= maxProgressBars { + for i := 1; i <= u.total; i++ { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", i, u.total), + Action: " ", + }) + } + } + u.initialized = true + } + + // tasksBySlot is a mapping of slot number to the task valid for that slot. + // it deduplicated tasks occupying the same numerical slot but in different + // states. + tasksBySlot := make(map[int]swarm.Task) + for _, task := range tasks { + // first, check if the task belongs to this service iteration. skip + // tasks belonging to other iterations. + if task.JobIteration == nil || task.JobIteration.Index != u.jobIteration { + continue + } + + // then, if the task is in an unknown state, ignore it. + if numberedStates[task.DesiredState] == 0 || + numberedStates[task.Status.State] == 0 { + continue + } + + // finally, check if the task already exists in the map + if existing, ok := tasksBySlot[task.Slot]; ok { + // if so, use the task with the lower actual state + if numberedStates[existing.Status.State] > numberedStates[task.Status.State] { + tasksBySlot[task.Slot] = task + } + } else { + // otherwise, just add it to the map. + tasksBySlot[task.Slot] = task + } + } + + activeTasks := 0 + completeTasks := 0 + + for i := 0; i < len(tasksBySlot); i++ { + task := tasksBySlot[i] + u.writeTaskProgress(task) + + if numberedStates[task.Status.State] < numberedStates[swarm.TaskStateComplete] { + activeTasks++ + } + + if task.Status.State == swarm.TaskStateComplete { + completeTasks++ + } + } + + u.writeOverallProgress(activeTasks, completeTasks) + + return completeTasks == u.total, nil +} + +func (u *replicatedJobProgressUpdater) writeOverallProgress(active, completed int) { + u.progressOut.WriteProgress(progress.Progress{ + ID: "job progress", + Action: fmt.Sprintf( + // * means "use the next positional arg to compute padding" + "%*d out of %d complete", u.progressDigits, completed, u.total, + ), + Current: int64(completed), + Total: int64(u.total), + HideCounts: true, + }) + + // actualDesired is the lesser of MaxConcurrent, or the remaining tasks + actualDesired := u.total - completed + if actualDesired > u.concurrent { + actualDesired = u.concurrent + } + + u.progressOut.WriteProgress(progress.Progress{ + ID: "active tasks", + Action: fmt.Sprintf( + // [n] notation lets us select a specific argument, 1-indexed + // putting the [1] before the star means "make the string this + // length". putting the [2] or the [3] means "use this argument + // here" + // + // we pad both the numerator and the denominator because, as the + // job reaches its conclusion, the number of possible concurrent + // tasks will go down, as fewer than MaxConcurrent tasks are needed + // to complete the job. + "%[1]*[2]d out of %[1]*[3]d tasks", u.activeDigits, active, actualDesired, + ), + }) +} + +func (u *replicatedJobProgressUpdater) writeTaskProgress(task swarm.Task) { + if u.total > maxProgressBars { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total), + Action: truncError(task.Status.Err), + }) + return + } + + u.progressOut.WriteProgress(progress.Progress{ + ID: fmt.Sprintf("%d/%d", task.Slot+1, u.total), + Action: fmt.Sprintf("%-*s", longestState, task.Status.State), + Current: numberedStates[task.Status.State], + Total: maxJobProgress, + HideCounts: true, + }) +} + +// globalJobProgressUpdater is the progressUpdater for GlobalJob-mode services. +// Because GlobalJob services are so much simpler than ReplicatedJob services, +// this updater is in turn simpler as well. +type globalJobProgressUpdater struct { + progressOut progress.Output + + // initialized is used to detect the first pass of update, and to perform + // first time initialization logic at that time. + initialized bool + + // total is the total number of tasks expected for this job + total int + + // progressDigits is the number of spaces to pad the numerator of the job + // progress field + progressDigits int + + taskNodes map[string]struct{} + nodeOrder []string +} + +func (u *globalJobProgressUpdater) update(service swarm.Service, tasks []swarm.Task, activeNodes map[string]struct{}, _ bool) (bool, error) { + if !u.initialized { + // if there are not yet tasks, then return early. + if len(tasks) == 0 && len(activeNodes) != 0 { + u.progressOut.WriteProgress(progress.Progress{ + ID: "job progress", + Action: "waiting for tasks", + }) + return false, nil + } + + // when a global job starts, all of its tasks are created at once, so + // we can use len(tasks) to know how many we're expecting. + u.taskNodes = map[string]struct{}{} + + for _, task := range tasks { + // skip any tasks not belonging to this job iteration. + if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index { + continue + } + + // collect the list of all node IDs for this service. + // + // basically, global jobs will execute on any new nodes that join + // the cluster in the future. to avoid making things complicated, + // we will only check the progress of the initial set of nodes. if + // any new nodes come online during the operation, we will ignore + // them. + u.taskNodes[task.NodeID] = struct{}{} + } + + u.total = len(u.taskNodes) + u.progressDigits = len(strconv.Itoa(u.total)) + + u.writeOverallProgress(0) + u.initialized = true + } + + // tasksByNodeID maps a NodeID to the latest task for that Node ID. this + // lets us pick only the latest task for any given node. + tasksByNodeID := map[string]swarm.Task{} + + for _, task := range tasks { + // skip any tasks not belonging to this job iteration + if task.JobIteration == nil || task.JobIteration.Index != service.JobStatus.JobIteration.Index { + continue + } + + // if the task is not on one of the initial set of nodes, ignore it. + if _, ok := u.taskNodes[task.NodeID]; !ok { + continue + } + + // if there is already a task recorded for this node, choose the one + // with the lower state + if oldtask, ok := tasksByNodeID[task.NodeID]; ok { + if numberedStates[oldtask.Status.State] > numberedStates[task.Status.State] { + tasksByNodeID[task.NodeID] = task + } + } else { + tasksByNodeID[task.NodeID] = task + } + } + + complete := 0 + for _, task := range tasksByNodeID { + u.writeTaskProgress(task) + if task.Status.State == swarm.TaskStateComplete { + complete++ + } + } + + u.writeOverallProgress(complete) + return complete == u.total, nil +} + +func (u *globalJobProgressUpdater) writeTaskProgress(task swarm.Task) { + if u.total > maxProgressBars { + return + } + + if task.Status.Err != "" { + u.progressOut.WriteProgress(progress.Progress{ + ID: task.NodeID, + Action: truncError(task.Status.Err), + }) + return + } + + u.progressOut.WriteProgress(progress.Progress{ + ID: task.NodeID, + Action: fmt.Sprintf("%-*s", longestState, task.Status.State), + Current: numberedStates[task.Status.State], + Total: maxJobProgress, + HideCounts: true, + }) +} + +func (u *globalJobProgressUpdater) writeOverallProgress(complete int) { + // all tasks for a global job are active at once, so we only write out the + // total progress. + u.progressOut.WriteProgress(progress.Progress{ + // see (*replicatedJobProgressUpdater).writeOverallProgress for an + // explanation fo the advanced fmt use in this function. + ID: "job progress", + Action: fmt.Sprintf( + "%*d out of %d complete", u.progressDigits, complete, u.total, + ), + Current: int64(complete), + Total: int64(u.total), + HideCounts: true, + }) +} diff --git a/cli/command/service/progress/progress_test.go b/cli/command/service/progress/progress_test.go index 2a386d64f5e1..cff72502263b 100644 --- a/cli/command/service/progress/progress_test.go +++ b/cli/command/service/progress/progress_test.go @@ -42,6 +42,20 @@ func (u updaterTester) testUpdater(tasks []swarm.Task, expectedConvergence bool, assert.Check(u.t, is.DeepEqual(expectedProgress, u.p.p)) } +func (u updaterTester) testUpdaterNoOrder(tasks []swarm.Task, expectedConvergence bool, expectedProgress []progress.Progress) { + u.p.clear() + converged, err := u.updater.update(u.service, tasks, u.activeNodes, u.rollback) + assert.Check(u.t, err) + assert.Check(u.t, is.Equal(expectedConvergence, converged)) + + // instead of checking that expected and actual match exactly, verify that + // they are the same length, and every time from actual is in expected. + assert.Check(u.t, is.Equal(len(expectedProgress), len(u.p.p))) + for _, prog := range expectedProgress { + assert.Check(u.t, is.Contains(u.p.p, prog)) + } +} + func TestReplicatedProgressUpdaterOneReplica(t *testing.T) { replicas := uint64(1) @@ -373,3 +387,511 @@ func TestGlobalProgressUpdaterManyNodes(t *testing.T) { }) } } + +func TestReplicatedJobProgressUpdaterSmall(t *testing.T) { + concurrent := uint64(2) + total := uint64(5) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + ReplicatedJob: &swarm.ReplicatedJob{ + MaxConcurrent: &concurrent, + TotalCompletions: &total, + }, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: newReplicatedJobProgressUpdater(service, p), + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + // create some tasks belonging to a previous iteration + tasks := []swarm.Task{ + { + ID: "oldtask1", + Slot: 0, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }, { + ID: "oldtask2", + Slot: 1, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + }, + } + + ut.testUpdater(tasks, false, []progress.Progress{ + // on the initial pass, we draw all of the progress bars at once, which + // puts them in order for the rest of the operation + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + {ID: "1/5", Action: " "}, + {ID: "2/5", Action: " "}, + {ID: "3/5", Action: " "}, + {ID: "4/5", Action: " "}, + {ID: "5/5", Action: " "}, + // from here on, we draw as normal. as a side effect, we will have a + // second update for the job progress and active tasks. This has no + // practical effect on the UI, it's just a side effect of the update + // logic. + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + }) + + // wipe the old tasks out of the list + tasks = []swarm.Task{} + tasks = append(tasks, + swarm.Task{ + ID: "task1", + Slot: 0, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + swarm.Task{ + ID: "task2", + Slot: 1, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStatePreparing + tasks[1].Status.State = swarm.TaskStateAssigned + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "assigned ", Current: 4, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStateRunning + tasks[1].Status.State = swarm.TaskStatePreparing + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 5 complete", Current: 0, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[0].Status.State = swarm.TaskStateComplete + tasks[1].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 2 tasks"}, + }) + + tasks = append(tasks, + swarm.Task{ + ID: "task3", + Slot: 2, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + swarm.Task{ + ID: "task4", + Slot: 3, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[2].Status.State = swarm.TaskStateRunning + tasks[3].Status.State = swarm.TaskStateRunning + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 5 complete", Current: 2, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[3].Status.State = swarm.TaskStateComplete + tasks = append(tasks, + swarm.Task{ + ID: "task5", + Slot: 4, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "2 out of 2 tasks"}, + }) + + tasks[2].Status.State = swarm.TaskStateFailed + tasks[2].Status.Err = "the task failed" + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "the task failed"}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 5 complete", Current: 3, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "1 out of 2 tasks"}, + }) + + tasks[4].Status.State = swarm.TaskStateComplete + tasks = append(tasks, + swarm.Task{ + ID: "task6", + Slot: 2, + NodeID: "", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateRunning}, + JobIteration: &swarm.Version{Index: service.JobStatus.JobIteration.Index}, + }, + ) + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "4 out of 5 complete", Current: 4, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "1 out of 1 tasks"}, + }) + + tasks[5].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, true, []progress.Progress{ + {ID: "1/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "2/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "3/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "4/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "5/5", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "5 out of 5 complete", Current: 5, Total: 5, HideCounts: true}, + {ID: "active tasks", Action: "0 out of 0 tasks"}, + }) +} + +func TestReplicatedJobProgressUpdaterLarge(t *testing.T) { + concurrent := uint64(10) + total := uint64(50) + + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + ReplicatedJob: &swarm.ReplicatedJob{ + MaxConcurrent: &concurrent, + TotalCompletions: &total, + }, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 0}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: newReplicatedJobProgressUpdater(service, p), + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}}, + service: service, + } + + tasks := []swarm.Task{} + + // see the comments in TestReplicatedJobProgressUpdaterSmall for why + // we write this out twice. + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 0 out of 10 tasks"}, + // we don't write out individual status bars for a large job, only the + // overall progress bar + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 0 out of 10 tasks"}, + }) + + // first, create the initial batch of running tasks + for i := 0; i < int(concurrent); i++ { + tasks = append(tasks, swarm.Task{ + ID: strconv.Itoa(i), + Slot: i, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "active tasks", Action: fmt.Sprintf("%2d out of 10 tasks", i+1)}, + }) + } + + // now, start moving tasks to completed, and starting new tasks after them. + // to do this, we'll start at 0, mark a task complete, and then append a + // new one. we'll stop before we get to the end, because the end has a + // steadily decreasing denominator for the active tasks + // + // for 10 concurrent 50 total, this means we'll stop at 50 - 10 = 40 tasks + // in the completed state, 10 tasks running. the last index in use will be + // 39. + for i := 0; i < int(total)-int(concurrent); i++ { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: " 9 out of 10 tasks"}, + }) + + last := len(tasks) + tasks = append(tasks, swarm.Task{ + ID: strconv.Itoa(last), + Slot: last, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 0}, + }) + + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: "10 out of 10 tasks"}, + }) + } + + // quick check, to make sure we did the math right when we wrote this code: + // we do have 50 tasks in the slice, right? + assert.Check(t, is.Equal(len(tasks), int(total))) + + // now, we're down to our last 10 tasks, which are all running. We need to + // wind these down + for i := int(total) - int(concurrent) - 1; i < int(total); i++ { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, (i+1 == int(total)), []progress.Progress{ + {ID: "job progress", Action: fmt.Sprintf("%2d out of 50 complete", i+1), Current: int64(i + 1), Total: 50, HideCounts: true}, + {ID: "active tasks", Action: fmt.Sprintf("%2[1]d out of %2[1]d tasks", int(total)-(i+1))}, + }) + } +} + +func TestGlobalJobProgressUpdaterSmall(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + GlobalJob: &swarm.GlobalJob{}, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: &globalJobProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: map[string]struct{}{"a": {}, "b": {}, "c": {}}, + service: service, + } + + tasks := []swarm.Task{ + { + ID: "oldtask1", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + NodeID: "a", + }, { + ID: "oldtask2", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateComplete}, + JobIteration: &swarm.Version{Index: 0}, + NodeID: "b", + }, { + ID: "task1", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "a", + }, { + ID: "task2", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "b", + }, { + ID: "task3", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStateNew}, + JobIteration: &swarm.Version{Index: 1}, + NodeID: "c", + }, + } + + // we don't know how many tasks will be created until we get the initial + // task list, so we should not write out any definitive answers yet. + ut.testUpdater([]swarm.Task{}, false, []progress.Progress{ + {ID: "job progress", Action: "waiting for tasks"}, + }) + + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + {ID: "a", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "b", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "c", Action: "new ", Current: 1, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStatePreparing + tasks[3].Status.State = swarm.TaskStateRunning + tasks[4].Status.State = swarm.TaskStateAccepted + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "b", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "c", Action: "accepted ", Current: 5, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "0 out of 3 complete", Current: 0, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStateRunning + tasks[3].Status.State = swarm.TaskStateComplete + tasks[4].Status.State = swarm.TaskStateRunning + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "running ", Current: 9, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "1 out of 3 complete", Current: 1, Total: 3, HideCounts: true}, + }) + + tasks[2].Status.State = swarm.TaskStateFailed + tasks[2].Status.Err = "task failed" + tasks[4].Status.State = swarm.TaskStateComplete + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "task failed"}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true}, + }) + + tasks = append(tasks, swarm.Task{ + ID: "task4", + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{State: swarm.TaskStatePreparing}, + NodeID: tasks[2].NodeID, + JobIteration: &swarm.Version{Index: 1}, + }) + + ut.testUpdaterNoOrder(tasks, false, []progress.Progress{ + {ID: "a", Action: "preparing", Current: 6, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "2 out of 3 complete", Current: 2, Total: 3, HideCounts: true}, + }) + + tasks[5].Status.State = swarm.TaskStateComplete + ut.testUpdaterNoOrder(tasks, true, []progress.Progress{ + {ID: "a", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "b", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "c", Action: "complete ", Current: 10, Total: 10, HideCounts: true}, + {ID: "job progress", Action: "3 out of 3 complete", Current: 3, Total: 3, HideCounts: true}, + }) +} + +func TestGlobalJobProgressUpdaterLarge(t *testing.T) { + service := swarm.Service{ + Spec: swarm.ServiceSpec{ + Mode: swarm.ServiceMode{ + GlobalJob: &swarm.GlobalJob{}, + }, + }, + JobStatus: &swarm.JobStatus{ + JobIteration: swarm.Version{Index: 1}, + }, + } + + activeNodes := map[string]struct{}{} + for i := 0; i < 50; i++ { + activeNodes[fmt.Sprintf("node%v", i)] = struct{}{} + } + + p := &mockProgress{} + ut := updaterTester{ + t: t, + updater: &globalJobProgressUpdater{ + progressOut: p, + }, + p: p, + activeNodes: activeNodes, + service: service, + } + + tasks := []swarm.Task{} + for nodeID := range activeNodes { + tasks = append(tasks, swarm.Task{ + ID: fmt.Sprintf("task%s", nodeID), + NodeID: nodeID, + DesiredState: swarm.TaskStateComplete, + Status: swarm.TaskStatus{ + State: swarm.TaskStateNew, + }, + JobIteration: &swarm.Version{Index: 1}, + }) + } + + // no bars, because too many tasks + ut.testUpdater(tasks, false, []progress.Progress{ + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + {ID: "job progress", Action: " 0 out of 50 complete", Current: 0, Total: 50, HideCounts: true}, + }) + + for i := range tasks { + tasks[i].Status.State = swarm.TaskStateComplete + ut.testUpdater(tasks, i+1 == len(activeNodes), []progress.Progress{ + { + ID: "job progress", + Action: fmt.Sprintf("%2d out of 50 complete", i+1), + Current: int64(i + 1), Total: 50, HideCounts: true, + }, + }) + } +} diff --git a/cli/command/service/testdata/service-context-write-raw.golden b/cli/command/service/testdata/service-context-write-raw.golden index feb100c9d7a8..fa78876c6ab7 100644 --- a/cli/command/service/testdata/service-context-write-raw.golden +++ b/cli/command/service/testdata/service-context-write-raw.golden @@ -26,3 +26,17 @@ replicas: 2/3 (max 1 per node) image: ports: +id: 05_job1 +name: zarp1 +mode: replicated job +replicas: 2/3 (5/10 completed) +image: +ports: + +id: 06_job2 +name: zarp2 +mode: global job +replicas: 1/1 (3/4 completed) +image: +ports: + diff --git a/docs/reference/commandline/service_create.md b/docs/reference/commandline/service_create.md index 7443979903ba..c91967b14c1e 100644 --- a/docs/reference/commandline/service_create.md +++ b/docs/reference/commandline/service_create.md @@ -50,7 +50,8 @@ Options: --limit-memory bytes Limit Memory --log-driver string Logging driver for service --log-opt list Logging driver options - --mode string Service mode (replicated or global) (default "replicated") + --max-concurrent Number of job tasks to run at once (default equal to --replicas) + --mode string Service mode (replicated, global, replicated-job, or global-job) (default "replicated") --mount mount Attach a filesystem mount to the service --name string Service name --network network Network attachments @@ -1011,6 +1012,59 @@ $ docker service create --name cuda \ nvidia/cuda ``` +### Running as a job + +Jobs are a special kind of service designed to run an operation to completion +and then stop, as opposed to running long-running daemons. When a Task +belonging to a job exits successfully (return value 0), the Task is marked as +"Completed", and is not run again. + +Jobs are started by using one of two modes, `replicated-job` or `global-job` + +```bash +$ docker service create --name myjob \ + --mode replicated-job \ + bash "true" +``` + +This command will run one Task, which will, using the `bash` image, execute the +command `true`, which will return 0 and then exit. + +Importantly, none of the update or rollback configuration options are valid. +Jobs can be updated, but cannot be rolled out or rolled back. + +Jobs are available in both replicated and global modes. + +#### Replicated Jobs + +A replicated job is like a replicated service. Setting the `--replicas` flag +will specify total number of iterations of a job to execute. + +By default, all replicas of a replicated job will launch at once. To control +the total number of replicas that are executing simultaneously at any one time, +the `--max-concurrent` flag can be used: + +```bash +$ docker service create --name mythrottledjob \ + --mode replicated-job \ + --replicas 10 \ + --max-concurrent 2 \ + bash "true" +``` + +The above command will execute 10 Tasks in total, but only 2 of them will be +run at any given time. + +#### Global Jobs + +Global jobs are like global services, in that a Task is executed once on each node +matching placement constraints. Global jobs are represented by the mode `global-job`. + +Note that after a Global job is created, any new Nodes added to the cluster +will have a Task from that job started on them. The Global Job does not as a +whole have a "done" state, except insofar as every Node meeting the job's +constraints has a Completed task. + ## Related commands * [service inspect](service_inspect.md) diff --git a/docs/reference/commandline/service_ls.md b/docs/reference/commandline/service_ls.md index 407e8ade2742..2836f0ae2c92 100644 --- a/docs/reference/commandline/service_ls.md +++ b/docs/reference/commandline/service_ls.md @@ -45,14 +45,17 @@ On a manager node: ```bash $ docker service ls -ID NAME MODE REPLICAS IMAGE -c8wgl7q4ndfd frontend replicated 5/5 nginx:alpine -dmu1ept4cxcf redis replicated 3/3 redis:3.0.6 -iwe3278osahj mongo global 7/7 mongo:3.3 +ID NAME MODE REPLICAS IMAGE +c8wgl7q4ndfd frontend replicated 5/5 nginx:alpine +dmu1ept4cxcf redis replicated 3/3 redis:3.0.6 +iwe3278osahj mongo global 7/7 mongo:3.3 +hh08h9uu8uwr job replicated-job 1/1 (3/5 completed) nginx:latest ``` The `REPLICAS` column shows both the *actual* and *desired* number of tasks for -the service. +the service. If the service is in `replicated-job` or `global` job, it will +additionally show the completion status of the job as completed tasks over +total tasks the job will execute. ### Filtering diff --git a/docs/reference/commandline/service_update.md b/docs/reference/commandline/service_update.md index f9b51edf1b05..1429f5040e81 100644 --- a/docs/reference/commandline/service_update.md +++ b/docs/reference/commandline/service_update.md @@ -63,6 +63,7 @@ Options: --limit-memory bytes Limit Memory --log-driver string Logging driver for service --log-opt list Logging driver options + --max-concurrent Number of job tasks to run at once (default equal to --replicas) --mount-add mount Add or update a mount on a service --mount-rm list Remove a mount by its target path --network-add network Add a network @@ -305,6 +306,22 @@ See [`service create`](./service_create.md#templating) for the reference. `service update` supports the same `--isolation` flag as `service create` See [`service create`](./service_create.md) for the reference. +### Updating Jobs + +When a service is created as a job, by setting its mode to `replicated-job` or +to `global-job` when doing `service create`, options for updating it are +limited. + +Updating a job creates a new set of Tasks for the job, and effectively resets +its completion status. If any Tasks were running before the update, they are +stopped, and new Tasks are created. + +Jobs cannot be rolled out or rolled back. None of the flags for configuring +update or rollback settings are valid with job modes. + +To run a job again with the same parameters that it was run previously, it can +be force updated with the `--force` flag. + ## Related commands * [service create](service_create.md) diff --git a/vendor.conf b/vendor.conf index 2939b2d4df70..1372e6b1e710 100755 --- a/vendor.conf +++ b/vendor.conf @@ -12,7 +12,7 @@ github.com/creack/pty 3a6a957789163cacdfe0e291617a github.com/davecgh/go-spew 8991bc29aa16c548c550c7ff78260e27b9ab7c73 # v1.1.1 github.com/docker/compose-on-kubernetes 78e6a00beda64ac8ccb9fec787e601fe2ce0d5bb # v0.5.0-alpha1 github.com/docker/distribution 0d3efadf0154c2b8a4e7b6621fff9809655cc580 -github.com/docker/docker a9507c6f76627fdc092edc542d5a7ef4a6df5eec +github.com/docker/docker 30d9fe30b1c1bf52f15a41e0b106a1542a167e04 git@github.com:dperny/docker.git # TODO(dperny): temporary commit ID, change to final merge commit github.com/docker/docker-credential-helpers 54f0238b6bf101fc3ad3b34114cb5520beb562f5 # v0.6.3 github.com/docker/go d30aec9fd63c35133f8f79c3412ad91a3b08be06 # Contains a customized version of canonical/json and is used by Notary. The package is periodically rebased on current Go versions. github.com/docker/go-connections 7395e3f8aa162843a74ed6d48e79627d9792ac55 # v0.4.0 diff --git a/vendor/github.com/docker/docker/api/types/client.go b/vendor/github.com/docker/docker/api/types/client.go index 54cb236efec7..6616cbcd59f7 100644 --- a/vendor/github.com/docker/docker/api/types/client.go +++ b/vendor/github.com/docker/docker/api/types/client.go @@ -205,7 +205,7 @@ const ( // BuilderV1 is the first generation builder in docker daemon BuilderV1 BuilderVersion = "1" // BuilderBuildKit is builder based on moby/buildkit project - BuilderBuildKit = "2" + BuilderBuildKit BuilderVersion = "2" ) // ImageBuildResponse holds information diff --git a/vendor/github.com/docker/docker/api/types/swarm/service.go b/vendor/github.com/docker/docker/api/types/swarm/service.go index 6b59711ab25a..6eb452d24d12 100644 --- a/vendor/github.com/docker/docker/api/types/swarm/service.go +++ b/vendor/github.com/docker/docker/api/types/swarm/service.go @@ -17,6 +17,10 @@ type Service struct { // listing all tasks for a service, an operation that could be // computation and network expensive. ServiceStatus *ServiceStatus `json:",omitempty"` + + // JobStatus is the status of a Service which is in one of ReplicatedJob or + // GlobalJob modes. It is absent on Replicated and Global services. + JobStatus *JobStatus `json:",omitempty"` } // ServiceSpec represents the spec of a service. @@ -39,8 +43,10 @@ type ServiceSpec struct { // ServiceMode represents the mode of a service. type ServiceMode struct { - Replicated *ReplicatedService `json:",omitempty"` - Global *GlobalService `json:",omitempty"` + Replicated *ReplicatedService `json:",omitempty"` + Global *GlobalService `json:",omitempty"` + ReplicatedJob *ReplicatedJob `json:",omitempty"` + GlobalJob *GlobalJob `json:",omitempty"` } // UpdateState is the state of a service update. @@ -77,6 +83,32 @@ type ReplicatedService struct { // GlobalService is a kind of ServiceMode. type GlobalService struct{} +// ReplicatedJob is the a type of Service which executes a defined Tasks +// in parallel until the specified number of Tasks have succeeded. +type ReplicatedJob struct { + // MaxConcurrent indicates the maximum number of Tasks that should be + // executing simultaneously for this job at any given time. There may be + // fewer Tasks that MaxConcurrent executing simultaneously; for example, if + // there are fewer than MaxConcurrent tasks needed to reach + // TotalCompletions. + // + // If this field is empty, it will default to a max concurrency of 1. + MaxConcurrent *uint64 `json:",omitempty"` + + // TotalCompletions is the total number of Tasks desired to run to + // completion. + // + // If this field is empty, the value of MaxConcurrent will be used. + TotalCompletions *uint64 `json:",omitempty"` +} + +// GlobalJob is the type of a Service which executes a Task on every Node +// matching the Service's placement constraints. These tasks run to completion +// and then exit. +// +// This type is deliberately empty. +type GlobalJob struct{} + const ( // UpdateFailureActionPause PAUSE UpdateFailureActionPause = "pause" @@ -142,4 +174,29 @@ type ServiceStatus struct { // services, this is computed by taking the number of tasks with desired // state of not-Shutdown. DesiredTasks uint64 + + // CompletedTasks is the number of tasks in the state Completed, if this + // service is in ReplicatedJob or GlobalJob mode. This field must be + // cross-referenced with the service type, because the default value of 0 + // may mean that a service is not in a job mode, or it may mean that the + // job has yet to complete any tasks. + CompletedTasks uint64 +} + +// JobStatus is the status of a job-type service. +type JobStatus struct { + // JobIteration is a value increased each time a Job is executed, + // successfully or otherwise. "Executed", in this case, means the job as a + // whole has been started, not that an individual Task has been launched. A + // job is "Executed" when its ServiceSpec is updated. JobIteration can be + // used to disambiguate Tasks belonging to different executions of a job. + // + // Though JobIteration will increase with each subsequent execution, it may + // not necessarily increase by 1, and so JobIteration should not be used to + // keep track of the number of times a job has been executed. + JobIteration Version + + // LastExecution is the time that the job was last executed, as observed by + // Swarm manager. + LastExecution time.Time `json:",omitempty"` } diff --git a/vendor/github.com/docker/docker/api/types/swarm/task.go b/vendor/github.com/docker/docker/api/types/swarm/task.go index d5a57df5db5a..9f193df37cb6 100644 --- a/vendor/github.com/docker/docker/api/types/swarm/task.go +++ b/vendor/github.com/docker/docker/api/types/swarm/task.go @@ -56,6 +56,12 @@ type Task struct { DesiredState TaskState `json:",omitempty"` NetworksAttachments []NetworkAttachment `json:",omitempty"` GenericResources []GenericResource `json:",omitempty"` + + // JobIteration is the JobIteration of the Service that this Task was + // spawned from, if the Service is a ReplicatedJob or GlobalJob. This is + // used to determine which Tasks belong to which run of the job. This field + // is absent if the Service mode is Replicated or Global. + JobIteration *Version `json:",omitempty"` } // TaskSpec represents the spec of a task. diff --git a/vendor/github.com/docker/docker/vendor.conf b/vendor/github.com/docker/docker/vendor.conf index 1d4f4060d35e..0aa383e3b0d1 100644 --- a/vendor/github.com/docker/docker/vendor.conf +++ b/vendor/github.com/docker/docker/vendor.conf @@ -128,7 +128,7 @@ github.com/containerd/ttrpc 92c8520ef9f86600c650dd540266 github.com/gogo/googleapis d31c731455cb061f42baff3bda55bad0118b126b # v1.2.0 # cluster -github.com/docker/swarmkit 7dded76ec532741c1ad9736cd2bb6d6661f0a386 +github.com/docker/swarmkit ef128ab4f5d50ffe4851d937b2b296e55562d10f github.com/gogo/protobuf ba06b47c162d49f2af050fb4c75bcbc86a159d5c # v1.2.1 github.com/golang/protobuf aa810b61a9c79d51363740d207bb46cf8e620ed5 # v1.2.0 github.com/cloudflare/cfssl 5d63dbd981b5c408effbb58c442d54761ff94fbd # 1.3.2