diff --git a/actions/action.go b/actions/action.go index 9dc79e7b2..fc14562dd 100644 --- a/actions/action.go +++ b/actions/action.go @@ -1,6 +1,7 @@ package actions import ( + "context" "log" "github.com/karimra/gnmic/types" @@ -17,7 +18,7 @@ type Action interface { // - `Vars` : a map[string]interface{} containing variables passed to the action // - `Targets`: a map[string]*types.TargetConfig containing (if the action is ran by a loader) // the currently known targets configurations - Run(aCtx *Context) (interface{}, error) + Run(ctx context.Context, aCtx *Context) (interface{}, error) // NName returns the configured action name NName() string // WithTargets passes the known configured targets to the action when initialized diff --git a/actions/gnmi_action/gnmi_action.go b/actions/gnmi_action/gnmi_action.go index 9b78688b3..a497160ad 100644 --- a/actions/gnmi_action/gnmi_action.go +++ b/actions/gnmi_action/gnmi_action.go @@ -104,7 +104,7 @@ func (g *gnmiAction) Init(cfg map[string]interface{}, opts ...actions.Option) er return nil } -func (g *gnmiAction) Run(aCtx *actions.Context) (interface{}, error) { +func (g *gnmiAction) Run(ctx context.Context, aCtx *actions.Context) (interface{}, error) { g.m.Lock() for n, tc := range aCtx.Targets { g.targetsConfigs[n] = tc @@ -126,7 +126,7 @@ func (g *gnmiAction) Run(aCtx *actions.Context) (interface{}, error) { if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() result := make(map[string]interface{}) resCh := make(chan *gnmiResponse) diff --git a/actions/http_action/http_action.go b/actions/http_action/http_action.go index 2151a72e0..d277f58e3 100644 --- a/actions/http_action/http_action.go +++ b/actions/http_action/http_action.go @@ -2,6 +2,7 @@ package http_action import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -75,7 +76,7 @@ func (h *httpAction) Init(cfg map[string]interface{}, opts ...actions.Option) er return err } -func (h *httpAction) Run(aCtx *actions.Context) (interface{}, error) { +func (h *httpAction) Run(ctx context.Context, aCtx *actions.Context) (interface{}, error) { if h.url == nil { return nil, errors.New("missing url template") } @@ -117,7 +118,9 @@ func (h *httpAction) Run(aCtx *actions.Context) (interface{}, error) { client := &http.Client{ Timeout: h.Timeout, } - resp, err := client.Do(req) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + resp, err := client.Do(req.WithContext(ctx)) if err != nil { return nil, err } diff --git a/actions/http_action/http_action_test.go b/actions/http_action/http_action_test.go index fb38a6461..3b1003725 100644 --- a/actions/http_action/http_action_test.go +++ b/actions/http_action/http_action_test.go @@ -486,7 +486,7 @@ func TestHTTPAction(t *testing.T) { for i, item := range ts.tests { t.Run(name, func(t *testing.T) { t.Logf("running test item %d", i) - res, err := a.Run(&actions.Context{Input: item.input}) + res, err := a.Run(context.TODO(), &actions.Context{Input: item.input}) if err != nil { t.Errorf("failed at %s item %d, %v", name, i, err) t.Fail() diff --git a/actions/script_action/script_action.go b/actions/script_action/script_action.go index c4b974699..ea3a8b4f8 100644 --- a/actions/script_action/script_action.go +++ b/actions/script_action/script_action.go @@ -2,6 +2,7 @@ package script_action import ( "bytes" + "context" "fmt" "io" "log" @@ -56,7 +57,7 @@ func (s *scriptAction) Init(cfg map[string]interface{}, opts ...actions.Option) return nil } -func (s *scriptAction) Run(aCtx *actions.Context) (interface{}, error) { +func (s *scriptAction) Run(_ context.Context, aCtx *actions.Context) (interface{}, error) { if s.Command == "" && s.File == "" { return nil, nil } diff --git a/actions/template_action/template_action.go b/actions/template_action/template_action.go index 9e2bf5e60..cf75cbfdf 100644 --- a/actions/template_action/template_action.go +++ b/actions/template_action/template_action.go @@ -72,7 +72,7 @@ func (t *templateAction) Init(cfg map[string]interface{}, opts ...actions.Option return nil } -func (t *templateAction) Run(aCtx *actions.Context) (interface{}, error) { +func (t *templateAction) Run(_ context.Context, aCtx *actions.Context) (interface{}, error) { b := new(bytes.Buffer) err := t.tpl.Execute(b, &actions.Context{ Input: aCtx.Input, diff --git a/app/app.go b/app/app.go index c43fd03f7..9bf358706 100644 --- a/app/app.go +++ b/app/app.go @@ -400,7 +400,7 @@ func (a *App) loadTargets(e fsnotify.Event) { // delete targets for t := range dist { if _, ok := newTargets[t]; !ok { - err = a.deleteTarget(t) + err = a.deleteTarget(ctx, t) if err != nil { a.Logger.Printf("failed to delete target %q: %v", t, err) continue diff --git a/app/clustering.go b/app/clustering.go index dbdf825b1..87c717848 100644 --- a/app/clustering.go +++ b/app/clustering.go @@ -255,8 +255,9 @@ func (a *App) dispatchTargets(ctx context.Context) { } var err error //a.m.RLock() + dctx, cancel := context.WithTimeout(ctx, a.Config.Clustering.TargetsWatchTimer) for _, tc := range a.Config.Targets { - err = a.dispatchTarget(ctx, tc) + err = a.dispatchTarget(dctx, tc) if err != nil { a.Logger.Printf("failed to dispatch target %q: %v", tc.Name, err) } @@ -273,7 +274,7 @@ func (a *App) dispatchTargets(ctx context.Context) { } } //a.m.RUnlock() - + cancel() select { case <-ctx.Done(): return @@ -339,7 +340,7 @@ WAIT: retries++ if (retries+1)*int(lockWaitTime) >= int(a.Config.Clustering.TargetAssignmentTimeout) { a.Logger.Printf("[cluster-leader] max retries reached for target %q and service %q, reselecting...", tc.Name, service.ID) - err = a.unassignTarget(tc.Name, service.ID) + err = a.unassignTarget(ctx, tc.Name, service.ID) if err != nil { a.Logger.Printf("failed to unassign target %q from %q", tc.Name, service.ID) } @@ -357,7 +358,7 @@ WAIT: retries++ if (retries+1)*int(lockWaitTime) >= int(a.Config.Clustering.TargetAssignmentTimeout) { a.Logger.Printf("[cluster-leader] max retries reached for target %q and service %q, reselecting...", tc.Name, service.ID) - err = a.unassignTarget(tc.Name, service.ID) + err = a.unassignTarget(ctx, tc.Name, service.ID) if err != nil { a.Logger.Printf("failed to unassign target %q from %q", tc.Name, service.ID) } @@ -529,7 +530,7 @@ func (a *App) getHighestTagsMatches(tagsCount map[string]int) []string { return ss } -func (a *App) deleteTarget(name string) error { +func (a *App) deleteTarget(ctx context.Context, name string) error { errs := make([]error, 0, len(a.apiServices)) for _, s := range a.apiServices { scheme := "http" @@ -549,7 +550,7 @@ func (a *App) deleteTarget(name string) error { }, } } - ctx, cancel := context.WithCancel(a.ctx) + ctx, cancel := context.WithCancel(ctx) defer cancel() url := fmt.Sprintf("%s://%s/api/v1/config/targets/%s", scheme, s.Address, name) req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) @@ -630,7 +631,7 @@ func (a *App) assignTarget(ctx context.Context, tc *types.TargetConfig, service return nil } -func (a *App) unassignTarget(name string, serviceID string) error { +func (a *App) unassignTarget(ctx context.Context, name string, serviceID string) error { for _, s := range a.apiServices { if s.ID != serviceID { continue @@ -653,15 +654,17 @@ func (a *App) unassignTarget(name string, serviceID string) error { } } url := fmt.Sprintf("%s://%s/api/v1/targets/%s", scheme, s.Address, name) - ctx, cancel := context.WithTimeout(a.ctx, 500*time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil) if err != nil { + a.Logger.Printf("failed to create HTTP request: %v", err) continue } rsp, err := client.Do(req) if err != nil { rsp.Body.Close() + a.Logger.Printf("failed HTTP request: %v", err) continue } rsp.Body.Close() diff --git a/app/gnmi_client_subscribe.go b/app/gnmi_client_subscribe.go index aaec341df..65d02397b 100644 --- a/app/gnmi_client_subscribe.go +++ b/app/gnmi_client_subscribe.go @@ -78,6 +78,9 @@ START: select { case <-nctx.Done(): a.Logger.Printf("target %q stopped: %v", tc.Name, nctx.Err()) + // drain errChan + err := <-errChan + a.Logger.Printf("target %q keepLock returned: %v", tc.Name, err) return case <-doneChan: a.Logger.Printf("target lock %q removed", tc.Name) diff --git a/app/loaders.go b/app/loaders.go index 654c981dc..96006a968 100644 --- a/app/loaders.go +++ b/app/loaders.go @@ -47,7 +47,7 @@ START: continue } // clustered, delete target in all instances of the cluster - err = a.deleteTarget(del) + err = a.deleteTarget(ctx, del) if err != nil { a.Logger.Printf("failed to delete target %q: %v", del, err) } diff --git a/app/version.go b/app/version.go index c2515e2d3..be5a8d318 100644 --- a/app/version.go +++ b/app/version.go @@ -1,6 +1,7 @@ package app import ( + "context" "encoding/json" "fmt" "io" @@ -78,9 +79,15 @@ func (a *App) VersionUpgradeRun(cmd *cobra.Command, args []string) error { // downloadFile will download a file from a URL and write its content to a file func downloadFile(url string, file *os.File) error { - client := http.Client{Timeout: 10 * time.Second} + client := http.Client{Timeout: 30 * time.Second} // Get the data - resp, err := client.Get(url) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := client.Do(req.WithContext(ctx)) if err != nil { return err } diff --git a/formatters/event_trigger/event_trigger.go b/formatters/event_trigger/event_trigger.go index a538b880f..401f680f5 100644 --- a/formatters/event_trigger/event_trigger.go +++ b/formatters/event_trigger/event_trigger.go @@ -222,7 +222,7 @@ func (p *Trigger) readVars() error { func (p *Trigger) triggerActions(e *formatters.EventMsg) { actx := &actions.Context{Input: e, Env: make(map[string]interface{}), Vars: p.vars} for _, act := range p.actions { - res, err := act.Run(actx) + res, err := act.Run(context.TODO(), actx) if err != nil { p.logger.Printf("trigger action %q failed: %+v", act.NName(), err) return diff --git a/loaders/consul_loader/consul_loader.go b/loaders/consul_loader/consul_loader.go index 8037abc71..3f6529236 100644 --- a/loaders/consul_loader/consul_loader.go +++ b/loaders/consul_loader/consul_loader.go @@ -28,7 +28,8 @@ const ( defaultAddress = "localhost:8500" defaultPrefix = "gnmic/config/targets" // - defaultWatchTimeout = 1 * time.Minute + defaultWatchTimeout = 1 * time.Minute + defaultActionTimeout = 30 * time.Second ) func init() { @@ -77,16 +78,19 @@ type cfg struct { Services []*serviceDef `mapstructure:"services,omitempty" json:"services,omitempty"` // if true, registers consulLoader prometheus metrics with the provided // prometheus registry - EnableMetrics bool `json:"enable-metrics,omitempty" mapstructure:"enable-metrics,omitempty"` + EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"` // variables definitions to be passed to the actions Vars map[string]interface{} // variable file, values in this file will be overwritten by // the ones defined in Vars - VarsFile string `mapstructure:"vars-file,omitempty"` + VarsFile string `mapstructure:"vars-file,omitempty" json:"vars-file,omitempty"` // list of Actions to run on new target discovery - OnAdd []string `json:"on-add,omitempty" mapstructure:"on-add,omitempty"` + OnAdd []string `mapstructure:"on-add,omitempty" json:"on-add,omitempty"` // list of Actions to run on target removal - OnDelete []string `json:"on-delete,omitempty" mapstructure:"on-delete,omitempty"` + OnDelete []string `mapstructure:"on-delete,omitempty" json:"on-delete,omitempty"` + // timeout for the actions, this applies for all actions as a whole (on-add + on-delete), + // not to each action individually. + ActionsTimeout time.Duration `mapstructure:"actions-timeout,omitempty" json:"actions-timeout,omitempty"` } type serviceDef struct { @@ -274,6 +278,9 @@ func (c *consulLoader) setDefaults() error { if c.cfg.KeyPrefix == "" && len(c.cfg.Services) == 0 { c.cfg.KeyPrefix = defaultPrefix } + if c.cfg.ActionsTimeout <= 0 { + c.cfg.ActionsTimeout = defaultActionTimeout + } return nil } @@ -460,6 +467,8 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar Add: make([]*types.TargetConfig, 0, len(targetOp.Add)), Del: make([]string, 0, len(targetOp.Del)), } + ctx, cancel := context.WithTimeout(ctx, c.cfg.ActionsTimeout) + defer cancel() // start operation gathering goroutine go func() { for { @@ -483,7 +492,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar for _, tAdd := range targetOp.Add { go func(tc *types.TargetConfig) { defer wg.Done() - err := c.runOnAddActions(tc.Name, tcs) + err := c.runOnAddActions(ctx, tc.Name, tcs) if err != nil { c.logger.Printf("failed running OnAdd actions: %v", err) return @@ -495,7 +504,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar for _, tDel := range targetOp.Del { go func(name string) { defer wg.Done() - err := c.runOnDeleteActions(name, tcs) + err := c.runOnDeleteActions(ctx, name, tcs) if err != nil { c.logger.Printf("failed running OnDelete actions: %v", err) return @@ -509,7 +518,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar return result, nil } -func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.TargetConfig) error { +func (c *consulLoader) runOnAddActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { aCtx := &actions.Context{ Input: tName, Env: make(map[string]interface{}), @@ -518,7 +527,7 @@ func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.Targe } for _, act := range c.addActions { c.logger.Printf("running action %q for target %q", act.NName(), tName) - res, err := act.Run(aCtx) + res, err := act.Run(ctx, aCtx) if err != nil { // delete target from known targets map c.m.Lock() @@ -537,10 +546,10 @@ func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.Targe return nil } -func (c *consulLoader) runOnDeleteActions(tName string, tcs map[string]*types.TargetConfig) error { +func (c *consulLoader) runOnDeleteActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { env := make(map[string]interface{}) for _, act := range c.delActions { - res, err := act.Run(&actions.Context{Input: tName, Env: env, Vars: c.vars}) + res, err := act.Run(ctx, &actions.Context{Input: tName, Env: env, Vars: c.vars}) if err != nil { return fmt.Errorf("action %q for target %q failed: %v", act.NName(), tName, err) } diff --git a/loaders/docker_loader/docker_loader.go b/loaders/docker_loader/docker_loader.go index 09c847192..e18a88cbd 100644 --- a/loaders/docker_loader/docker_loader.go +++ b/loaders/docker_loader/docker_loader.go @@ -569,6 +569,8 @@ func (d *dockerLoader) runActions(ctx context.Context, tcs map[string]*types.Tar Add: make([]*types.TargetConfig, 0, len(targetOp.Add)), Del: make([]string, 0, len(targetOp.Del)), } + ctx, cancel := context.WithTimeout(ctx, d.cfg.Interval) + defer cancel() // start gathering goroutine go func() { for { @@ -593,7 +595,7 @@ func (d *dockerLoader) runActions(ctx context.Context, tcs map[string]*types.Tar for _, tAdd := range targetOp.Add { go func(tc *types.TargetConfig) { defer wg.Done() - err := d.runOnAddActions(tc.Name, tcs) + err := d.runOnAddActions(ctx, tc.Name, tcs) if err != nil { d.logger.Printf("failed running OnAdd actions: %v", err) return @@ -605,7 +607,7 @@ func (d *dockerLoader) runActions(ctx context.Context, tcs map[string]*types.Tar for _, tDel := range targetOp.Del { go func(name string) { defer wg.Done() - err := d.runOnDeleteActions(name, tcs) + err := d.runOnDeleteActions(ctx, name, tcs) if err != nil { d.logger.Printf("failed running OnDelete actions: %v", err) return @@ -619,7 +621,7 @@ func (d *dockerLoader) runActions(ctx context.Context, tcs map[string]*types.Tar return result, nil } -func (d *dockerLoader) runOnAddActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *dockerLoader) runOnAddActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { aCtx := &actions.Context{ Input: tName, Env: make(map[string]interface{}), @@ -628,7 +630,7 @@ func (d *dockerLoader) runOnAddActions(tName string, tcs map[string]*types.Targe } for _, act := range d.addActions { d.logger.Printf("running action %q for target %q", act.NName(), tName) - res, err := act.Run(aCtx) + res, err := act.Run(ctx, aCtx) if err != nil { // delete target from known targets map d.m.Lock() @@ -647,10 +649,10 @@ func (d *dockerLoader) runOnAddActions(tName string, tcs map[string]*types.Targe return nil } -func (d *dockerLoader) runOnDeleteActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *dockerLoader) runOnDeleteActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { env := make(map[string]interface{}) for _, act := range d.delActions { - res, err := act.Run(&actions.Context{Input: tName, Env: env, Vars: d.vars}) + res, err := act.Run(ctx, &actions.Context{Input: tName, Env: env, Vars: d.vars}) if err != nil { return fmt.Errorf("action %q for target %q failed: %v", act.NName(), tName, err) } diff --git a/loaders/file_loader/file_loader.go b/loaders/file_loader/file_loader.go index a872d541e..ab4d051a8 100644 --- a/loaders/file_loader/file_loader.go +++ b/loaders/file_loader/file_loader.go @@ -341,6 +341,8 @@ func (f *fileLoader) runActions(ctx context.Context, tcs map[string]*types.Targe Add: make([]*types.TargetConfig, 0, len(targetOp.Add)), Del: make([]string, 0, len(targetOp.Del)), } + ctx, cancel := context.WithTimeout(ctx, f.cfg.Interval) + defer cancel() // start gathering goroutine go func() { for { @@ -365,7 +367,7 @@ func (f *fileLoader) runActions(ctx context.Context, tcs map[string]*types.Targe for _, tAdd := range targetOp.Add { go func(tc *types.TargetConfig) { defer wg.Done() - err := f.runOnAddActions(tc.Name, tcs) + err := f.runOnAddActions(ctx, tc.Name, tcs) if err != nil { f.logger.Printf("failed running OnAdd actions: %v", err) return @@ -377,7 +379,7 @@ func (f *fileLoader) runActions(ctx context.Context, tcs map[string]*types.Targe for _, tDel := range targetOp.Del { go func(name string) { defer wg.Done() - err := f.runOnDeleteActions(name, tcs) + err := f.runOnDeleteActions(ctx, name, tcs) if err != nil { f.logger.Printf("failed running OnDelete actions: %v", err) return @@ -391,7 +393,7 @@ func (f *fileLoader) runActions(ctx context.Context, tcs map[string]*types.Targe return result, nil } -func (d *fileLoader) runOnAddActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *fileLoader) runOnAddActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { aCtx := &actions.Context{ Input: tName, Env: make(map[string]interface{}), @@ -400,7 +402,7 @@ func (d *fileLoader) runOnAddActions(tName string, tcs map[string]*types.TargetC } for _, act := range d.addActions { d.logger.Printf("running action %q for target %q", act.NName(), tName) - res, err := act.Run(aCtx) + res, err := act.Run(ctx, aCtx) if err != nil { // delete target from known targets map d.m.Lock() @@ -419,10 +421,10 @@ func (d *fileLoader) runOnAddActions(tName string, tcs map[string]*types.TargetC return nil } -func (d *fileLoader) runOnDeleteActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *fileLoader) runOnDeleteActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { env := make(map[string]interface{}) for _, act := range d.delActions { - res, err := act.Run(&actions.Context{Input: tName, Env: env, Vars: d.vars}) + res, err := act.Run(ctx, &actions.Context{Input: tName, Env: env, Vars: d.vars}) if err != nil { return fmt.Errorf("action %q for target %q failed: %v", act.NName(), tName, err) } diff --git a/loaders/http_loader/http_loader.go b/loaders/http_loader/http_loader.go index 7b6c97e9c..197dc1da1 100644 --- a/loaders/http_loader/http_loader.go +++ b/loaders/http_loader/http_loader.go @@ -113,7 +113,7 @@ func (h *httpLoader) Init(ctx context.Context, cfg map[string]interface{}, logge return err } } - err = h.readVars() + err = h.readVars(ctx) if err != nil { return err } @@ -310,12 +310,14 @@ func (h *httpLoader) updateTargets(ctx context.Context, tcs map[string]*types.Ta opChan <- targetOp } -func (h *httpLoader) readVars() error { +func (h *httpLoader) readVars(ctx context.Context) error { if h.cfg.VarsFile == "" { h.vars = h.cfg.Vars return nil } - b, err := utils.ReadFile(context.TODO(), h.cfg.VarsFile) + ctx, cancel := context.WithTimeout(ctx, h.cfg.Interval) + defer cancel() + b, err := utils.ReadFile(ctx, h.cfg.VarsFile) if err != nil { return err } @@ -363,6 +365,8 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe Add: make([]*types.TargetConfig, 0, len(targetOp.Add)), Del: make([]string, 0, len(targetOp.Del)), } + ctx, cancel := context.WithTimeout(ctx, f.cfg.Interval) + defer cancel() // start operation gathering goroutine go func() { for { @@ -386,7 +390,7 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe for _, tAdd := range targetOp.Add { go func(tc *types.TargetConfig) { defer wg.Done() - err := f.runOnAddActions(tc.Name, tcs) + err := f.runOnAddActions(ctx, tc.Name, tcs) if err != nil { f.logger.Printf("failed running OnAdd actions: %v", err) return @@ -398,7 +402,7 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe for _, tDel := range targetOp.Del { go func(name string) { defer wg.Done() - err := f.runOnDeleteActions(name, tcs) + err := f.runOnDeleteActions(ctx, name, tcs) if err != nil { f.logger.Printf("failed running OnDelete actions: %v", err) return @@ -412,7 +416,7 @@ func (f *httpLoader) runActions(ctx context.Context, tcs map[string]*types.Targe return result, nil } -func (d *httpLoader) runOnAddActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *httpLoader) runOnAddActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { aCtx := &actions.Context{ Input: tName, Env: make(map[string]interface{}), @@ -421,7 +425,7 @@ func (d *httpLoader) runOnAddActions(tName string, tcs map[string]*types.TargetC } for _, act := range d.addActions { d.logger.Printf("running action %q for target %q", act.NName(), tName) - res, err := act.Run(aCtx) + res, err := act.Run(ctx, aCtx) if err != nil { // delete target from known targets map d.m.Lock() @@ -440,10 +444,10 @@ func (d *httpLoader) runOnAddActions(tName string, tcs map[string]*types.TargetC return nil } -func (d *httpLoader) runOnDeleteActions(tName string, tcs map[string]*types.TargetConfig) error { +func (d *httpLoader) runOnDeleteActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error { env := make(map[string]interface{}) for _, act := range d.delActions { - res, err := act.Run(&actions.Context{Input: tName, Env: env, Vars: d.vars}) + res, err := act.Run(ctx, &actions.Context{Input: tName, Env: env, Vars: d.vars}) if err != nil { return fmt.Errorf("action %q for target %q failed: %v", act.NName(), tName, err) } diff --git a/lockers/k8s_locker/k8s_locker.go b/lockers/k8s_locker/k8s_locker.go index e729c5632..e42953fe9 100644 --- a/lockers/k8s_locker/k8s_locker.go +++ b/lockers/k8s_locker/k8s_locker.go @@ -215,6 +215,7 @@ func (k *k8sLocker) KeepLock(ctx context.Context, key string) (chan struct{}, ch for { select { case <-ctx.Done(): + errChan <- ctx.Err() return case <-doneChan: return diff --git a/tests/clab/loaders/loaders.clab.yaml b/tests/clab/loaders/loaders.clab.yaml index e591d6c76..89cc5c2ad 100644 --- a/tests/clab/loaders/loaders.clab.yaml +++ b/tests/clab/loaders/loaders.clab.yaml @@ -27,7 +27,7 @@ topology: binds: - {{ .gnmic_config_file }}:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock - - clab/loaders/targets/targets.yaml:/app/targets/targets.yaml + - targets/targets.yaml:/app/targets/targets.yaml cmd: '--config /app/gnmic.yaml subscribe' ports: - 7890:7890 @@ -44,7 +44,7 @@ topology: binds: - {{ .gnmic_config_file }}:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock - - clab/loaders/targets/targets.yaml:/app/targets/targets.yaml + - targets/targets.yaml:/app/targets/targets.yaml cmd: '--config /app/gnmic.yaml subscribe' ports: - 7891:7891 @@ -61,7 +61,7 @@ topology: binds: - {{ .gnmic_config_file }}:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock - - clab/loaders/targets/targets.yaml:/app/targets/targets.yaml + - targets/targets.yaml:/app/targets/targets.yaml cmd: '--config /app/gnmic.yaml subscribe' ports: - 7892:7892 @@ -76,7 +76,7 @@ topology: agg-gnmic1: binds: - - ./clab/loaders/gnmic-agg.yaml:/app/gnmic.yaml:ro + - gnmic-agg.yaml:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock cmd: '--config /app/gnmic.yaml subscribe' ports: @@ -90,7 +90,7 @@ topology: agg-gnmic2: binds: - - ./clab/loaders/gnmic-agg.yaml:/app/gnmic.yaml:ro + - gnmic-agg.yaml:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock cmd: '--config /app/gnmic.yaml subscribe' ports: @@ -104,7 +104,7 @@ topology: agg-gnmic3: binds: - - ./clab/loaders/gnmic-agg.yaml:/app/gnmic.yaml:ro + - gnmic-agg.yaml:/app/gnmic.yaml:ro - /var/run/docker.sock:/var/run/docker.sock cmd: '--config /app/gnmic.yaml subscribe' ports: diff --git a/tests/clab/loaders/loaders.clab_vars.yaml b/tests/clab/loaders/loaders.clab_vars.yaml index 9d28db626..b029a0cc7 100644 --- a/tests/clab/loaders/loaders.clab_vars.yaml +++ b/tests/clab/loaders/loaders.clab_vars.yaml @@ -1 +1 @@ -gnmic_config_file: ./clab/loaders/gnmic-docker-loader.yaml +gnmic_config_file: gnmic-docker-loader.yaml diff --git a/tests/clab/telemetry/grafana/datasources/datasource.yaml b/tests/clab/telemetry/grafana/datasources/datasource.yaml index 0fcf5d13c..000bbe27a 100644 --- a/tests/clab/telemetry/grafana/datasources/datasource.yaml +++ b/tests/clab/telemetry/grafana/datasources/datasource.yaml @@ -8,7 +8,7 @@ datasources: - name: Prometheus type: prometheus orgId: 1 - url: http://clab-metrics-prometheus:9090 + url: http://clab-telemetry-prometheus:9090 password: user: database: diff --git a/tests/clab/telemetry/telemetry.clab.yaml b/tests/clab/telemetry/telemetry.clab.yaml index e0e49aa94..b25e00a59 100644 --- a/tests/clab/telemetry/telemetry.clab.yaml +++ b/tests/clab/telemetry/telemetry.clab.yaml @@ -5,7 +5,7 @@ topology: kind: linux image: gnmic:0.0.0-rc1 - nodes: + nodes: gnmic1: binds: - ./gnmic.yaml:/app/gnmic.yaml:ro @@ -158,6 +158,12 @@ topology: binds: - grafana/datasources/datasource.yaml:/etc/grafana/provisioning/datasources/datasource.yaml:ro - grafana/dashboards.yaml:/etc/grafana/provisioning/dashboards/dashboards.yaml:ro - - grafana/dashboards/:/var/lib/grafana/dashboards + - ../../dashboards/:/var/lib/grafana/dashboards ports: - 3000:3000 + env: + GF_AUTH_DISABLE_LOGIN_FORM: "true" + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_NAME: Main Org. + GF_AUTH_ANONYMOUS_ORG_ROLE: Admin + GF_USERS_ALLOW_SIGN_UP: "false" diff --git a/tests/cluster_checks.sh b/tests/cluster_checks.sh index 0451be689..dc15c7842 100755 --- a/tests/cluster_checks.sh +++ b/tests/cluster_checks.sh @@ -3,5 +3,3 @@ source ./cluster_funcs.sh print_clusters - - diff --git a/tests/loaders.sh b/tests/loaders.sh index ad200ed5a..df1434fdb 100755 --- a/tests/loaders.sh +++ b/tests/loaders.sh @@ -12,7 +12,7 @@ failure() { export -f failure function cleanup() { - echo "gnmic_config_file: ./clab/loaders/gnmic-docker-loader.yaml" > clab/loaders/loaders.clab_vars.yaml + echo "gnmic_config_file: gnmic-docker-loader.yaml" > clab/loaders/loaders.clab_vars.yaml sudo clab des --cleanup -t clab/loaders/loaders.clab.yaml docker image prune -f } @@ -27,14 +27,14 @@ docker build -t gnmic:0.0.0-rc1 ../ start=`date +%s` # docker loader -echo "gnmic_config_file: ./clab/loaders/gnmic-docker-loader.yaml" > clab/loaders/loaders.clab_vars.yaml +echo "gnmic_config_file: gnmic-docker-loader.yaml" > clab/loaders/loaders.clab_vars.yaml sudo clab dep -t clab/loaders/loaders.clab.yaml --reconfigure sleep 45 sudo clab des -t clab/loaders/loaders.clab.yaml --cleanup # file loader # change gnmic config file -echo "gnmic_config_file: ./clab/loaders/gnmic-file-loader.yaml" > clab/loaders/loaders.clab_vars.yaml +echo "gnmic_config_file: gnmic-file-loader.yaml" > clab/loaders/loaders.clab_vars.yaml # deploy lab with file loader echo "clab-loaders-srl1:" > ./clab/loaders/targets/targets.yaml diff --git a/tests/telemetry_labs.sh b/tests/telemetry_labs.sh index 3a84b8b51..94ea5d034 100755 --- a/tests/telemetry_labs.sh +++ b/tests/telemetry_labs.sh @@ -164,7 +164,7 @@ for i in `seq 1 $NUM_LABS` # rm clab/lab$i.clab.yaml # rm -rf .lab$i.clab.yaml done - +sleep 60 done ####### # END #