Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #1142: Handle hanging plugin
Browse files Browse the repository at this point in the history
Adds handling of the timeout to the native go client which enforces that each
request for {collect, process, publish} finishes in this amount of time.

Ups the default client timeout to 10 seconds.
  • Loading branch information
IRCody committed Sep 9, 2016
1 parent 14a1611 commit 92d4272
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (

const (
// DefaultClientTimeout - default timeout for a client connection attempt
DefaultClientTimeout = time.Second * 3
DefaultClientTimeout = time.Second * 10
// DefaultHealthCheckTimeout - default timeout for a health check
DefaultHealthCheckTimeout = time.Second * 1
// DefaultHealthCheckFailureLimit - how any consecutive health check timeouts must occur to trigger a failure
Expand Down
22 changes: 20 additions & 2 deletions control/plugin/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type PluginNativeClient struct {
pluginType plugin.PluginType
encoder encoding.Encoder
encrypter *encrypter.Encrypter
timeout time.Duration
}

func NewCollectorNativeClient(address string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) {
Expand Down Expand Up @@ -140,6 +141,15 @@ func decodeMetrics(bts []byte) ([]core.Metric, error) {
return cmetrics, nil
}

func enforceTimeout(p *PluginNativeClient, dl time.Duration, done chan int) {
select {
case <-time.After(dl):
p.Kill("Passed deadline")
case <-done:
return
}
}

func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ctypes.ConfigValue) error {

args := plugin.PublishArgs{
Expand All @@ -152,9 +162,11 @@ func (p *PluginNativeClient) Publish(metrics []core.Metric, config map[string]ct
if err != nil {
return err
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Publisher.Publish", out, &reply)
close(done)
return err
return nil
}
Expand All @@ -173,7 +185,10 @@ func (p *PluginNativeClient) Process(metrics []core.Metric, config map[string]ct
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Processor.Process", out, &reply)
close(done)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,14 +226,16 @@ func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, e
}

args := plugin.CollectMetricsArgs{MetricTypes: metricsToCollect}

out, err := p.encoder.Encode(args)
if err != nil {
return nil, err
}

var reply []byte
done := make(chan int)
go enforceTimeout(p, p.timeout, done)
err = p.connection.Call("Collector.CollectMetrics", out, &reply)
close(done)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -303,6 +320,7 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType,
p := &PluginNativeClient{
connection: r,
pluginType: t,
timeout: timeout,
}

p.encoder = encoding.NewGobEncoder()
Expand Down

0 comments on commit 92d4272

Please sign in to comment.