From 1e7ae913e2b38ddf78e17bdfd2354b7a9c9d8d29 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 9 Aug 2017 17:25:29 -0700 Subject: [PATCH] Template emits events explaining why it is blocked This PR does the following: * Adds a mechanism to emit events in the TaskRunner * Vendors a new version of Consul-Template that allows extraction of missing dependencies * Adds logic to our consul_template.go to determine missing dependencies and emit events for them in a batched fashion. * Refactors the consul_template code to split the run method and take in a config struct rather than many parameters (a usage sketch follows the patch). Fixes https://github.com/hashicorp/nomad/issues/2578 --- client/alloc_runner.go | 5 +- client/consul_template.go | 330 +++++++++++++++++++++++---------- client/consul_template_test.go | 302 ++++++++++++++++++++---------- client/task_runner.go | 26 ++- 4 files changed, 467 insertions(+), 196 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 6fc8b4076e2c..8b12c93c933c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -620,8 +620,9 @@ func (r *AllocRunner) setStatus(status, desc string) { } } -// setTaskState is used to set the status of a task. If state is empty then the -// event is appended but not synced with the server. The event may be omitted +// setTaskState is used to set the status of a task. If lazySync is set then the +// event is appended but not synced with the server. If state is omitted, the +// last known state is used. func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent, lazySync bool) { r.taskStatusLock.Lock() defer r.taskStatusLock.Unlock() diff --git a/client/consul_template.go b/client/consul_template.go index 751b308fd4f2..02c64903915f 100644 --- a/client/consul_template.go +++ b/client/consul_template.go @@ -5,6 +5,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "strconv" "strings" "sync" @@ -25,6 +26,15 @@ const ( // hostSrcOption is the Client option that determines whether the template // source may be from the host hostSrcOption = "template.allow_host_source" + + // missingDepEventLimit is the number of missing dependencies that will be + // listed in an event before the remaining dependencies are summarized as + // a count. + missingDepEventLimit = 3 + + // DefaultMaxTemplateEventRate is the default maximum rate at which a + // template event should be fired.
+ DefaultMaxTemplateEventRate = 3 * time.Second ) var ( @@ -55,15 +65,12 @@ type TaskHooks interface { // TaskTemplateManager is used to run a set of templates for a given task type TaskTemplateManager struct { - // templates is the set of templates we are managing - templates []*structs.Template + // config holds the template managers configuration + config *TaskTemplateManagerConfig // lookup allows looking up the set of Nomad templates by their consul-template ID lookup map[string][]*structs.Template - // hooks is used to signal/restart the task as templates are rendered - hook TaskHooks - // runner is the consul-template runner runner *manager.Runner @@ -79,29 +86,67 @@ type TaskTemplateManager struct { shutdownLock sync.Mutex } -func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template, - config *config.Config, vaultToken, taskDir string, - envBuilder *env.Builder) (*TaskTemplateManager, error) { +// TaskTemplateManagerConfig is used to configure an instance of the +// TaskTemplateManager +type TaskTemplateManagerConfig struct { + // Hooks is used to interact with the task the template manager is being run + // for + Hooks TaskHooks + + // Templates is the set of templates we are managing + Templates []*structs.Template + + // ClientConfig is the Nomad Client configuration + ClientConfig *config.Config + + // VaultToken is the Vault token for the task. + VaultToken string + + // TaskDir is the task's directory + TaskDir string + + // EnvBuilder is the environment variable builder for the task. + EnvBuilder *env.Builder + + // MaxTemplateEventRate is the maximum rate at which we should emit events. + MaxTemplateEventRate time.Duration + // retryRate is only used for testing and is used to increase the retry rate + retryRate time.Duration +} + +// Validate validates the configuration. 
+func (c *TaskTemplateManagerConfig) Validate() error { + if c == nil { + return fmt.Errorf("Nil config passed") + } else if c.Hooks == nil { + return fmt.Errorf("Invalid task hooks given") + } else if c.ClientConfig == nil { + return fmt.Errorf("Invalid client config given") + } else if c.TaskDir == "" { + return fmt.Errorf("Invalid task directory given") + } else if c.EnvBuilder == nil { + return fmt.Errorf("Invalid task environment given") + } else if c.MaxTemplateEventRate == 0 { + return fmt.Errorf("Invalid max template event rate given") + } + + return nil +} + +func NewTaskTemplateManager(config *TaskTemplateManagerConfig) (*TaskTemplateManager, error) { // Check pre-conditions - if hook == nil { - return nil, fmt.Errorf("Invalid task hook given") - } else if config == nil { - return nil, fmt.Errorf("Invalid config given") - } else if taskDir == "" { - return nil, fmt.Errorf("Invalid task directory given") - } else if envBuilder == nil { - return nil, fmt.Errorf("Invalid task environment given") + if err := config.Validate(); err != nil { + return nil, err } tm := &TaskTemplateManager{ - templates: tmpls, - hook: hook, + config: config, shutdownCh: make(chan struct{}), } // Parse the signals that we need - for _, tmpl := range tmpls { + for _, tmpl := range config.Templates { if tmpl.ChangeSignal == "" { continue } @@ -119,14 +164,14 @@ func NewTaskTemplateManager(hook TaskHooks, tmpls []*structs.Template, } // Build the consul-template runner - runner, lookup, err := templateRunner(tmpls, config, vaultToken, taskDir, envBuilder.Build()) + runner, lookup, err := templateRunner(config) if err != nil { return nil, err } tm.runner = runner tm.lookup = lookup - go tm.run(envBuilder, taskDir) + go tm.run() return tm, nil } @@ -149,22 +194,61 @@ func (tm *TaskTemplateManager) Stop() { } // run is the long lived loop that handles errors and templates being rendered -func (tm *TaskTemplateManager) run(envBuilder *env.Builder, taskDir string) { +func (tm *TaskTemplateManager) run() { // Runner is nil if there is no templates if tm.runner == nil { // Unblock the start if there is nothing to do - tm.hook.UnblockStart("consul-template") + tm.config.Hooks.UnblockStart("Template") return } // Start the runner go tm.runner.Start() - // Track when they have all been rendered so we don't signal the task for - // any render event before hand - var allRenderedTime time.Time + // Block till all the templates have been rendered + tm.handleFirstRender() + + // Detect if there was a shutdown. + select { + case <-tm.shutdownCh: + return + default: + } + + // Read environment variables from env templates before we unblock + envMap, err := loadTemplateEnv(tm.config.Templates, tm.config.TaskDir) + if err != nil { + tm.config.Hooks.Kill("Template", err.Error(), true) + return + } + tm.config.EnvBuilder.SetTemplateEnv(envMap) + + // Unblock the task + tm.config.Hooks.UnblockStart("Template") + + // If all our templates are change mode no-op, then we can exit here + if tm.allTemplatesNoop() { + return + } + + // handle all subsequent render events. + tm.handleTemplateRerenders(time.Now()) +} + +// handleFirstRender blocks till all templates have been rendered +func (tm *TaskTemplateManager) handleFirstRender() { + // missingDependencies is the set of missing dependencies. + var missingDependencies map[string]struct{} + + // eventTimer is used to trigger the firing of an event showing the missing + // dependencies. 
+ var eventTimer *time.Timer + var eventTimerCh <-chan time.Time + + // outstandingEvent tracks whether there is an outstanding event that should + // be fired. + outstandingEvent := false - // Handle the first rendering // Wait till all the templates have been rendered WAIT: for { @@ -176,7 +260,7 @@ WAIT: continue } - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill("template", err.Error(), true) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do events := tm.runner.RenderEvents() @@ -194,28 +278,85 @@ WAIT: } break WAIT - } - } + case <-tm.runner.RenderEventCh(): + events := tm.runner.RenderEvents() + joinedSet := make(map[string]struct{}) + for _, event := range events { + missing := event.MissingDeps + if missing == nil { + continue + } - // Read environment variables from env templates - envMap, err := loadTemplateEnv(tm.templates, taskDir) - if err != nil { - tm.hook.Kill("consul-template", err.Error(), true) - return - } - envBuilder.SetTemplateEnv(envMap) + for _, dep := range missing.List() { + joinedSet[dep.String()] = struct{}{} + } + } - allRenderedTime = time.Now() - tm.hook.UnblockStart("consul-template") + // Check to see if the new joined set is the same as the old + different := len(joinedSet) != len(missingDependencies) + if !different { + for k := range joinedSet { + if _, ok := missingDependencies[k]; !ok { + different = true + break + } + } + } - // If all our templates are change mode no-op, then we can exit here - if tm.allTemplatesNoop() { - return + // Nothing to do + if !different { + continue + } + + // Update the missing set + missingDependencies = joinedSet + + // Update the event timer channel + if eventTimer == nil { + // We are creating the event channel for the first time. + eventTimer = time.NewTimer(tm.config.MaxTemplateEventRate) + eventTimerCh = eventTimer.C + defer eventTimer.Stop() + outstandingEvent = true + continue + } else if !outstandingEvent { + // We got new data so reset + outstandingEvent = true + eventTimer.Reset(tm.config.MaxTemplateEventRate) + } + case <-eventTimerCh: + if missingDependencies == nil { + continue + } + + // Clear the outstanding event + outstandingEvent = false + + // Build the missing set + missingSlice := make([]string, 0, len(missingDependencies)) + for k := range missingDependencies { + missingSlice = append(missingSlice, k) + } + sort.Strings(missingSlice) + + if l := len(missingSlice); l > missingDepEventLimit { + missingSlice[missingDepEventLimit] = fmt.Sprintf("and %d more", l-missingDepEventLimit) + missingSlice = missingSlice[:missingDepEventLimit+1] + } + + missingStr := strings.Join(missingSlice, ", ") + tm.config.Hooks.EmitEvent("Template", fmt.Sprintf("Missing: %s", missingStr)) + } } +} +// handleTemplateRerenders is used to handle template render events after they +// have all rendered. It takes action based on which set of templates re-render. +// The passed allRenderedTime is the time at which all templates have rendered. +// This is used to avoid signaling the task for any render event before hand. 
+func (tm *TaskTemplateManager) handleTemplateRerenders(allRenderedTime time.Time) { // A lookup for the last time the template was handled - numTemplates := len(tm.templates) - handledRenders := make(map[string]time.Time, numTemplates) + handledRenders := make(map[string]time.Time, len(tm.config.Templates)) for { select { @@ -226,7 +367,7 @@ WAIT: continue } - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill("Template", err.Error(), true) case <-tm.runner.TemplateRenderedCh(): // A template has been rendered, figure out what to do var handling []string @@ -251,17 +392,17 @@ WAIT: // Lookup the template and determine what to do tmpls, ok := tm.lookup[id] if !ok { - tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true) + tm.config.Hooks.Kill("Template", fmt.Sprintf("template runner returned unknown template id %q", id), true) return } // Read environment variables from templates - envMap, err := loadTemplateEnv(tmpls, taskDir) + envMap, err := loadTemplateEnv(tmpls, tm.config.TaskDir) if err != nil { - tm.hook.Kill("consul-template", err.Error(), true) + tm.config.Hooks.Kill("Template", err.Error(), true) return } - envBuilder.SetTemplateEnv(envMap) + tm.config.EnvBuilder.SetTemplateEnv(envMap) for _, tmpl := range tmpls { switch tmpl.ChangeMode { @@ -300,11 +441,11 @@ WAIT: } if restart { - tm.hook.Restart("consul-template", "template with change_mode restart re-rendered") + tm.config.Hooks.Restart("template", "template with change_mode restart re-rendered") } else if len(signals) != 0 { var mErr multierror.Error for signal := range signals { - err := tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal]) + err := tm.config.Hooks.Signal("Template", "template re-rendered", tm.signals[signal]) if err != nil { multierror.Append(&mErr, err) } @@ -315,7 +456,7 @@ WAIT: for signal := range signals { flat = append(flat, tm.signals[signal]) } - tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) + tm.config.Hooks.Kill("Template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true) } } } @@ -325,7 +466,7 @@ WAIT: // allTemplatesNoop returns whether all the managed templates have change mode noop. func (tm *TaskTemplateManager) allTemplatesNoop() bool { - for _, tmpl := range tm.templates { + for _, tmpl := range tm.config.Templates { if tmpl.ChangeMode != structs.TemplateChangeModeNoop { return false } @@ -335,25 +476,23 @@ func (tm *TaskTemplateManager) allTemplatesNoop() bool { } // templateRunner returns a consul-template runner for the given templates and a -// lookup by destination to the template. If no templates are given, a nil -// template runner and lookup is returned. -func templateRunner(tmpls []*structs.Template, config *config.Config, - vaultToken, taskDir string, taskEnv *env.TaskEnv) ( +// lookup by destination to the template. If no templates are in the config, a +// nil template runner and lookup is returned. +func templateRunner(config *TaskTemplateManagerConfig) ( *manager.Runner, map[string][]*structs.Template, error) { - if len(tmpls) == 0 { + if len(config.Templates) == 0 { return nil, nil, nil } // Parse the templates - allowAbs := config.ReadBoolDefault(hostSrcOption, true) - ctmplMapping, err := parseTemplateConfigs(tmpls, taskDir, taskEnv, allowAbs) + ctmplMapping, err := parseTemplateConfigs(config) if err != nil { return nil, nil, err } // Create the runner configuration. 
- runnerConfig, err := newRunnerConfig(config, vaultToken, ctmplMapping) + runnerConfig, err := newRunnerConfig(config, ctmplMapping) if err != nil { return nil, nil, err } @@ -364,7 +503,7 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, } // Set Nomad's environment variables - runner.Env = taskEnv.All() + runner.Env = config.EnvBuilder.Build().All() // Build the lookup idMap := runner.TemplateConfigMapping() @@ -380,12 +519,14 @@ func templateRunner(tmpls []*structs.Template, config *config.Config, return runner, lookup, nil } -// parseTemplateConfigs converts the tasks templates into consul-templates -func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, - taskEnv *env.TaskEnv, allowAbs bool) (map[ctconf.TemplateConfig]*structs.Template, error) { +// parseTemplateConfigs converts the tasks templates in the config into +// consul-templates +func parseTemplateConfigs(config *TaskTemplateManagerConfig) (map[ctconf.TemplateConfig]*structs.Template, error) { + allowAbs := config.ClientConfig.ReadBoolDefault(hostSrcOption, true) + taskEnv := config.EnvBuilder.Build() - ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(tmpls)) - for _, tmpl := range tmpls { + ctmpls := make(map[ctconf.TemplateConfig]*structs.Template, len(config.Templates)) + for _, tmpl := range config.Templates { var src, dest string if tmpl.SourcePath != "" { if filepath.IsAbs(tmpl.SourcePath) { @@ -395,11 +536,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, src = tmpl.SourcePath } else { - src = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.SourcePath)) + src = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.SourcePath)) } } if tmpl.DestPath != "" { - dest = filepath.Join(taskDir, taskEnv.ReplaceEnv(tmpl.DestPath)) + dest = filepath.Join(config.TaskDir, taskEnv.ReplaceEnv(tmpl.DestPath)) } ct := ctconf.DefaultTemplateConfig() @@ -427,12 +568,11 @@ func parseTemplateConfigs(tmpls []*structs.Template, taskDir string, } // newRunnerConfig returns a consul-template runner configuration, setting the -// Vault and Consul configurations based on the clients configs. The parameters -// are the client config, Vault token if set and the mapping of consul-templates -// to Nomad templates. -func newRunnerConfig(config *config.Config, vaultToken string, +// Vault and Consul configurations based on the clients configs. 
+func newRunnerConfig(config *TaskTemplateManagerConfig, templateMapping map[ctconf.TemplateConfig]*structs.Template) (*ctconf.Config, error) { + cc := config.ClientConfig conf := ctconf.DefaultConfig() // Gather the consul-template tempates @@ -455,29 +595,29 @@ func newRunnerConfig(config *config.Config, vaultToken string, } // Force faster retries - if testRetryRate != 0 { - rate := testRetryRate + if config.retryRate != 0 { + rate := config.retryRate conf.Consul.Retry.Backoff = &rate } // Setup the Consul config - if config.ConsulConfig != nil { - conf.Consul.Address = &config.ConsulConfig.Addr - conf.Consul.Token = &config.ConsulConfig.Token + if cc.ConsulConfig != nil { + conf.Consul.Address = &cc.ConsulConfig.Addr + conf.Consul.Token = &cc.ConsulConfig.Token - if config.ConsulConfig.EnableSSL != nil && *config.ConsulConfig.EnableSSL { - verify := config.ConsulConfig.VerifySSL != nil && *config.ConsulConfig.VerifySSL + if cc.ConsulConfig.EnableSSL != nil && *cc.ConsulConfig.EnableSSL { + verify := cc.ConsulConfig.VerifySSL != nil && *cc.ConsulConfig.VerifySSL conf.Consul.SSL = &ctconf.SSLConfig{ Enabled: helper.BoolToPtr(true), Verify: &verify, - Cert: &config.ConsulConfig.CertFile, - Key: &config.ConsulConfig.KeyFile, - CaCert: &config.ConsulConfig.CAFile, + Cert: &cc.ConsulConfig.CertFile, + Key: &cc.ConsulConfig.KeyFile, + CaCert: &cc.ConsulConfig.CAFile, } } - if config.ConsulConfig.Auth != "" { - parts := strings.SplitN(config.ConsulConfig.Auth, ":", 2) + if cc.ConsulConfig.Auth != "" { + parts := strings.SplitN(cc.ConsulConfig.Auth, ":", 2) if len(parts) != 2 { return nil, fmt.Errorf("Failed to parse Consul Auth config") } @@ -495,22 +635,22 @@ func newRunnerConfig(config *config.Config, vaultToken string, emptyStr := "" conf.Vault.RenewToken = helper.BoolToPtr(false) conf.Vault.Token = &emptyStr - if config.VaultConfig != nil && config.VaultConfig.IsEnabled() { - conf.Vault.Address = &config.VaultConfig.Addr - conf.Vault.Token = &vaultToken + if cc.VaultConfig != nil && cc.VaultConfig.IsEnabled() { + conf.Vault.Address = &cc.VaultConfig.Addr + conf.Vault.Token = &config.VaultToken conf.Vault.Grace = helper.TimeToPtr(vaultGrace) - if strings.HasPrefix(config.VaultConfig.Addr, "https") || config.VaultConfig.TLSCertFile != "" { - skipVerify := config.VaultConfig.TLSSkipVerify != nil && *config.VaultConfig.TLSSkipVerify + if strings.HasPrefix(cc.VaultConfig.Addr, "https") || cc.VaultConfig.TLSCertFile != "" { + skipVerify := cc.VaultConfig.TLSSkipVerify != nil && *cc.VaultConfig.TLSSkipVerify verify := !skipVerify conf.Vault.SSL = &ctconf.SSLConfig{ Enabled: helper.BoolToPtr(true), Verify: &verify, - Cert: &config.VaultConfig.TLSCertFile, - Key: &config.VaultConfig.TLSKeyFile, - CaCert: &config.VaultConfig.TLSCaFile, - CaPath: &config.VaultConfig.TLSCaPath, - ServerName: &config.VaultConfig.TLSServerName, + Cert: &cc.VaultConfig.TLSCertFile, + Key: &cc.VaultConfig.TLSKeyFile, + CaCert: &cc.VaultConfig.TLSCaFile, + CaPath: &cc.VaultConfig.TLSCaPath, + ServerName: &cc.VaultConfig.TLSServerName, } } else { conf.Vault.SSL = &ctconf.SSLConfig{ diff --git a/client/consul_template_test.go b/client/consul_template_test.go index d893858020ba..88ee17b0e50d 100644 --- a/client/consul_template_test.go +++ b/client/consul_template_test.go @@ -112,6 +112,7 @@ type testHarness struct { taskDir string vault *testutil.TestVault consul *ctestutil.TestServer + emitRate time.Duration } // newTestHarness returns a harness starting a dev consul and vault server, @@ -123,6 +124,7 @@ func 
newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b templates: templates, node: mock.Node(), config: &config.Config{Region: region}, + emitRate: DefaultMaxTemplateEventRate, } // Build the task environment @@ -158,22 +160,31 @@ func newTestHarness(t *testing.T, templates []*structs.Template, consul, vault b } func (h *testHarness) start(t *testing.T) { - manager, err := NewTaskTemplateManager(h.mockHooks, h.templates, - h.config, h.vaultToken, h.taskDir, h.envBuilder) - if err != nil { + if err := h.startWithErr(); err != nil { t.Fatalf("failed to build task template manager: %v", err) } - - h.manager = manager } func (h *testHarness) startWithErr() error { - manager, err := NewTaskTemplateManager(h.mockHooks, h.templates, - h.config, h.vaultToken, h.taskDir, h.envBuilder) - h.manager = manager + var err error + h.manager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: h.mockHooks, + Templates: h.templates, + ClientConfig: h.config, + VaultToken: h.vaultToken, + TaskDir: h.taskDir, + EnvBuilder: h.envBuilder, + MaxTemplateEventRate: h.emitRate, + retryRate: 10 * time.Millisecond, + }) + return err } +func (h *testHarness) setEmitRate(d time.Duration) { + h.emitRate = d +} + // stop is used to stop any running Vault or Consul server plus the task manager func (h *testHarness) stop() { if h.vault != nil { @@ -190,61 +201,118 @@ func (h *testHarness) stop() { } } -func TestTaskTemplateManager_Invalid(t *testing.T) { +func TestTaskTemplateManager_InvalidConfig(t *testing.T) { t.Parallel() hooks := NewMockTaskHooks() - var tmpls []*structs.Template - region := "global" - config := &config.Config{Region: region} + clientConfig := &config.Config{Region: "global"} taskDir := "foo" - vaultToken := "" a := mock.Alloc() - envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], config.Region) + envBuilder := env.NewBuilder(mock.Node(), a, a.Job.TaskGroups[0].Tasks[0], clientConfig.Region) - _, err := NewTaskTemplateManager(nil, nil, nil, "", "", nil) - if err == nil { - t.Fatalf("Expected error") - } - - _, err = NewTaskTemplateManager(nil, tmpls, config, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "task hook") { - t.Fatalf("Expected invalid task hook error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, nil, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "config") { - t.Fatalf("Expected invalid config error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, "", envBuilder) - if err == nil || !strings.Contains(err.Error(), "task directory") { - t.Fatalf("Expected invalid task dir error: %v", err) - } - - _, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, nil) - if err == nil || !strings.Contains(err.Error(), "task environment") { - t.Fatalf("Expected invalid task environment error: %v", err) - } - - tm, err := NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } else if tm == nil { - t.Fatalf("Bad %v", tm) - } - - // Build a template with a bad signal - tmpl := &structs.Template{ - DestPath: "foo", - EmbeddedTmpl: "hello, world", - ChangeMode: structs.TemplateChangeModeSignal, - ChangeSignal: "foobarbaz", + cases := []struct { + name string + config *TaskTemplateManagerConfig + expectedErr string + }{ + { + name: "nil config", + config: nil, + expectedErr: "Nil config passed", + }, + { + name: "bad hooks", + 
config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task hooks", + }, + { + name: "bad client config", + config: &TaskTemplateManagerConfig{ + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "client config", + }, + { + name: "bad task dir", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task directory", + }, + { + name: "bad env builder", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "task environment", + }, + { + name: "bad max event rate", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + }, + expectedErr: "template event rate", + }, + { + name: "valid", + config: &TaskTemplateManagerConfig{ + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + }, + { + name: "invalid signal", + config: &TaskTemplateManagerConfig{ + Templates: []*structs.Template{ + { + DestPath: "foo", + EmbeddedTmpl: "hello, world", + ChangeMode: structs.TemplateChangeModeSignal, + ChangeSignal: "foobarbaz", + }, + }, + ClientConfig: clientConfig, + Hooks: hooks, + TaskDir: taskDir, + EnvBuilder: envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }, + expectedErr: "parse signal", + }, } - tmpls = append(tmpls, tmpl) - tm, err = NewTaskTemplateManager(hooks, tmpls, config, vaultToken, taskDir, envBuilder) - if err == nil || !strings.Contains(err.Error(), "Failed to parse signal") { - t.Fatalf("Expected signal parsing error: %v", err) + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + _, err := NewTaskTemplateManager(c.config) + if err != nil { + if c.expectedErr == "" { + t.Fatalf("unexpected error: %v", err) + } else if !strings.Contains(err.Error(), c.expectedErr) { + t.Fatalf("expected error to contain %q; got %q", c.expectedErr, err.Error()) + } + } else if c.expectedErr != "" { + t.Fatalf("expected an error to contain %q", c.expectedErr) + } + }) } } @@ -460,9 +528,6 @@ func TestTaskTemplateManager_Unblock_Consul(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -510,9 +575,6 @@ func TestTaskTemplateManager_Unblock_Vault(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, false, true) harness.start(t) defer harness.stop() @@ -569,9 +631,6 @@ func TestTaskTemplateManager_Unblock_Multi_Template(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) harness.start(t) defer harness.stop() @@ -630,9 +689,6 @@ func TestTaskTemplateManager_Rerender_Noop(t *testing.T) { ChangeMode: structs.TemplateChangeModeNoop, } - // Drop the retry rate - testRetryRate = 10 
* time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -716,9 +772,6 @@ func TestTaskTemplateManager_Rerender_Signal(t *testing.T) { ChangeSignal: "SIGBUS", } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template, template2}, true, false) harness.start(t) defer harness.stop() @@ -802,9 +855,6 @@ func TestTaskTemplateManager_Rerender_Restart(t *testing.T) { ChangeMode: structs.TemplateChangeModeRestart, } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -905,9 +955,6 @@ func TestTaskTemplateManager_Signal_Error(t *testing.T) { ChangeSignal: "SIGALRM", } - // Drop the retry rate - testRetryRate = 10 * time.Millisecond - harness := newTestHarness(t, []*structs.Template{template}, true, false) harness.start(t) defer harness.stop() @@ -1075,7 +1122,11 @@ func TestTaskTemplateManager_Config_ServerName(t *testing.T) { Addr: "https://localhost/", TLSServerName: "notlocalhost", } - ctconf, err := newRunnerConfig(c, "token", nil) + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + } + ctconf, err := newRunnerConfig(config, nil) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -1091,34 +1142,97 @@ func TestTaskTemplateManager_Config_VaultGrace(t *testing.T) { t.Parallel() assert := assert.New(t) c := config.DefaultConfig() + c.Node = mock.Node() c.VaultConfig = &sconfig.VaultConfig{ Enabled: helper.BoolToPtr(true), Addr: "https://localhost/", TLSServerName: "notlocalhost", } - // Make a template that will render immediately - templates := []*structs.Template{ - { - EmbeddedTmpl: "bar", - DestPath: "foo", - ChangeMode: structs.TemplateChangeModeNoop, - VaultGrace: 10 * time.Second, - }, - { - EmbeddedTmpl: "baz", - DestPath: "bam", - ChangeMode: structs.TemplateChangeModeNoop, - VaultGrace: 100 * time.Second, + alloc := mock.Alloc() + config := &TaskTemplateManagerConfig{ + ClientConfig: c, + VaultToken: "token", + + // Make a template that will render immediately + Templates: []*structs.Template{ + { + EmbeddedTmpl: "bar", + DestPath: "foo", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 10 * time.Second, + }, + { + EmbeddedTmpl: "baz", + DestPath: "bam", + ChangeMode: structs.TemplateChangeModeNoop, + VaultGrace: 100 * time.Second, + }, }, + EnvBuilder: env.NewBuilder(c.Node, alloc, alloc.Job.TaskGroups[0].Tasks[0], c.Region), } - taskEnv := env.NewTaskEnv(nil, nil) - ctmplMapping, err := parseTemplateConfigs(templates, "/fake/dir", taskEnv, false) + ctmplMapping, err := parseTemplateConfigs(config) assert.Nil(err, "Parsing Templates") - ctconf, err := newRunnerConfig(c, "token", ctmplMapping) + ctconf, err := newRunnerConfig(config, ctmplMapping) assert.Nil(err, "Building Runner Config") assert.NotNil(ctconf.Vault.Grace, "Vault Grace Pointer") assert.Equal(10*time.Second, *ctconf.Vault.Grace, "Vault Grace Value") } + +func TestTaskTemplateManager_BlockedEvents(t *testing.T) { + t.Parallel() + // Make a template that will render based on a key in Consul + var embedded string + for i := 0; i < 5; i++ { + embedded += fmt.Sprintf(`{{key "%d"}}`, i) + } + + file := "my.tmpl" + template := &structs.Template{ + EmbeddedTmpl: embedded, + DestPath: file, + ChangeMode: structs.TemplateChangeModeNoop, + } + + harness := newTestHarness(t, []*structs.Template{template}, true, 
false) + harness.setEmitRate(100 * time.Millisecond) + harness.start(t) + defer harness.stop() + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check that we got the expected message + event := harness.mockHooks.Events[0] + if !strings.Contains(event, "and 2 more") { + t.Fatalf("bad event: %q", event) + } + + // Write 3 keys to Consul + for i := 0; i < 3; i++ { + harness.consul.SetKV(t, fmt.Sprintf("%d", i), []byte{0xa}) + } + + // Ensure that we get a blocked event + select { + case <-harness.mockHooks.UnblockCh: + t.Fatalf("Task unblock should not have been called") + case <-harness.mockHooks.EmitEventCh: + case <-time.After(time.Duration(1*testutil.TestMultiplier()) * time.Second): + t.Fatalf("timeout") + } + + // Check that we got the expected message + event = harness.mockHooks.Events[len(harness.mockHooks.Events)-1] + if !strings.Contains(event, "Missing") || strings.Contains(event, "more") { + t.Fatalf("bad event: %q", event) + } +} diff --git a/client/task_runner.go b/client/task_runner.go index 97367e543159..7f0398b21e2b 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -188,7 +188,8 @@ func (s *taskRunnerState) Hash() []byte { return h.Sum(nil) } -// TaskStateUpdater is used to signal that tasks state has changed. +// TaskStateUpdater is used to signal that a task's state has changed. If lazySync +// is set the event won't be immediately pushed to the server. type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent, lazySync bool) // SignalEvent is a tuple of the signal and the event generating it @@ -857,8 +858,16 @@ func (r *TaskRunner) updatedTokenHandler() { // Create a new templateManager var err error - r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) + r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: r, + Templates: r.task.Templates, + ClientConfig: r.config, + VaultToken: r.vaultFuture.Get(), + TaskDir: r.taskDir.Dir, + EnvBuilder: r.envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }) + if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) r.setState(structs.TaskStateDead, @@ -965,8 +974,15 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res // Build the template manager if r.templateManager == nil { var err error - r.templateManager, err = NewTaskTemplateManager(r, task.Templates, - r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) + r.templateManager, err = NewTaskTemplateManager(&TaskTemplateManagerConfig{ + Hooks: r, + Templates: r.task.Templates, + ClientConfig: r.config, + VaultToken: r.vaultFuture.Get(), + TaskDir: r.taskDir.Dir, + EnvBuilder: r.envBuilder, + MaxTemplateEventRate: DefaultMaxTemplateEventRate, + }) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false)
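
For reference, the sketch below shows how a caller drives the refactored constructor, mirroring the call sites in client/task_runner.go above. It is illustrative only: the helper name newManagerForTask is not part of this patch, and the import paths are assumed from the client package layout at the time of this change.

package client

import (
	"github.com/hashicorp/nomad/client/config"
	"github.com/hashicorp/nomad/client/driver/env"
	"github.com/hashicorp/nomad/nomad/structs"
)

// newManagerForTask is a hypothetical helper (not part of this patch) showing
// how the new config-struct API is wired up for a task.
func newManagerForTask(hooks TaskHooks, task *structs.Task, cc *config.Config,
	vaultToken, taskDir string, envBuilder *env.Builder) (*TaskTemplateManager, error) {

	return NewTaskTemplateManager(&TaskTemplateManagerConfig{
		Hooks:        hooks, // receives EmitEvent("Template", "Missing: ...") while templates are blocked
		Templates:    task.Templates,
		ClientConfig: cc,
		VaultToken:   vaultToken,
		TaskDir:      taskDir,
		EnvBuilder:   envBuilder,
		// Missing-dependency events are batched so that at most one is
		// emitted per MaxTemplateEventRate (3s by default).
		MaxTemplateEventRate: DefaultMaxTemplateEventRate,
	})
}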