diff --git a/api/tasks.go b/api/tasks.go index 79ddbe6852f2..dfed4e6e67bb 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -506,6 +506,7 @@ const ( TaskRestartSignal = "Restart Signaled" TaskLeaderDead = "Leader Task Dead" TaskBuildingTaskDir = "Building Task Directory" + TaskGenericMessage = "Generic" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -533,4 +534,5 @@ type TaskEvent struct { VaultError string TaskSignalReason string TaskSignal string + GenericSource string } diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3171b9f20198..8b12c93c933c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -620,9 +620,10 @@ func (r *AllocRunner) setStatus(status, desc string) { } } -// setTaskState is used to set the status of a task. If state is empty then the -// event is appended but not synced with the server. The event may be omitted -func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) { +// setTaskState is used to set the status of a task. If lazySync is set then the +// event is appended but not synced with the server. If state is omitted, the +// last known state is used. +func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent, lazySync bool) { r.taskStatusLock.Lock() defer r.taskStatusLock.Unlock() taskState, ok := r.taskStates[taskName] @@ -643,10 +644,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv r.appendTaskEvent(taskState, event) } - if state == "" { + if lazySync { return } + // If the state hasn't been set use the existing state. + if state == "" { + state = taskState.State + if taskState.State == "" { + state = structs.TaskStatePending + } + } + switch state { case structs.TaskStateRunning: // Capture the start time if it is just starting diff --git a/client/consul_template.go b/client/consul_template.go index 2a887c8770ff..2e1b1ac60e18 100644 --- a/client/consul_template.go +++ b/client/consul_template.go @@ -5,6 +5,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -22,9 +23,21 @@ import ( ) const ( + // consulTemplateSourceName is the source name when using the TaskHooks. + consulTemplateSourceName = "Template" + // hostSrcOption is the Client option that determines whether the template // source may be from the host hostSrcOption = "template.allow_host_source" + + // missingDepEventLimit is the number of missing dependencies that will be + // logged before we switch to showing just the number of missing + // dependencies. + missingDepEventLimit = 3 + + // DefaultMaxTemplateEventRate is the default maximum rate at which a + // template event should be fired. + DefaultMaxTemplateEventRate = 3 * time.Second ) var ( @@ -48,19 +61,19 @@ type TaskHooks interface { // Kill is used to kill the task because of the passed error. If fail is set // to true, the task is marked as failed Kill(source, reason string, fail bool) + + // EmitEvent is used to emit an event to be stored in the tasks events. 
+ EmitEvent(source, message string) } // TaskTemplateManager is used to run a set of templates for a given task type TaskTemplateManager struct { - // templates is the set of templates we are managing - templates []*structs.Template + // config holds the template managers configuration + config *TaskTemplateManagerConfig // lookup allows looking up the set of Nomad templates by their consul-template ID lookup map[string][]*structs.Template - // hooks is used to signal/restart the task as templates are rendered - hook TaskHooks - // runner is the consul-template runner runner *manager.Runner @@ -76,29 +89,67 @@ type TaskTemplateManager struct { shutdownLock sync.Mutex } -func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template, - config *config.Config, vaultToken, taskDir string, - envBuilder *env.Builder) (*TaskTemplateManager, error) { +// TaskTemplateManagerConfig is used to configure an instance of the +// TaskTemplateManager +type TaskTemplateManagerConfig struct { + // Hooks is used to interact with the task the template manager is being run + // for + Hooks TaskHooks + + // Templates is the set of templates we are managing + Templates []*structs.Template + + // ClientConfig is the Nomad Client configuration + ClientConfig *config.Config + + // VaultToken is the Vault token for the task. + VaultToken string + + // TaskDir is the task's directory + TaskDir string + + // EnvBuilder is the environment variable builder for the task. + EnvBuilder *env.Builder + + // MaxTemplateEventRate is the maximum rate at which we should emit events. + MaxTemplateEventRate time.Duration + + // retryRate is only used for testing and is used to increase the retry rate + retryRate time.Duration +} + +// Validate validates the configuration. +func (c *TaskTemplateManagerConfig) Validate() error { + if c == nil { + return fmt.Errorf("Nil config passed") + } else if c.Hooks == nil { + return fmt.Errorf("Invalid task hooks given") + } else if c.ClientConfig == nil { + return fmt.Errorf("Invalid client config given") + } else if c.TaskDir == "" { + return fmt.Errorf("Invalid task directory given") + } else if c.EnvBuilder == nil { + return fmt.Errorf("Invalid task environment given") + } else if c.MaxTemplateEventRate == 0 { + return fmt.Errorf("Invalid max template event rate given") + } + + return nil +} +func NewTaskTemplateManager(config *TaskTemplateManagerConfig) (*TaskTemplateManager, error) { // Check pre-conditions - if hook == nil { - return nil, fmt.Errorf("Invalid task hook given") - } else if config == nil { - return nil, fmt.Errorf("Invalid config given") - } else if taskDir == "" { - return nil, fmt.Errorf("Invalid task directory given") - } else if envBuilder == nil { - return nil, fmt.Errorf("Invalid task environment given") + if err := config.Validate(); err != nil { + return nil, err } tm := &TaskTemplateManager{ - templates: tmpls, - hook: hook, + config: config, shutdownCh: make(chan struct{}), } // Parse the signals that we need - for _, tmpl := range tmpls { + for _, tmpl := range config.Templates { if tmpl.ChangeSignal == "" { continue } @@ -116,14 +167,14 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template, } // Build the consul-template runner - runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build()) + runner, lookup, err := templateRunner(config) if err != nil { return nil, err } tm.runner = runner tm.lookup = lookup - go tm.run(envBuilder, taskDir) + go tm.run() return tm, nil } @@ -146,22 +197,63 @@ func (tm 
*TaskTemplateManager) Stop() { } // run is the long lived loop that handles errors and templates being rendered -func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) { +func (tm *TaskTemplateManager) run() { // Runner is nil if there is no templates if tm.runner == nil { // Unblock the start if there is nothing to do - tm.hook.UnblockStart("consul-template") + tm.config.Hooks.UnblockStart(consulTemplateSourceName) return } // Start the runner go tm.runner.Start() - // Track when they have all been rendered so we don't signal the task for - // any render event before hand - var allRenderedTime time.Time + // Block till all the templates have been rendered + tm.handleFirstRender() + + // Detect if there was a shutdown. + select { + case <-tm.shutdownCh: + return + default: + } + + // Read environment variables from env templates before we unblock + envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) + if err != nil { + tm.config.Hooks.Kill(consulTemplateSourceName, err.Error(), true) + return + } + tm.config.EnvBuilder.SetTemplateEnv(envMap) + + // Unblock the task + tm.config.Hooks.UnblockStart(consulTemplateSourceName) + + // If all our templates are change mode no-op, then we can exit here + if tm.allTemplatesNoop() { + return + } + + // handle all subsequent render events. + tm.handleTemplateRerenders(time.Now()) +} + +// handleFirstRender blocks till all templates have been rendered +func (tm *TaskTemplateManager) handleFirstRender() { + // missingDependencies is the set of missing dependencies. + var missingDependencies map[string]struct{} + + // eventTimer is used to trigger the firing of an event showing the missing + // dependencies. + eventTimer := time.NewTimer(tm.config.MaxTemplateEventRate) + if !eventTimer.Stop() { + <-eventTimer.C + } + + // outstandingEvent tracks whether there is an outstanding event that should + // be fired. 
+ outstandingEvent := false - // Handle the first rendering // Wait till all the templates have been rendered WAIT: for { @@ -173,7 +265,7 @@ WAIT: continue } - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill(consulTemplateSourceName, err.Error(), true) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do events := tm.runner.RenderEvents() @@ -191,28 +283,78 @@ WAIT: } break WAIT - } - } + case <-tm.runner.RenderEventCh(): + events := tm.runner.RenderEvents() + joinedSet := make(map[string]struct{}) + for _, event := range events { + missing := event.MissingDeps + if missing == nil { + continue + } - // Read environment variables from env templates - envMap, err := loadTemplateEnv(tm.templates, taskDir) - if err != nil { - tm.hook.Kill("consul-template", err.Error(), true) - return - } - envBuilder.SetTemplateEnv(envMap) + for _, dep := range missing.List() { + joinedSet[dep.String()] = struct{}{} + } + } - allRenderedTime = time.Now() - tm.hook.UnblockStart("consul-template") + // Check to see if the new joined set is the same as the old + different := len(joinedSet) != len(missingDependencies) + if !different { + for k := range joinedSet { + if _, ok := missingDependencies[k]; !ok { + different = true + break + } + } + } - // If all our templates are change mode no-op, then we can exit here - if tm.allTemplatesNoop() { - return + // Nothing to do + if !different { + continue + } + + // Update the missing set + missingDependencies = joinedSet + + // Update the event timer channel + if !outstandingEvent { + // We got new data so reset + outstandingEvent = true + eventTimer.Reset(tm.config.MaxTemplateEventRate) + } + case <-eventTimer.C: + if missingDependencies == nil { + continue + } + + // Clear the outstanding event + outstandingEvent = false + + // Build the missing set + missingSlice := make([]string, 0, len(missingDependencies)) + for k := range missingDependencies { + missingSlice = append(missingSlice, k) + } + sort.Strings(missingSlice) + + if l := len(missingSlice); l > missingDepEventLimit { + missingSlice[missingDepEventLimit] = fmt.Sprintf("and %d more", l-missingDepEventLimit) + missingSlice = missingSlice[:missingDepEventLimit+1] + } + + missingStr := strings.Join(missingSlice, ", ") + tm.config.Hooks.EmitEvent(consulTemplateSourceName, fmt.Sprintf("Missing: %s", missingStr)) + } } +} +// handleTemplateRerenders is used to handle template render events after they +// have all rendered. It takes action based on which set of templates re-render. +// The passed allRenderedTime is the time at which all templates have rendered. +// This is used to avoid signaling the task for any render event before hand. 
+func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time) { // A lookup for the last time the template was handled - numTemplates := len(tm.templates) - handledRenders := make(map[string]time.Time, numTemplates) + handledRenders := make(map[string]time.Time, len(tm.config.Templates)) for { select { @@ -223,7 +365,7 @@ WAIT: continue } - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill(consulTemplateSourceName, err.Error(), true) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do var handling []string @@ -248,17 +390,17 @@ WAIT: // Lookup the template and determine what to do tmpls, ok := tm.lookup[id] if !ok { - tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true) + tm.config.Hooks.Kill(consulTemplateSourceName, fmt.Sprintf("template runner returned unknown template id %q", id), true) return } // Read environment variables from templates - envMap, err := loadTemplateEnv(tmpls, taskDir) + envMap, err := loadTemplateEnv(tmpls, tm.config.TaskDir) if err != nil { - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill(consulTemplateSourceName, err.Error(), true) return } - envBuilder.SetTemplateEnv(envMap) + tm.config.EnvBuilder.SetTemplateEnv(envMap) for _, tmpl := range tmpls { switch tmpl.ChangeMode { @@ -297,11 +439,11 @@ WAIT: } if restart { - tm.hook.Restart("consul-template", "template with change_mode restart re-rendered") + tm.config.Hooks.Restart(consulTemplateSourceName, "template with change_mode restart re-rendered") } else if len(signals) != 0 { var mErr multierror.Error for signal := range signals { - err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal]) + err := tm.config.Hooks.Signal(consulTemplateSourceName, "template re-rendered", tm.signals[signal]) if err != nil { multierror.Append(&mErr, err) } @@ -312,7 +454,7 @@ WAIT: for signal := range signals { flat = append(flat, tm.signals[signal]) } - tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) + tm.config.Hooks.Kill(consulTemplateSourceName, fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) } } } @@ -322,7 +464,7 @@ WAIT: // allTemplatesNoop returns whether all the managed templates have change mode noop. func (tm *TaskTemplateManager) allTemplatesNoop() bool { - for _, tmpl := range tm.templates { + for _, tmpl := range tm.config.Templates { if tmpl.ChangeMode != structs.TemplateChangeModeNoop { return false } @@ -332,25 +474,23 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool { } // templateRunner returns a consul-template runner for the given templates and a -// lookup by destination to the template. If no templates are given, a nil -// template runner and lookup is returned. -func templateRunner(tmpls []*structs.Template, config *config.Config, - vaultToken, taskDir string, taskEnv *env.TaskEnv) ( +// lookup by destination to the template. If no templates are in the config, a +// nil template runner and lookup is returned. 
+func templateRunner(config *TaskTemplateManagerConfig) ( *manager.Runner, map[string][]*structs.Template, error) { - if len(tmpls) == 0 { + if len(config.Templates) == 0 { return nil, nil, nil } // Parse the templates - allowAbs := config.ReadBoolDefault(hostSrcOption, true) - ctmplMapping, err := parseTemplateConfigs(tmpls, taskDir, taskEnv, allowAbs) + ctmplMapping, err := parseTemplateConfigs(config) if err != nil { return nil, nil, err } // Create the runner configuration. - runnerConfig, err := newRunnerConfig(config, vaultToken, ctmplMapping) + runnerConfig, err := newRunnerConfig(config, ctmplMapping) if err != nil { return nil, nil, err } @@ -361,7 +501,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, } // Set Nomad's environment variables - runner.Env = taskEnv.All() + runner.Env = config.EnvBuilder.Build().All() // Build the lookup idMap := runner.TemplateConfigMapping() @@ -377,12 +517,14 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, return runner, lookup, nil } -// parseTemplateConfigs converts the tasks templates into consul-templates -func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, - taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) { +// parseTemplateConfigs converts the tasks templates in the config into +// consul-templates +func parseTemplateConfigs(config *TaskTemplateManagerConfig) (map[ctconf.TemplateConfig]*structs.Template, error) { + allowAbs := config.ClientConfig.ReadBoolDefault(hostSrcOption, true) + taskEnv := config.EnvBuilder.Build() - ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls)) - for _, tmpl := range tmpls { + ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(config.Templates)) + for _, tmpl := range config.Templates { var src, dest string if tmpl.SourcePath != "" { if filepath.IsAbs(tmpl.SourcePath) { @@ -392,11 +534,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, src = tmpl.SourcePath } else { - src = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.SourcePath)) + src = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.SourcePath)) } } if tmpl.DestPath != "" { - dest = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.DestPath)) + dest = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.DestPath)) } ct := ctconf.DefaultTemplateConfig() @@ -424,12 +566,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, } // newRunnerConfig returns a consul-template runner configuration, setting the -// Vault and Consul configurations based on the clients configs. The parameters -// are the client config, Vault token if set and the mapping of consul-templates -// to Nomad templates. -func newRunnerConfig(config *config.Config, vaultToken string, +// Vault and Consul configurations based on the clients configs. 
+func newRunnerConfig(config *TaskTemplateManagerConfig, templateMapping map[ctconf.TemplateConfig]*structs.Template) (*ctconf.Config, error) { + cc := config.ClientConfig conf := ctconf.DefaultConfig() // Gather the consul-template tempates @@ -452,29 +593,29 @@ func newRunnerConfig(config *config.Config, vaultToken string, } // Force faster retries - if testRetryRate != 0 { - rate := testRetryRate + if config.retryRate != 0 { + rate := config.retryRate conf.Consul.Retry.Backoff = &rate } // Setup the Consul config - if config.ConsulConfig != nil { - conf.Consul.Address = &config.ConsulConfig.Addr - conf.Consul.Token = &config.ConsulConfig.Token + if cc.ConsulConfig != nil { + conf.Consul.Address = &cc.ConsulConfig.Addr + conf.Consul.Token = &cc.ConsulConfig.Token - if config.ConsulConfig.EnableSSL != nil && *config.ConsulConfig.EnableSSL { - verify := config.ConsulConfig.VerifySSL != nil && *config.ConsulConfig.VerifySSL + if cc.ConsulConfig.EnableSSL != nil && *cc.ConsulConfig.EnableSSL { + verify := cc.ConsulConfig.VerifySSL != nil && *cc.ConsulConfig.VerifySSL conf.Consul.SSL = &ctconf.SSLConfig{ Enabled: helper.BoolToPtr(true), Verify: &verify, - Cert: &config.ConsulConfig.CertFile, - Key: &config.ConsulConfig.KeyFile, - CaCert: &config.ConsulConfig.CAFile, + Cert: &cc.ConsulConfig.CertFile, + Key: &cc.ConsulConfig.KeyFile, + CaCert: &cc.ConsulConfig.CAFile, } } - if config.ConsulConfig.Auth != "" { - parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2) + if cc.ConsulConfig.Auth != "" { + parts := strings.SplitN(cc.ConsulConfig.Auth, ":", 2) if len(parts) != 2 { return nil, fmt.Errorf("Failed to parse Consul Auth config") } @@ -492,22 +633,22 @@ func newRunnerConfig(config *config.Config, vaultToken string, emptyStr := "" conf.Vault.RenewToken = helper.BoolToPtr(false) conf.Vault.Token = &emptyStr - if config.VaultConfig != nil && config.VaultConfig.IsEnabled() { - conf.Vault.Address = &config.VaultConfig.Addr - conf.Vault.Token = &vaultToken + if cc.VaultConfig != nil && cc.VaultConfig.IsEnabled() { + conf.Vault.Address = &cc.VaultConfig.Addr + conf.Vault.Token = &config.VaultToken conf.Vault.Grace = helper.TimeToPtr(vaultGrace) - if strings.HasPrefix(config.VaultConfig.Addr, "https") || config.VaultConfig.TLSCertFile != "" { - skipVerify := config.VaultConfig.TLSSkipVerify != nil && *config.VaultConfig.TLSSkipVerify + if strings.HasPrefix(cc.VaultConfig.Addr, "https") || cc.VaultConfig.TLSCertFile != "" { + skipVerify := cc.VaultConfig.TLSSkipVerify != nil && *cc.VaultConfig.TLSSkipVerify verify := !skipVerify conf.Vault.SSL = &ctconf.SSLConfig{ Enabled: helper.BoolToPtr(true), Verify: &verify, - Cert: &config.VaultConfig.TLSCertFile, - Key: &config.VaultConfig.TLSKeyFile, - CaCert: &config.VaultConfig.TLSCaFile, - CaPath: &config.VaultConfig.TLSCaPath, - ServerName: &config.VaultConfig.TLSServerName, + Cert: &cc.VaultConfig.TLSCertFile, + Key: &cc.VaultConfig.TLSKeyFile, + CaCert: &cc.VaultConfig.TLSCaFile, + CaPath: &cc.VaultConfig.TLSCaPath, + ServerName: &cc.VaultConfig.TLSServerName, } } else { conf.Vault.SSL = &ctconf.SSLConfig{ diff --git a/client/consul_template_test.go b/client/consul_template_test.go index d3ef17b735cf..88ee17b0e50d 100644 --- a/client/consul_template_test.go +++ b/client/consul_template_test.go @@ -43,14 +43,18 @@ type MockTaskHooks struct { KillReason string KillCh chan struct{} + + Events []string + EmitEventCh chan struct{} } func NewMockTaskHooks() *MockTaskHooks { return &MockTaskHooks{ - UnblockCh: make(chan struct{}, 1), - RestartCh: 
make(chan struct{}, 1), - SignalCh: make(chan struct{}, 1), - KillCh: make(chan struct{}, 1), + UnblockCh: make(chan struct{}, 1), + RestartCh: make(chan struct{}, 1), + SignalCh: make(chan struct{}, 1), + KillCh: make(chan struct{}, 1), + EmitEventCh: make(chan struct{}, 1), } } func (m *MockTaskHooks) Restart(source, reason string) { @@ -87,6 +91,14 @@ func (m *MockTaskHooks) UnblockStart(source string) { m.Unblocked = true } +func (m *MockTaskHooks) EmitEvent(source, message string) { + m.Events = append(m.Events, message) + select { + case m.EmitEventCh <- struct{}{}: + default: + } +} + // testHarness is used to test the TaskTemplateManager by spinning up // Consul/Vault as needed type testHarness struct { @@ -100,6 +112,7 @@ type testHarness struct { taskDir string vault *testutil.TestVault consul *ctestutil.TestServer + emitRate time.Duration } // newTestHarness returns a harness starting a dev consul and vault server, @@ -111,6 +124,7 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b templates: templates, node: mock.Node(), config: &config.Config{Region: region}, + emitRate: DefaultMaxTemplateEventRate, } // Build the task environment @@ -146,22 +160,31 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b } func (h *testHarness) start(t *testing.T) { - manager, err := NewTaskTemplateManager(h.mockHooks, h.templates, - h.config, h.vaultToken, h.taskDir, h.envBuilder) - if err != nil { + if err := h.startWithErr(); err != nil { t.Fatalf("failed to build task template manager: %v", err) } - - h.manager = manager } func (h *testHarness) startWithErr() error { - manager, err := NewTaskTemplateManager(h.mockHooks, h.templates, - h.config, h.vaultToken, h.taskDir, h.envBuilder) - h.manager = manager + var err error + h.manager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: h.mockHooks, + Templates: h.templates, + ClientConfig: h.config, + VaultToken: h.vaultToken, + TaskDir: h.taskDir, + EnvBuilder: h.envBuilder, + MaxTemplateEventRate: h.emitRate, + retryRate: 10 * time.Millisecond, + }) + return err } +func (h *testHarness) setEmitRate(d time.Duration) { + h.emitRate = d +} + // stop is used to stop any running Vault or Consul server plus the task manager func (h *testHarness) stop() { if h.vault != nil { @@ -178,61 +201,118 @@ func (h *testHarness) stop() { } } -func TestTaskTemplateManager_Invalid(t *testing.T) { +func TestTaskTemplateManager_InvalidConfig(t *testing.T) { t.Parallel() hooks := NewMockTaskHooks() - var tmpls []*structs.Template - region := "global" - config := &config.Config{Region: region} + clientConfig := &config.Config{Region: "global"} taskDir := "foo" - vaultToken := "" a := mock.Alloc() - envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], config.Region) + envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], clientConfig.Region) - _, err := NewTaskTemplateManager(nil, nil, nil, "", "", nil) - if err == nil { - t.Fatalf("Expected error") - } - - _, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "task hook") { - t.Fatalf("Expected invalid task hook error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "config") { - t.Fatalf("Expected invalid config error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", 
envBuilder) - if err == nil || !strings.Contains(err.Error(), "task directory") { - t.Fatalf("Expected invalid task dir error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, nil) - if err == nil || !strings.Contains(err.Error(), "task environment") { - t.Fatalf("Expected invalid task environment error: %v", err) - } - - tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } else if tm == nil { - t.Fatalf("Bad %v", tm) - } - - // Build a template with a bad signal - tmpl := &structs.Template{ - DestPath: "foo", - EmbeddedTmpl: "hello, world", - ChangeMode: structs.TemplateChangeModeSignal, - ChangeSignal: "foobarbaz", + cases := []struct { + name string + config *TaskTemplateManagerConfig + expectedErr string + }{ + { + name: "nil config", + config: nil, + expectedErr: "Nil config passed", + }, + { + name: "bad hooks", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task hooks", + }, + { + name: "bad client config", + config: &TaskTemplateManagerConfig{ + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "client config", + }, + { + name: "bad task dir", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task directory", + }, + { + name: "bad env builder", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task environment", + }, + { + name: "bad max event rate", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + }, + expectedErr: "template event rate", + }, + { + name: "valid", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + }, + { + name: "invalid signal", + config: &TaskTemplateManagerConfig{ + Templates: []*structs.Template{ + { + DestPath: "foo", + EmbeddedTmpl: "hello, world", + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "foobarbaz", + }, + }, + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "parse signal", + }, } - tmpls = append(tmpls, tmpl) - tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "Failed to parse signal") { - t.Fatalf("Expected signal parsing error: %v", err) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := NewTaskTemplateManager(c.config) + if err != nil { + if c.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } else if !strings.Contains(err.Error(), c.expectedErr) { + t.Fatalf("expected error to contain %q; got %q", c.expectedErr, err.Error()) + } + } else if c.expectedErr != "" { + t.Fatalf("expected an error to contain %q", c.expectedErr) + } + }) } } @@ -448,9 +528,6 @@ func TestTaskTemplateManager_Unblock_Consul(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } 
- // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -498,9 +575,6 @@ func TestTaskTemplateManager_Unblock_Vault(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, false, true) harness.start(t) defer harness.stop() @@ -557,9 +631,6 @@ func TestTaskTemplateManager_Unblock_Multi_Template(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) harness.start(t) defer harness.stop() @@ -618,9 +689,6 @@ func TestTaskTemplateManager_Rerender_Noop(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -704,9 +772,6 @@ func TestTaskTemplateManager_Rerender_Signal(t *testing.T) { ChangeSignal: "SIGBUS", } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) harness.start(t) defer harness.stop() @@ -790,9 +855,6 @@ func TestTaskTemplateManager_Rerender_Restart(t *testing.T) { ChangeMode: structs.TemplateChangeModeRestart, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -893,9 +955,6 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) { ChangeSignal: "SIGALRM", } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -1063,7 +1122,11 @@ func TestTaskTemplateManager_Config_ServerName(t *testing.T) { Addr: "https://localhost/", TLSServerName: "notlocalhost", } - ctconf, err := newRunnerConfig(c, "token", nil) + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + } + ctconf, err := newRunnerConfig(config, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -1079,34 +1142,97 @@ func TestTaskTemplateManager_Config_VaultGrace(t *testing.T) { t.Parallel() assert := assert.New(t) c := config.DefaultConfig() + c.Node = mock.Node() c.VaultConfig = &sconfig.VaultConfig{ Enabled: helper.BoolToPtr(true), Addr: "https://localhost/", TLSServerName: "notlocalhost", } - // Make a template that will render immediately - templates := []*structs.Template{ - { - EmbeddedTmpl: "bar", - DestPath: "foo", - ChangeMode: structs.TemplateChangeModeNoop, - VaultGrace: 10 * time.Second, - }, - { - EmbeddedTmpl: "baz", - DestPath: "bam", - ChangeMode: structs.TemplateChangeModeNoop, - VaultGrace: 100 * time.Second, + alloc := mock.Alloc() + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + + // Make a template that will render immediately + Templates: []*structs.Template{ + { + EmbeddedTmpl: "bar", + DestPath: "foo", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 10 * time.Second, + }, + { + EmbeddedTmpl: "baz", + DestPath: "bam", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 100 * time.Second, + }, }, + EnvBuilder: env.NewBuilder(c.Node, alloc, 
alloc.Job.TaskGroups[0].Tasks[0], c.Region), } - taskEnv := env.NewTaskEnv(nil, nil) - ctmplMapping, err := parseTemplateConfigs(templates, "/fake/dir", taskEnv, false) + ctmplMapping, err := parseTemplateConfigs(config) assert.Nil(err, "Parsing Templates") - ctconf, err := newRunnerConfig(c, "token", ctmplMapping) + ctconf, err := newRunnerConfig(config, ctmplMapping) assert.Nil(err, "Building Runner Config") assert.NotNil(ctconf.Vault.Grace, "Vault Grace Pointer") assert.Equal(10*time.Second, *ctconf.Vault.Grace, "Vault Grace Value") } + +func TestTaskTemplateManager_BlockedEvents(t *testing.T) { + t.Parallel() + // Make a template that will render based on a key in Consul + var embedded string + for i := 0; i < 5; i++ { + embedded += fmt.Sprintf(`{{key "%d"}}`, i) + } + + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, false) + harness.setEmitRate(100 * time.Millisecond) + harness.start(t) + defer harness.stop() + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check to see we got a correct message + event := harness.mockHooks.Events[0] + if !strings.Contains(event, "and 2 more") { + t.Fatalf("bad event: %q", event) + } + + // Write 3 keys to Consul + for i := 0; i < 3; i++ { + harness.consul.SetKV(t, fmt.Sprintf("%d", i), []byte{0xa}) + } + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should have not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check to see we got a correct message + event = harness.mockHooks.Events[len(harness.mockHooks.Events)-1] + if !strings.Contains(event, "Missing") || strings.Contains(event, "more") { + t.Fatalf("bad event: %q", event) + } +} diff --git a/client/task_runner.go b/client/task_runner.go index 4e0ca6abd84e..7f0398b21e2b 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -188,8 +188,9 @@ func (s *taskRunnerState) Hash() []byte { return h.Sum(nil) } -// TaskStateUpdater is used to signal that tasks state has changed. -type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent) +// TaskStateUpdater is used to signal that tasks state has changed. If lazySync +// is set the event won't be immediately pushed to the server. +type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent, lazySync bool) // SignalEvent is a tuple of the signal and the event generating it type SignalEvent struct { @@ -251,7 +252,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, // MarkReceived marks the task as received. 
func (r *TaskRunner) MarkReceived() { - r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) + r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived), false) } // WaitCh returns a channel to wait for termination @@ -498,14 +499,14 @@ func (r *TaskRunner) DestroyState() error { } // setState is used to update the state of the task runner -func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { +func (r *TaskRunner) setState(state string, event *structs.TaskEvent, lazySync bool) { // Persist our state to disk. if err := r.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) } // Indicate the task has been updated. - r.updater(r.task.Name, state, event) + r.updater(r.task.Name, state, event, lazySync) } // createDriver makes a driver for the task @@ -515,7 +516,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { eventEmitter := func(m string, args ...interface{}) { msg := fmt.Sprintf(m, args...) r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg) - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg), false) } driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, eventEmitter) @@ -537,7 +538,8 @@ func (r *TaskRunner) Run() { if err := r.validateTask(); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask(), + false) return } @@ -549,7 +551,8 @@ func (r *TaskRunner) Run() { e := fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(), + false) return } @@ -560,7 +563,8 @@ func (r *TaskRunner) Run() { e := fmt.Errorf("failed to build task directory for %q: %v", r.task.Name, err) r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(), + false) return } @@ -854,11 +858,21 @@ func (r *TaskRunner) updatedTokenHandler() { // Create a new templateManager var err error - r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) + r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: r, + Templates: r.task.Templates, + ClientConfig: r.config, + VaultToken: r.vaultFuture.Get(), + TaskDir: r.taskDir.Dir, + EnvBuilder: r.envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }) + if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + r.setState(structs.TaskStateDead, + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err) r.Kill("vault", err.Error(), true) return 
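For reference, a minimal usage sketch of the constructor pattern this change introduces (variable names such as hooks, task, clientConfig, vaultToken, taskDir and envBuilder are placeholders for the task runner's own wiring, not code from this diff):

	// The hooks value must satisfy the expanded TaskHooks interface, which now
	// includes EmitEvent(source, message string). The other fields correspond
	// to the former positional arguments of NewTaskTemplateManager.
	mgr, err := NewTaskTemplateManager(&TaskTemplateManagerConfig{
		Hooks:        hooks,
		Templates:    task.Templates,
		ClientConfig: clientConfig,
		VaultToken:   vaultToken,
		TaskDir:      taskDir,
		EnvBuilder:   envBuilder,
		// Leaving this zero fails validation with "Invalid max template event rate given".
		MaxTemplateEventRate: DefaultMaxTemplateEventRate,
	})
	if err != nil {
		// The constructor runs config.Validate(), so nil hooks, a missing task
		// directory, or a zero event rate are reported before any
		// consul-template runner is started.
		return err
	}
	defer mgr.Stop()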
@@ -893,7 +907,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -901,7 +916,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err := os.MkdirAll(filepath.Dir(renderTo), 07777); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -909,7 +925,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err := ioutil.WriteFile(renderTo, decoded, 0777); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -924,14 +941,14 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res // Download the task's artifacts if !downloaded && len(task.Artifacts) > 0 { - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts), false) taskEnv := r.envBuilder.Build() for _, artifact := range task.Artifacts { if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil { wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) r.logger.Printf("[DEBUG] client: %v", wrapped) r.setState(structs.TaskStatePending, - structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) + structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped), false) r.restartTracker.SetStartError(structs.WrapRecoverable(wrapped.Error(), err)) goto RESTART } @@ -957,11 +974,18 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res // Build the template manager if r.templateManager == nil { var err error - r.templateManager, err = NewTaskTemplateManager(r, task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) + r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: r, + Templates: r.task.Templates, + ClientConfig: r.config, + VaultToken: r.vaultFuture.Get(), + TaskDir: r.taskDir.Dir, + EnvBuilder: r.envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false) r.logger.Printf("[ERR] client: alloc %q, task %q %v", alloc.ID, task.Name, err) resultCh <- false return @@ -1032,7 +1056,7 @@ func (r *TaskRunner) run() { case success := <-prestartResultCh: if !success { r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } case <-r.startCh: @@ -1044,12 +1068,12 @@ func (r *TaskRunner) run() { startErr := r.startTask() r.restartTracker.SetStartError(startErr) if startErr != nil { - 
r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr)) + r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr), true) goto RESTART } // Mark the task as started - r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted), false) r.runningLock.Lock() r.running = true r.runningLock.Unlock() @@ -1076,7 +1100,7 @@ func (r *TaskRunner) run() { // Log whether the task was successful or not. r.restartTracker.SetWaitResult(waitRes) - r.setState("", r.waitErrorToEvent(waitRes)) + r.setState("", r.waitErrorToEvent(waitRes), true) if !waitRes.Successful() { r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes) } else { @@ -1102,7 +1126,7 @@ func (r *TaskRunner) run() { } r.logger.Printf("[DEBUG] client: sending %s", common) - r.setState(structs.TaskStateRunning, se.e) + r.setState(structs.TaskStateRunning, se.e, false) res := r.handle.Signal(se.s) se.result <- res @@ -1118,7 +1142,7 @@ func (r *TaskRunner) run() { } r.logger.Printf("[DEBUG] client: restarting %s: %v", common, event.RestartReason) - r.setState(structs.TaskStateRunning, event) + r.setState(structs.TaskStateRunning, event, false) r.killTask(nil) close(stopCollection) @@ -1138,7 +1162,7 @@ func (r *TaskRunner) run() { r.runningLock.Unlock() if !running { r.cleanup() - r.setState(structs.TaskStateDead, r.destroyEvent) + r.setState(structs.TaskStateDead, r.destroyEvent, false) return } @@ -1155,7 +1179,7 @@ func (r *TaskRunner) run() { if r.destroyEvent.Type == structs.TaskKilling { killEvent = r.destroyEvent } else { - r.setState(structs.TaskStateRunning, r.destroyEvent) + r.setState(structs.TaskStateRunning, r.destroyEvent, false) } } @@ -1166,7 +1190,7 @@ func (r *TaskRunner) run() { <-handleWaitCh r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } } @@ -1176,7 +1200,7 @@ func (r *TaskRunner) run() { restart := r.shouldRestart() if !restart { r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } @@ -1240,7 +1264,8 @@ func (r *TaskRunner) shouldRestart() bool { if state == structs.TaskNotRestarting { r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting). - SetRestartReason(reason).SetFailsTask()) + SetRestartReason(reason).SetFailsTask(), + false) } return false case structs.TaskRestarting: @@ -1248,7 +1273,8 @@ func (r *TaskRunner) shouldRestart() bool { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting). SetRestartDelay(when). 
- SetRestartReason(reason)) + SetRestartReason(reason), + false) default: r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state) return false @@ -1270,7 +1296,7 @@ func (r *TaskRunner) shouldRestart() bool { r.destroyLock.Unlock() if destroyed { r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed", r.task.Name) - r.setState(structs.TaskStateDead, r.destroyEvent) + r.setState(structs.TaskStateDead, r.destroyEvent, false) return false } @@ -1302,7 +1328,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { event.SetKillTimeout(timeout) // Mark that we received the kill event - r.setState(structs.TaskStateRunning, event) + r.setState(structs.TaskStateRunning, event, false) handle := r.getHandle() @@ -1318,7 +1344,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { r.runningLock.Unlock() // Store that the task has been destroyed and any associated error. - r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err)) + r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err), true) } // startTask creates the driver, task dir, and starts the task. @@ -1436,8 +1462,9 @@ func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error { // and the task dir is already built. The reason we call Build again is to // ensure that the task dir invariants are still held. if !built { - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskSetup). - SetMessage(structs.TaskBuildingTaskDir)) + r.setState(structs.TaskStatePending, + structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir), + false) } chroot := config.DefaultChrootEnv @@ -1662,6 +1689,14 @@ func (r *TaskRunner) Kill(source, reason string, fail bool) { r.Destroy(event) } +func (r *TaskRunner) EmitEvent(source, message string) { + event := structs.NewTaskEvent(structs.TaskGenericMessage). + SetGenericSource(source).SetMessage(message) + r.setState("", event, false) + r.logger.Printf("[DEBUG] client: event from %q for task %q in alloc %q: %v", + source, r.task.Name, r.alloc.ID, message) +} + // UnblockStart unblocks the starting of the task. It currently assumes only // consul-template will unblock func (r *TaskRunner) UnblockStart(source string) { diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 7e5f1151ddfb..788556c81d07 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -41,7 +41,7 @@ type MockTaskStateUpdater struct { events []*structs.TaskEvent } -func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) { +func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) { if state != "" { m.state = state } diff --git a/command/alloc_status.go b/command/alloc_status.go index 9094ee002032..723029698338 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -398,6 +398,9 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { desc = event.DriverMessage case api.TaskLeaderDead: desc = "Leader Task in Group dead" + case api.TaskGenericMessage: + event.Type = event.GenericSource + desc = event.Message } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 134a81a4caeb..fdbcc2b1d2fe 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3539,6 +3539,9 @@ const ( // TaskLeaderDead indicates that the leader task within the has finished. 
TaskLeaderDead = "Leader Task Dead" + + // TaskGenericMessage is used by various subsystems to emit a message. + TaskGenericMessage = "Generic" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -3600,6 +3603,9 @@ type TaskEvent struct { // DriverMessage indicates a driver action being taken. DriverMessage string + + // GenericSource is the source of a message. + GenericSource string } func (te *TaskEvent) GoString() string { @@ -3739,6 +3745,11 @@ func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent { return e } +func (e *TaskEvent) SetGenericSource(s string) *TaskEvent { + e.GenericSource = s + return e +} + // TaskArtifact is an artifact to download before running the task. type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter diff --git a/vendor/github.com/hashicorp/consul-template/manager/renderer.go b/vendor/github.com/hashicorp/consul-template/manager/renderer.go index ea5d0bcebefa..8ec1fc34a5fb 100644 --- a/vendor/github.com/hashicorp/consul-template/manager/renderer.go +++ b/vendor/github.com/hashicorp/consul-template/manager/renderer.go @@ -66,7 +66,7 @@ func Render(i *RenderInput) (*RenderResult, error) { return &RenderResult{ DidRender: true, WouldRender: true, - Contents: existing, + Contents: i.Contents, }, nil } diff --git a/vendor/github.com/hashicorp/consul-template/manager/runner.go b/vendor/github.com/hashicorp/consul-template/manager/runner.go index 770e72200944..c1347277c59a 100644 --- a/vendor/github.com/hashicorp/consul-template/manager/runner.go +++ b/vendor/github.com/hashicorp/consul-template/manager/runner.go @@ -64,6 +64,12 @@ type Runner struct { // renderedCh is used to signal that a template has been rendered renderedCh chan struct{} + // renderEventCh is used to signal that there is a new render event. A + // render event doesn't necessarily mean that a template has been rendered, + // only that templates attempted to render and may have updated their + // dependency sets. + renderEventCh chan struct{} + // dependencies is the list of dependencies this runner is watching. dependencies map[string]dep.Dependency @@ -395,12 +401,18 @@ func (r *Runner) Stop() { close(r.DoneCh) } -// TemplateRenderedCh returns a channel that will return the path of the -// template when it is rendered. +// TemplateRenderedCh returns a channel that will be triggered when one or more +// templates are rendered. func (r *Runner) TemplateRenderedCh() <-chan struct{} { return r.renderedCh } +// RenderEventCh returns a channel that will be triggered when there is a new +// render event. +func (r *Runner) RenderEventCh() <-chan struct{} { + return r.renderEventCh +} + // RenderEvents returns the render events for each template was rendered. The // map is keyed by template ID. 
func (r *Runner) RenderEvents() map[string]*RenderEvent { @@ -486,202 +498,36 @@ func (r *Runner) Signal(s os.Signal) error { func (r *Runner) Run() error { log.Printf("[INFO] (runner) initiating run") - var wouldRenderAny, renderedAny bool - var commands []*config.TemplateConfig - depsMap := make(map[string]dep.Dependency) + var newRenderEvent, wouldRenderAny, renderedAny bool + runCtx := &templateRunCtx{ + depsMap: make(map[string]dep.Dependency), + } for _, tmpl := range r.templates { - log.Printf("[DEBUG] (runner) checking template %s", tmpl.ID()) - - // Grab the last event - lastEvent := r.renderEvents[tmpl.ID()] - - // Create the event - event := &RenderEvent{ - Template: tmpl, - TemplateConfigs: r.templateConfigsFor(tmpl), - } - - if lastEvent != nil { - event.LastWouldRender = lastEvent.LastWouldRender - event.LastDidRender = lastEvent.LastDidRender - } - - // Check if we are currently the leader instance - isLeader := true - if r.dedup != nil { - isLeader = r.dedup.IsLeader(tmpl) - } - - // If we are in once mode and this template was already rendered, move - // onto the next one. We do not want to re-render the template if we are - // in once mode, and we certainly do not want to re-run any commands. - if r.once { - r.renderEventsLock.RLock() - _, rendered := r.renderEvents[tmpl.ID()] - r.renderEventsLock.RUnlock() - if rendered { - log.Printf("[DEBUG] (runner) once mode and already rendered") - continue - } - } - - // Attempt to render the template, returning any missing dependencies and - // the rendered contents. If there are any missing dependencies, the - // contents cannot be rendered or trusted! - result, err := tmpl.Execute(&template.ExecuteInput{ - Brain: r.brain, - Env: r.childEnv(), - }) + event, err := r.runTemplate(tmpl, runCtx) if err != nil { - return errors.Wrap(err, tmpl.Source()) - } - - // Grab the list of used and missing dependencies. - missing, used := result.Missing, result.Used - - // Add the dependency to the list of dependencies for this runner. - for _, d := range used.List() { - // If we've taken over leadership for a template, we may have data - // that is cached, but not have the watcher. We must treat this as - // missing so that we create the watcher and re-run the template. - if isLeader && !r.watcher.Watching(d) { - missing.Add(d) - } - if _, ok := depsMap[d.String()]; !ok { - depsMap[d.String()] = d - } - } - - // Diff any missing dependencies the template reported with dependencies - // the watcher is watching. - unwatched := new(dep.Set) - for _, d := range missing.List() { - if !r.watcher.Watching(d) { - unwatched.Add(d) - } - } - - // If there are unwatched dependencies, start the watcher and move onto the - // next one. - if l := unwatched.Len(); l > 0 { - log.Printf("[DEBUG] (runner) was not watching %d dependencies", l) - for _, d := range unwatched.List() { - // If we are deduplicating, we must still handle non-sharable - // dependencies, since those will be ignored. - if isLeader || !d.CanShare() { - r.watcher.Add(d) - } - } - continue - } - - // If the template is missing data for some dependencies then we are not - // ready to render and need to move on to the next one. 
- if l := missing.Len(); l > 0 { - log.Printf("[DEBUG] (runner) missing data for %d dependencies", l) - continue - } - - // Trigger an update of the de-duplicaiton manager - if r.dedup != nil && isLeader { - if err := r.dedup.UpdateDeps(tmpl, used.List()); err != nil { - log.Printf("[ERR] (runner) failed to update dependency data for de-duplication: %v", err) - } - } - - // Update event information with dependencies. - event.MissingDeps = missing - event.UnwatchedDeps = unwatched - event.UsedDeps = used - - // If quiescence is activated, start/update the timers and loop back around. - // We do not want to render the templates yet. - if q, ok := r.quiescenceMap[tmpl.ID()]; ok { - q.tick() - continue + return err } - // For each template configuration that is tied to this template, attempt to - // render it to disk and accumulate commands for later use. - for _, templateConfig := range r.templateConfigsFor(tmpl) { - log.Printf("[DEBUG] (runner) rendering %s", templateConfig.Display()) - - // Render the template, taking dry mode into account - result, err := Render(&RenderInput{ - Backup: config.BoolVal(templateConfig.Backup), - Contents: result.Output, - Dry: r.dry, - DryStream: r.outStream, - Path: config.StringVal(templateConfig.Destination), - Perms: config.FileModeVal(templateConfig.Perms), - }) - if err != nil { - return errors.Wrap(err, "error rendering "+templateConfig.Display()) - } - - renderTime := time.Now().UTC() + // If there was a render event store it + if event != nil { + r.renderEventsLock.Lock() + r.renderEvents[tmpl.ID()] = event + r.renderEventsLock.Unlock() - // If we would have rendered this template (but we did not because the - // contents were the same or something), we should consider this template - // rendered even though the contents on disk have not been updated. We - // will not fire commands unless the template was _actually_ rendered to - // disk though. - if result.WouldRender { - // This event would have rendered - event.WouldRender = true - event.LastWouldRender = renderTime + // Record that there is at least one new render event + newRenderEvent = true - // Record that at least one template would have been rendered. + // Record that at least one template would have been rendered. + if event.WouldRender { wouldRenderAny = true } - // If we _actually_ rendered the template to disk, we want to run the - // appropriate commands. - if result.DidRender { - log.Printf("[INFO] (runner) rendered %s", templateConfig.Display()) - - // This event did render - event.DidRender = true - event.LastDidRender = renderTime - - // Update the contents - event.Contents = result.Contents - - // Record that at least one template was rendered. + // Record that at least one template was rendered. + if event.DidRender { renderedAny = true - - if !r.dry { - // If the template was rendered (changed) and we are not in dry-run mode, - // aggregate commands, ignoring previously known commands - // - // Future-self Q&A: Why not use a map for the commands instead of an - // array with an expensive lookup option? Well I'm glad you asked that - // future-self! One of the API promises is that commands are executed - // in the order in which they are provided in the TemplateConfig - // definitions. If we inserted commands into a map, we would lose that - // relative ordering and people would be unhappy. 
- // if config.StringPresent(ctemplate.Command) - if c := config.StringVal(templateConfig.Exec.Command); c != "" { - existing := findCommand(templateConfig, commands) - if existing != nil { - log.Printf("[DEBUG] (runner) skipping command %q from %s (already appended from %s)", - c, templateConfig.Display(), existing.Display()) - } else { - log.Printf("[DEBUG] (runner) appending command %q from %s", - c, templateConfig.Display()) - commands = append(commands, templateConfig) - } - } - } } } - - // Send updated render event - r.renderEventsLock.Lock() - event.UpdatedAt = time.Now().UTC() - r.renderEvents[tmpl.ID()] = event - r.renderEventsLock.Unlock() } // Check if we need to deliver any rendered signals @@ -693,13 +539,21 @@ func (r *Runner) Run() error { } } + // Check if we need to deliver any event signals + if newRenderEvent { + select { + case r.renderEventCh <- struct{}{}: + default: + } + } + // Perform the diff and update the known dependencies. - r.diffAndUpdateDeps(depsMap) + r.diffAndUpdateDeps(runCtx.depsMap) // Execute each command in sequence, collecting any errors that occur - this // ensures all commands execute at least once. var errs []error - for _, t := range commands { + for _, t := range runCtx.commands { command := config.StringVal(t.Exec.Command) log.Printf("[INFO] (runner) executing command %q from %s", command, t.Display()) env := t.Exec.Env.Copy() @@ -744,6 +598,208 @@ func (r *Runner) Run() error { return nil } +type templateRunCtx struct { + // commands is the set of commands that will be executed after all templates + // have run. When adding to the commands, care should be taken not to + // duplicate any existing command from a previous template. + commands []*config.TemplateConfig + + // depsMap is the set of dependencies shared across all templates. + depsMap map[string]dep.Dependency +} + +// runTemplate is used to run a particular template. It takes as input the +// template to run and a shared run context that allows sharing of information +// between templates. The run returns a potentially nil render event and any +// error that occured. The render event is nil in the case that the template has +// been already rendered and is a once template or if there is an error. +func (r *Runner) runTemplate(tmpl *template.Template, runCtx *templateRunCtx) (*RenderEvent, error) { + log.Printf("[DEBUG] (runner) checking template %s", tmpl.ID()) + + // Grab the last event + r.renderEventsLock.RLock() + lastEvent := r.renderEvents[tmpl.ID()] + r.renderEventsLock.RUnlock() + + // Create the event + event := &RenderEvent{ + Template: tmpl, + TemplateConfigs: r.templateConfigsFor(tmpl), + } + + if lastEvent != nil { + event.LastWouldRender = lastEvent.LastWouldRender + event.LastDidRender = lastEvent.LastDidRender + } + + // Check if we are currently the leader instance + isLeader := true + if r.dedup != nil { + isLeader = r.dedup.IsLeader(tmpl) + } + + // If we are in once mode and this template was already rendered, move + // onto the next one. We do not want to re-render the template if we are + // in once mode, and we certainly do not want to re-run any commands. + if r.once { + r.renderEventsLock.RLock() + event, ok := r.renderEvents[tmpl.ID()] + r.renderEventsLock.RUnlock() + if ok && (event.WouldRender || event.DidRender) { + log.Printf("[DEBUG] (runner) once mode and already rendered") + return nil, nil + } + } + + // Attempt to render the template, returning any missing dependencies and + // the rendered contents. 
If there are any missing dependencies, the + // contents cannot be rendered or trusted! + result, err := tmpl.Execute(&template.ExecuteInput{ + Brain: r.brain, + Env: r.childEnv(), + }) + if err != nil { + return nil, errors.Wrap(err, tmpl.Source()) + } + + // Grab the list of used and missing dependencies. + missing, used := result.Missing, result.Used + + // Add the dependency to the list of dependencies for this runner. + for _, d := range used.List() { + // If we've taken over leadership for a template, we may have data + // that is cached, but not have the watcher. We must treat this as + // missing so that we create the watcher and re-run the template. + if isLeader && !r.watcher.Watching(d) { + missing.Add(d) + } + if _, ok := runCtx.depsMap[d.String()]; !ok { + runCtx.depsMap[d.String()] = d + } + } + + // Diff any missing dependencies the template reported with dependencies + // the watcher is watching. + unwatched := new(dep.Set) + for _, d := range missing.List() { + if !r.watcher.Watching(d) { + unwatched.Add(d) + } + } + + // Update the event with the new dependency information + event.MissingDeps = missing + event.UnwatchedDeps = unwatched + event.UsedDeps = used + event.UpdatedAt = time.Now().UTC() + + // If there are unwatched dependencies, start the watcher and exit since we + // won't have data. + if l := unwatched.Len(); l > 0 { + log.Printf("[DEBUG] (runner) was not watching %d dependencies", l) + for _, d := range unwatched.List() { + // If we are deduplicating, we must still handle non-sharable + // dependencies, since those will be ignored. + if isLeader || !d.CanShare() { + r.watcher.Add(d) + } + } + return event, nil + } + + // If the template is missing data for some dependencies then we are not + // ready to render and need to move on to the next one. + if l := missing.Len(); l > 0 { + log.Printf("[DEBUG] (runner) missing data for %d dependencies", l) + return event, nil + } + + // Trigger an update of the de-duplicaiton manager + if r.dedup != nil && isLeader { + if err := r.dedup.UpdateDeps(tmpl, used.List()); err != nil { + log.Printf("[ERR] (runner) failed to update dependency data for de-duplication: %v", err) + } + } + + // If quiescence is activated, start/update the timers and loop back around. + // We do not want to render the templates yet. + if q, ok := r.quiescenceMap[tmpl.ID()]; ok { + q.tick() + return event, nil + } + + // For each template configuration that is tied to this template, attempt to + // render it to disk and accumulate commands for later use. + for _, templateConfig := range r.templateConfigsFor(tmpl) { + log.Printf("[DEBUG] (runner) rendering %s", templateConfig.Display()) + + // Render the template, taking dry mode into account + result, err := Render(&RenderInput{ + Backup: config.BoolVal(templateConfig.Backup), + Contents: result.Output, + Dry: r.dry, + DryStream: r.outStream, + Path: config.StringVal(templateConfig.Destination), + Perms: config.FileModeVal(templateConfig.Perms), + }) + if err != nil { + return nil, errors.Wrap(err, "error rendering "+templateConfig.Display()) + } + + renderTime := time.Now().UTC() + + // If we would have rendered this template (but we did not because the + // contents were the same or something), we should consider this template + // rendered even though the contents on disk have not been updated. We + // will not fire commands unless the template was _actually_ rendered to + // disk though. 
+ if result.WouldRender { + // This event would have rendered + event.WouldRender = true + event.LastWouldRender = renderTime + } + + // If we _actually_ rendered the template to disk, we want to run the + // appropriate commands. + if result.DidRender { + log.Printf("[INFO] (runner) rendered %s", templateConfig.Display()) + + // This event did render + event.DidRender = true + event.LastDidRender = renderTime + + // Update the contents + event.Contents = result.Contents + + if !r.dry { + // If the template was rendered (changed) and we are not in dry-run mode, + // aggregate commands, ignoring previously known commands + // + // Future-self Q&A: Why not use a map for the commands instead of an + // array with an expensive lookup option? Well I'm glad you asked that + // future-self! One of the API promises is that commands are executed + // in the order in which they are provided in the TemplateConfig + // definitions. If we inserted commands into a map, we would lose that + // relative ordering and people would be unhappy. + // if config.StringPresent(ctemplate.Command) + if c := config.StringVal(templateConfig.Exec.Command); c != "" { + existing := findCommand(templateConfig, runCtx.commands) + if existing != nil { + log.Printf("[DEBUG] (runner) skipping command %q from %s (already appended from %s)", + c, templateConfig.Display(), existing.Display()) + } else { + log.Printf("[DEBUG] (runner) appending command %q from %s", + c, templateConfig.Display()) + runCtx.commands = append(runCtx.commands, templateConfig) + } + } + } + } + } + + return event, nil +} + // init() creates the Runner's underlying data structures and returns an error // if any problems occur. func (r *Runner) init() error { @@ -809,6 +865,7 @@ func (r *Runner) init() error { r.dependencies = make(map[string]dep.Dependency) r.renderedCh = make(chan struct{}, 1) + r.renderEventCh = make(chan struct{}, 1) r.ctemplatesMap = ctemplatesMap r.inStream = os.Stdin diff --git a/vendor/vendor.json b/vendor/vendor.json index d180973f5987..82928f81f776 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -627,10 +627,10 @@ "revisionTime": "2017-08-01T00:58:49Z" }, { - "checksumSHA1": "Cu8hIII8Z6FAuunFI/jXPLl0nQA=", + "checksumSHA1": "tkMwyjIrH+reCJWIg45lvcmkhVQ=", "path": "github.com/hashicorp/consul-template/manager", - "revision": "7b3f45039cf3ad1a758683fd3eebb1cc72affa06", - "revisionTime": "2017-08-01T00:58:49Z" + "revision": "252f61dede55b1009323aae8b0ffcd2e2e933173", + "revisionTime": "2017-08-09T17:59:55Z" }, { "checksumSHA1": "oskgb0WteBKOItG8NNDduM7E/D0=",
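The refactor above splits Runner.Run into a per-template runTemplate call and stores a RenderEvent for every template on each pass, so callers can inspect missing, unwatched, and used dependencies instead of only learning whether a file was written to disk. The following Go sketch (not part of the patch) illustrates how a consumer such as Nomad's TaskTemplateManager might use the exported RenderEvents() accessor to report templates that are still blocked on missing dependencies. The templateevents package name, the eventCh notification channel, and the three-second poll interval are illustrative stand-ins: the renderEventCh added in init() is unexported, and the exact wiring on the Nomad side is outside this hunk.

package templateevents

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul-template/manager"
)

// missingDepEventLimit mirrors the limit introduced on the Nomad side: name at
// most this many missing dependencies before falling back to a plain count.
const missingDepEventLimit = 3

// watchRenderEvents periodically inspects the runner's render events and
// reports templates that are still waiting on missing dependencies. eventCh is
// a hypothetical notification channel supplied by the caller; stopCh ends the
// loop.
func watchRenderEvents(runner *manager.Runner, eventCh, stopCh <-chan struct{}) {
	// Poll at a fixed rate as a fallback, roughly matching
	// DefaultMaxTemplateEventRate from the Nomad-side change.
	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stopCh:
			return
		case <-eventCh:
			// A new render event was recorded; fall through and inspect it.
		case <-ticker.C:
		}

		for id, event := range runner.RenderEvents() {
			if event.MissingDeps == nil || event.MissingDeps.Len() == 0 {
				continue
			}

			deps := event.MissingDeps.List()
			if len(deps) > missingDepEventLimit {
				// Too many to enumerate; report only the count.
				fmt.Printf("template %s blocked on %d missing dependencies\n", id, len(deps))
				continue
			}
			for _, d := range deps {
				fmt.Printf("template %s waiting on %s\n", id, d.String())
			}
		}
	}
}

Because Run only notifies renderEventCh when at least one new event was stored, and the send is non-blocking into a channel buffered to one, a consumer written this way naturally coalesces bursts of render events rather than handling each one individually.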