Skip to content
This repository has been archived by the owner on Oct 28, 2022. It is now read-only.

Commit

Permalink
add context to gnmic actions
Browse files Browse the repository at this point in the history
  • Loading branch information
karimra committed Apr 19, 2022
1 parent 57c7e9c commit 4e4b68f
Show file tree
Hide file tree
Showing 24 changed files with 109 additions and 69 deletions.
3 changes: 2 additions & 1 deletion actions/action.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package actions

import (
"context"
"log"

"github.com/karimra/gnmic/types"
Expand All @@ -17,7 +18,7 @@ type Action interface {
// - `Vars` : a map[string]interface{} containing variables passed to the action
// - `Targets`: a map[string]*types.TargetConfig containing (if the action is ran by a loader)
// the currently known targets configurations
Run(aCtx *Context) (interface{}, error)
Run(ctx context.Context, aCtx *Context) (interface{}, error)
// NName returns the configured action name
NName() string
// WithTargets passes the known configured targets to the action when initialized
Expand Down
4 changes: 2 additions & 2 deletions actions/gnmi_action/gnmi_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (g *gnmiAction) Init(cfg map[string]interface{}, opts ...actions.Option) er
return nil
}

func (g *gnmiAction) Run(aCtx *actions.Context) (interface{}, error) {
func (g *gnmiAction) Run(ctx context.Context, aCtx *actions.Context) (interface{}, error) {
g.m.Lock()
for n, tc := range aCtx.Targets {
g.targetsConfigs[n] = tc
Expand All @@ -126,7 +126,7 @@ func (g *gnmiAction) Run(aCtx *actions.Context) (interface{}, error) {
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
result := make(map[string]interface{})
resCh := make(chan *gnmiResponse)
Expand Down
7 changes: 5 additions & 2 deletions actions/http_action/http_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package http_action

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -75,7 +76,7 @@ func (h *httpAction) Init(cfg map[string]interface{}, opts ...actions.Option) er
return err
}

func (h *httpAction) Run(aCtx *actions.Context) (interface{}, error) {
func (h *httpAction) Run(ctx context.Context, aCtx *actions.Context) (interface{}, error) {
if h.url == nil {
return nil, errors.New("missing url template")
}
Expand Down Expand Up @@ -117,7 +118,9 @@ func (h *httpAction) Run(aCtx *actions.Context) (interface{}, error) {
client := &http.Client{
Timeout: h.Timeout,
}
resp, err := client.Do(req)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion actions/http_action/http_action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func TestHTTPAction(t *testing.T) {
for i, item := range ts.tests {
t.Run(name, func(t *testing.T) {
t.Logf("running test item %d", i)
res, err := a.Run(&actions.Context{Input: item.input})
res, err := a.Run(context.TODO(), &actions.Context{Input: item.input})
if err != nil {
t.Errorf("failed at %s item %d, %v", name, i, err)
t.Fail()
Expand Down
3 changes: 2 additions & 1 deletion actions/script_action/script_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package script_action

import (
"bytes"
"context"
"fmt"
"io"
"log"
Expand Down Expand Up @@ -56,7 +57,7 @@ func (s *scriptAction) Init(cfg map[string]interface{}, opts ...actions.Option)
return nil
}

func (s *scriptAction) Run(aCtx *actions.Context) (interface{}, error) {
func (s *scriptAction) Run(_ context.Context, aCtx *actions.Context) (interface{}, error) {
if s.Command == "" && s.File == "" {
return nil, nil
}
Expand Down
2 changes: 1 addition & 1 deletion actions/template_action/template_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (t *templateAction) Init(cfg map[string]interface{}, opts ...actions.Option
return nil
}

func (t *templateAction) Run(aCtx *actions.Context) (interface{}, error) {
func (t *templateAction) Run(_ context.Context, aCtx *actions.Context) (interface{}, error) {
b := new(bytes.Buffer)
err := t.tpl.Execute(b, &actions.Context{
Input: aCtx.Input,
Expand Down
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (a *App) loadTargets(e fsnotify.Event) {
// delete targets
for t := range dist {
if _, ok := newTargets[t]; !ok {
err = a.deleteTarget(t)
err = a.deleteTarget(ctx, t)
if err != nil {
a.Logger.Printf("failed to delete target %q: %v", t, err)
continue
Expand Down
19 changes: 11 additions & 8 deletions app/clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ func (a *App) dispatchTargets(ctx context.Context) {
}
var err error
//a.m.RLock()
dctx, cancel := context.WithTimeout(ctx, a.Config.Clustering.TargetsWatchTimer)
for _, tc := range a.Config.Targets {
err = a.dispatchTarget(ctx, tc)
err = a.dispatchTarget(dctx, tc)
if err != nil {
a.Logger.Printf("failed to dispatch target %q: %v", tc.Name, err)
}
Expand All @@ -273,7 +274,7 @@ func (a *App) dispatchTargets(ctx context.Context) {
}
}
//a.m.RUnlock()

cancel()
select {
case <-ctx.Done():
return
Expand Down Expand Up @@ -339,7 +340,7 @@ WAIT:
retries++
if (retries+1)*int(lockWaitTime) >= int(a.Config.Clustering.TargetAssignmentTimeout) {
a.Logger.Printf("[cluster-leader] max retries reached for target %q and service %q, reselecting...", tc.Name, service.ID)
err = a.unassignTarget(tc.Name, service.ID)
err = a.unassignTarget(ctx, tc.Name, service.ID)
if err != nil {
a.Logger.Printf("failed to unassign target %q from %q", tc.Name, service.ID)
}
Expand All @@ -357,7 +358,7 @@ WAIT:
retries++
if (retries+1)*int(lockWaitTime) >= int(a.Config.Clustering.TargetAssignmentTimeout) {
a.Logger.Printf("[cluster-leader] max retries reached for target %q and service %q, reselecting...", tc.Name, service.ID)
err = a.unassignTarget(tc.Name, service.ID)
err = a.unassignTarget(ctx, tc.Name, service.ID)
if err != nil {
a.Logger.Printf("failed to unassign target %q from %q", tc.Name, service.ID)
}
Expand Down Expand Up @@ -529,7 +530,7 @@ func (a *App) getHighestTagsMatches(tagsCount map[string]int) []string {
return ss
}

func (a *App) deleteTarget(name string) error {
func (a *App) deleteTarget(ctx context.Context, name string) error {
errs := make([]error, 0, len(a.apiServices))
for _, s := range a.apiServices {
scheme := "http"
Expand All @@ -549,7 +550,7 @@ func (a *App) deleteTarget(name string) error {
},
}
}
ctx, cancel := context.WithCancel(a.ctx)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
url := fmt.Sprintf("%s://%s/api/v1/config/targets/%s", scheme, s.Address, name)
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
Expand Down Expand Up @@ -630,7 +631,7 @@ func (a *App) assignTarget(ctx context.Context, tc *types.TargetConfig, service
return nil
}

func (a *App) unassignTarget(name string, serviceID string) error {
func (a *App) unassignTarget(ctx context.Context, name string, serviceID string) error {
for _, s := range a.apiServices {
if s.ID != serviceID {
continue
Expand All @@ -653,15 +654,17 @@ func (a *App) unassignTarget(name string, serviceID string) error {
}
}
url := fmt.Sprintf("%s://%s/api/v1/targets/%s", scheme, s.Address, name)
ctx, cancel := context.WithTimeout(a.ctx, 500*time.Millisecond)
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, url, nil)
if err != nil {
a.Logger.Printf("failed to create HTTP request: %v", err)
continue
}
rsp, err := client.Do(req)
if err != nil {
rsp.Body.Close()
a.Logger.Printf("failed HTTP request: %v", err)
continue
}
rsp.Body.Close()
Expand Down
3 changes: 3 additions & 0 deletions app/gnmi_client_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ START:
select {
case <-nctx.Done():
a.Logger.Printf("target %q stopped: %v", tc.Name, nctx.Err())
// drain errChan
err := <-errChan
a.Logger.Printf("target %q keepLock returned: %v", tc.Name, err)
return
case <-doneChan:
a.Logger.Printf("target lock %q removed", tc.Name)
Expand Down
2 changes: 1 addition & 1 deletion app/loaders.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ START:
continue
}
// clustered, delete target in all instances of the cluster
err = a.deleteTarget(del)
err = a.deleteTarget(ctx, del)
if err != nil {
a.Logger.Printf("failed to delete target %q: %v", del, err)
}
Expand Down
11 changes: 9 additions & 2 deletions app/version.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -78,9 +79,15 @@ func (a *App) VersionUpgradeRun(cmd *cobra.Command, args []string) error {

// downloadFile will download a file from a URL and write its content to a file
func downloadFile(url string, file *os.File) error {
client := http.Client{Timeout: 10 * time.Second}
client := http.Client{Timeout: 30 * time.Second}
// Get the data
resp, err := client.Get(url)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resp, err := client.Do(req.WithContext(ctx))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion formatters/event_trigger/event_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (p *Trigger) readVars() error {
func (p *Trigger) triggerActions(e *formatters.EventMsg) {
actx := &actions.Context{Input: e, Env: make(map[string]interface{}), Vars: p.vars}
for _, act := range p.actions {
res, err := act.Run(actx)
res, err := act.Run(context.TODO(), actx)
if err != nil {
p.logger.Printf("trigger action %q failed: %+v", act.NName(), err)
return
Expand Down
31 changes: 20 additions & 11 deletions loaders/consul_loader/consul_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const (
defaultAddress = "localhost:8500"
defaultPrefix = "gnmic/config/targets"
//
defaultWatchTimeout = 1 * time.Minute
defaultWatchTimeout = 1 * time.Minute
defaultActionTimeout = 30 * time.Second
)

func init() {
Expand Down Expand Up @@ -77,16 +78,19 @@ type cfg struct {
Services []*serviceDef `mapstructure:"services,omitempty" json:"services,omitempty"`
// if true, registers consulLoader prometheus metrics with the provided
// prometheus registry
EnableMetrics bool `json:"enable-metrics,omitempty" mapstructure:"enable-metrics,omitempty"`
EnableMetrics bool `mapstructure:"enable-metrics,omitempty" json:"enable-metrics,omitempty"`
// variables definitions to be passed to the actions
Vars map[string]interface{}
// variable file, values in this file will be overwritten by
// the ones defined in Vars
VarsFile string `mapstructure:"vars-file,omitempty"`
VarsFile string `mapstructure:"vars-file,omitempty" json:"vars-file,omitempty"`
// list of Actions to run on new target discovery
OnAdd []string `json:"on-add,omitempty" mapstructure:"on-add,omitempty"`
OnAdd []string `mapstructure:"on-add,omitempty" json:"on-add,omitempty"`
// list of Actions to run on target removal
OnDelete []string `json:"on-delete,omitempty" mapstructure:"on-delete,omitempty"`
OnDelete []string `mapstructure:"on-delete,omitempty" json:"on-delete,omitempty"`
// timeout for the actions, this applies for all actions as a whole (on-add + on-delete),
// not to each action individually.
ActionsTimeout time.Duration `mapstructure:"actions-timeout,omitempty" json:"actions-timeout,omitempty"`
}

type serviceDef struct {
Expand Down Expand Up @@ -274,6 +278,9 @@ func (c *consulLoader) setDefaults() error {
if c.cfg.KeyPrefix == "" && len(c.cfg.Services) == 0 {
c.cfg.KeyPrefix = defaultPrefix
}
if c.cfg.ActionsTimeout <= 0 {
c.cfg.ActionsTimeout = defaultActionTimeout
}
return nil
}

Expand Down Expand Up @@ -460,6 +467,8 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar
Add: make([]*types.TargetConfig, 0, len(targetOp.Add)),
Del: make([]string, 0, len(targetOp.Del)),
}
ctx, cancel := context.WithTimeout(ctx, c.cfg.ActionsTimeout)
defer cancel()
// start operation gathering goroutine
go func() {
for {
Expand All @@ -483,7 +492,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar
for _, tAdd := range targetOp.Add {
go func(tc *types.TargetConfig) {
defer wg.Done()
err := c.runOnAddActions(tc.Name, tcs)
err := c.runOnAddActions(ctx, tc.Name, tcs)
if err != nil {
c.logger.Printf("failed running OnAdd actions: %v", err)
return
Expand All @@ -495,7 +504,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar
for _, tDel := range targetOp.Del {
go func(name string) {
defer wg.Done()
err := c.runOnDeleteActions(name, tcs)
err := c.runOnDeleteActions(ctx, name, tcs)
if err != nil {
c.logger.Printf("failed running OnDelete actions: %v", err)
return
Expand All @@ -509,7 +518,7 @@ func (c *consulLoader) runActions(ctx context.Context, tcs map[string]*types.Tar
return result, nil
}

func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.TargetConfig) error {
func (c *consulLoader) runOnAddActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error {
aCtx := &actions.Context{
Input: tName,
Env: make(map[string]interface{}),
Expand All @@ -518,7 +527,7 @@ func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.Targe
}
for _, act := range c.addActions {
c.logger.Printf("running action %q for target %q", act.NName(), tName)
res, err := act.Run(aCtx)
res, err := act.Run(ctx, aCtx)
if err != nil {
// delete target from known targets map
c.m.Lock()
Expand All @@ -537,10 +546,10 @@ func (c *consulLoader) runOnAddActions(tName string, tcs map[string]*types.Targe
return nil
}

func (c *consulLoader) runOnDeleteActions(tName string, tcs map[string]*types.TargetConfig) error {
func (c *consulLoader) runOnDeleteActions(ctx context.Context, tName string, tcs map[string]*types.TargetConfig) error {
env := make(map[string]interface{})
for _, act := range c.delActions {
res, err := act.Run(&actions.Context{Input: tName, Env: env, Vars: c.vars})
res, err := act.Run(ctx, &actions.Context{Input: tName, Env: env, Vars: c.vars})
if err != nil {
return fmt.Errorf("action %q for target %q failed: %v", act.NName(), tName, err)
}
Expand Down
Loading

0 comments on commit 4e4b68f

Please sign in to comment.