Skip to content

Commit

Permalink
Limit agent injection concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chacin <pablochacin@gmail.com>
  • Loading branch information
pablochacin committed Nov 17, 2022
1 parent 01973ef commit d98dab7
Showing 1 changed file with 21 additions and 5 deletions.
26 changes: 21 additions & 5 deletions pkg/api/disruptors/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type PodDisruptorOptions struct {
// timeout when waiting agent to be injected in seconds (default 30s). A zero value forces default.
// A Negative value forces no waiting.
InjectTimeout int `js:"injectTimeout"`
// Maximum concurrent agent injections
ConcurrentInjections int `js:"concurrentInjections"`
}

// podDisruptor is an instance of a PodDisruptor initialized with a list ot target pods
Expand Down Expand Up @@ -115,10 +117,11 @@ func (s *PodSelector) GetTargets(k8s kubernetes.Kubernetes) ([]string, error) {

// AgentController controls de agents in a set of target pods
type AgentController struct {
k8s kubernetes.Kubernetes
namespace string
targets []string
timeout time.Duration
k8s kubernetes.Kubernetes
namespace string
targets []string
timeout time.Duration
concurrency int
}

// InjectDisruptorAgent injects the Disruptor agent in the target pods
Expand All @@ -139,10 +142,18 @@ func (c *AgentController) InjectDisruptorAgent() error {
},
}

// Set a limit to concurrent injections to prevent client side throttling
// This is a workaround for https://github.com/grafana/xk6-disruptor/issues/44
// Default to the default RQS in the Kubernetes client
concurrency := c.concurrency
if concurrency == 0 {
concurrency = 5
}

var wg sync.WaitGroup
// ensure errors channel has enough space to avoid blocking gorutines
errors := make(chan error, len(c.targets))
for _, pod := range c.targets {
for i , pod := range c.targets {
wg.Add(1)
// attach each container asynchronously
go func(podName string) {
Expand Down Expand Up @@ -172,6 +183,11 @@ func (c *AgentController) InjectDisruptorAgent() error {
errors <- err
}
}(pod)

// pause when concurrency limit reached
if i % concurrency == 0 {
time.Sleep(time.Second)
}
}

wg.Wait()
Expand Down

0 comments on commit d98dab7

Please sign in to comment.