From 993a7d646e98e6acecf782ef7d68fc1b82fba5dd Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 28 Jul 2014 16:38:02 -0700 Subject: [PATCH 1/3] etcd: pass timeout into NewClient --- client/registry.go | 5 +++-- etcd/client.go | 20 +++++++++++--------- etcd/client_test.go | 14 +++++++------- fleetctl/fleetctl.go | 2 +- server/server.go | 3 ++- 5 files changed, 24 insertions(+), 20 deletions(-) diff --git a/client/registry.go b/client/registry.go index dcecf032b..486c60ef7 100644 --- a/client/registry.go +++ b/client/registry.go @@ -2,14 +2,15 @@ package client import ( "net/http" + "time" "github.com/coreos/fleet/etcd" "github.com/coreos/fleet/registry" ) -func NewRegistryClient(trans *http.Transport, endpoint, keyPrefix string) (API, error) { +func NewRegistryClient(trans *http.Transport, endpoint, keyPrefix string, requestTimeout time.Duration) (API, error) { machines := []string{endpoint} - client, err := etcd.NewClient(machines, *trans) + client, err := etcd.NewClient(machines, *trans, requestTimeout) if err != nil { return nil, err } diff --git a/etcd/client.go b/etcd/client.go index 8b3de42b6..96722ead5 100644 --- a/etcd/client.go +++ b/etcd/client.go @@ -15,9 +15,9 @@ import ( ) const ( - defaultEndpoint = "http://localhost:4001" - redirectMax = 10 - actionTimeout = time.Second + defaultEndpoint = "http://localhost:4001" + redirectMax = 10 + DefaultActionTimeout = time.Second ) type Client interface { @@ -33,7 +33,7 @@ type transport interface { CancelRequest(req *http.Request) } -func NewClient(endpoints []string, transport http.Transport) (*client, error) { +func NewClient(endpoints []string, transport http.Transport, actionTimeout time.Duration) (*client, error) { if len(endpoints) == 0 { endpoints = []string{defaultEndpoint} } @@ -55,8 +55,9 @@ func NewClient(endpoints []string, transport http.Transport) (*client, error) { } return &client{ - endpoints: parsed, - transport: &transport, + endpoints: parsed, + transport: &transport, + actionTimeout: actionTimeout, }, nil } @@ -100,8 +101,9 @@ func filterURL(u *url.URL) error { } type client struct { - endpoints []url.URL - transport transport + endpoints []url.URL + transport transport + actionTimeout time.Duration } // a requestFunc must never return a nil *http.Response and a nil error together @@ -232,7 +234,7 @@ func (c *client) Do(act Action) (*Result, error) { }() select { - case <-time.After(actionTimeout): + case <-time.After(c.actionTimeout): close(cancel) return nil, errors.New("timeout reached") case r := <-result: diff --git a/etcd/client_test.go b/etcd/client_test.go index a40a29664..92a6524a3 100644 --- a/etcd/client_test.go +++ b/etcd/client_test.go @@ -40,7 +40,7 @@ func TestNewClient(t *testing.T) { } for i, tt := range tests { - _, err := NewClient(tt.endpoints, http.Transport{}) + _, err := NewClient(tt.endpoints, http.Transport{}, DefaultActionTimeout) if tt.pass != (err == nil) { t.Errorf("case %d %v: expected to pass=%t, err=%v", i, tt.endpoints, tt.pass, err) } @@ -137,7 +137,7 @@ func TestFilterURL(t *testing.T) { // Ensure the channel passed into c.resolve is actually wired up func TestClientCancel(t *testing.T) { act := Get{Key: "/foo"} - c, err := NewClient(nil, http.Transport{}) + c, err := NewClient(nil, http.Transport{}, DefaultActionTimeout) if err != nil { t.Fatalf("Failed building Client: %v", err) } @@ -246,7 +246,7 @@ func TestClientRedirectsFollowed(t *testing.T) { }, } - c, err := NewClient([]string{"http://192.0.2.1:4001"}, http.Transport{}) + c, err := NewClient([]string{"http://192.0.2.1:4001"}, http.Transport{}, DefaultActionTimeout) if err != nil { t.Fatalf("NewClient failed: %v", err) } @@ -283,7 +283,7 @@ func TestClientRedirectsAndAlternateEndpoints(t *testing.T) { }, } - c, err := NewClient([]string{"http://192.0.2.1:4001", "http://192.0.2.2:4002"}, http.Transport{}) + c, err := NewClient([]string{"http://192.0.2.1:4001", "http://192.0.2.2:4002"}, http.Transport{}, DefaultActionTimeout) if err != nil { t.Fatalf("NewClient failed: %v", err) } @@ -422,7 +422,7 @@ func newTestingRequestAndClient(t *testing.T, handler http.Handler) (*client, *h if err != nil { t.Fatalf("error creating request: %v", err) } - c, err := NewClient(nil, http.Transport{}) + c, err := NewClient(nil, http.Transport{}, DefaultActionTimeout) if err != nil { t.Fatalf("error creating client: %v", err) } @@ -465,7 +465,7 @@ func (n *nilNilTransport) CancelRequest(req *http.Request) {} // Ensure that any request that somehow returns (nil, nil) propagates an actual error func TestNilNilRequestHTTP(t *testing.T) { - c := &client{[]url.URL{}, &nilNilTransport{}} + c := &client{[]url.URL{}, &nilNilTransport{}, DefaultActionTimeout} cancel := make(chan bool) resp, body, err := c.requestHTTP(nil, cancel) if err == nil { @@ -499,7 +499,7 @@ func (r *respAndErrTransport) CancelRequest(req *http.Request) {} // Ensure that the body of a response is closed even when an error is returned func TestRespAndErrRequestHTTP(t *testing.T) { - c := &client{[]url.URL{}, &respAndErrTransport{}} + c := &client{[]url.URL{}, &respAndErrTransport{}, DefaultActionTimeout} cancel := make(chan bool) resp, body, err := c.requestHTTP(nil, cancel) if err == nil { diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 197916f60..55e9c79b6 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -294,7 +294,7 @@ func getRegistryClient() (client.API, error) { TLSClientConfig: tlsConfig, } - return client.NewRegistryClient(&trans, globalFlags.Endpoint, globalFlags.EtcdKeyPrefix) + return client.NewRegistryClient(&trans, globalFlags.Endpoint, globalFlags.EtcdKeyPrefix, etcd.DefaultActionTimeout) } // getChecker creates and returns a HostKeyChecker, or nil if any error is encountered diff --git a/server/server.go b/server/server.go index 4d86e55b7..32c8db126 100644 --- a/server/server.go +++ b/server/server.go @@ -63,7 +63,8 @@ func New(cfg config.Config) (*Server, error) { return nil, err } - eClient, err := etcd.NewClient(cfg.EtcdServers, http.Transport{TLSClientConfig: tlsConfig}) + eTrans := http.Transport{TLSClientConfig: tlsConfig} + eClient, err := etcd.NewClient(cfg.EtcdServers, eTrans, etcd.DefaultActionTimeout) if err != nil { return nil, err } From bd867a36ec8b7f3267b5dea3d89181130568bb05 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 28 Jul 2014 16:43:03 -0700 Subject: [PATCH 2/3] fleetctl: add --request-timeout --- fleetctl/fleetctl.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fleetctl/fleetctl.go b/fleetctl/fleetctl.go index 55e9c79b6..3bc4595a5 100644 --- a/fleetctl/fleetctl.go +++ b/fleetctl/fleetctl.go @@ -62,6 +62,7 @@ var ( KnownHostsFile string StrictHostKeyChecking bool Tunnel string + RequestTimeout float64 }{} // flags used by multiple commands @@ -90,6 +91,7 @@ func init() { globalFlagset.StringVar(&globalFlags.KnownHostsFile, "known-hosts-file", ssh.DefaultKnownHostsFile, "File used to store remote machine fingerprints. Ignored if strict host key checking is disabled.") globalFlagset.BoolVar(&globalFlags.StrictHostKeyChecking, "strict-host-key-checking", true, "Verify host keys presented by remote machines before initiating SSH connections.") globalFlagset.StringVar(&globalFlags.Tunnel, "tunnel", "", "Establish an SSH tunnel through the provided address for communication with fleet and etcd.") + globalFlagset.Float64Var(&globalFlags.RequestTimeout, "request-timeout", 1.0, "Amount of time in seconds to allow a single request before considering it failed.") } type Command struct { @@ -294,7 +296,8 @@ func getRegistryClient() (client.API, error) { TLSClientConfig: tlsConfig, } - return client.NewRegistryClient(&trans, globalFlags.Endpoint, globalFlags.EtcdKeyPrefix, etcd.DefaultActionTimeout) + timeout := time.Duration(globalFlags.RequestTimeout*1000) * time.Millisecond + return client.NewRegistryClient(&trans, globalFlags.Endpoint, globalFlags.EtcdKeyPrefix, timeout) } // getChecker creates and returns a HostKeyChecker, or nil if any error is encountered From bb530f41fc54d73cfd2ca11f69f6cee95ca7ed23 Mon Sep 17 00:00:00 2001 From: Brian Waldon Date: Mon, 28 Jul 2014 16:49:47 -0700 Subject: [PATCH 3/3] server: add etcd_request_timeout config option --- Documentation/configuration.md | 6 ++++++ config/config.go | 1 + etcd/client.go | 5 ++--- etcd/client_test.go | 14 +++++++------- fleet.conf.sample | 3 +++ fleet.go | 2 ++ server/server.go | 3 ++- 7 files changed, 23 insertions(+), 11 deletions(-) diff --git a/Documentation/configuration.md b/Documentation/configuration.md index 1e3c5139d..fd1e61fce 100644 --- a/Documentation/configuration.md +++ b/Documentation/configuration.md @@ -29,6 +29,12 @@ Provide a custom set of etcd endpoints. Default: ["http://127.0.0.1:4001"] +#### etcd_request_timeout + +Amount of time in seconds to allow a single etcd request before considering it failed. + +Default: 1.0 + #### etcd_cafile, etcd_keyfile, etcd_certfile Provide TLS configuration when SSL certificate authentication is enabled in etcd endpoints diff --git a/config/config.go b/config/config.go index d2f8a141c..2ed58b8b3 100644 --- a/config/config.go +++ b/config/config.go @@ -14,6 +14,7 @@ type Config struct { EtcdKeyFile string EtcdCertFile string EtcdCAFile string + EtcdRequestTimeout float64 PublicIP string Verbosity int RawMetadata string diff --git a/etcd/client.go b/etcd/client.go index 96722ead5..1d5fb7e48 100644 --- a/etcd/client.go +++ b/etcd/client.go @@ -15,9 +15,8 @@ import ( ) const ( - defaultEndpoint = "http://localhost:4001" - redirectMax = 10 - DefaultActionTimeout = time.Second + defaultEndpoint = "http://localhost:4001" + redirectMax = 10 ) type Client interface { diff --git a/etcd/client_test.go b/etcd/client_test.go index 92a6524a3..95587e5e8 100644 --- a/etcd/client_test.go +++ b/etcd/client_test.go @@ -40,7 +40,7 @@ func TestNewClient(t *testing.T) { } for i, tt := range tests { - _, err := NewClient(tt.endpoints, http.Transport{}, DefaultActionTimeout) + _, err := NewClient(tt.endpoints, http.Transport{}, time.Second) if tt.pass != (err == nil) { t.Errorf("case %d %v: expected to pass=%t, err=%v", i, tt.endpoints, tt.pass, err) } @@ -137,7 +137,7 @@ func TestFilterURL(t *testing.T) { // Ensure the channel passed into c.resolve is actually wired up func TestClientCancel(t *testing.T) { act := Get{Key: "/foo"} - c, err := NewClient(nil, http.Transport{}, DefaultActionTimeout) + c, err := NewClient(nil, http.Transport{}, time.Second) if err != nil { t.Fatalf("Failed building Client: %v", err) } @@ -246,7 +246,7 @@ func TestClientRedirectsFollowed(t *testing.T) { }, } - c, err := NewClient([]string{"http://192.0.2.1:4001"}, http.Transport{}, DefaultActionTimeout) + c, err := NewClient([]string{"http://192.0.2.1:4001"}, http.Transport{}, time.Second) if err != nil { t.Fatalf("NewClient failed: %v", err) } @@ -283,7 +283,7 @@ func TestClientRedirectsAndAlternateEndpoints(t *testing.T) { }, } - c, err := NewClient([]string{"http://192.0.2.1:4001", "http://192.0.2.2:4002"}, http.Transport{}, DefaultActionTimeout) + c, err := NewClient([]string{"http://192.0.2.1:4001", "http://192.0.2.2:4002"}, http.Transport{}, time.Second) if err != nil { t.Fatalf("NewClient failed: %v", err) } @@ -422,7 +422,7 @@ func newTestingRequestAndClient(t *testing.T, handler http.Handler) (*client, *h if err != nil { t.Fatalf("error creating request: %v", err) } - c, err := NewClient(nil, http.Transport{}, DefaultActionTimeout) + c, err := NewClient(nil, http.Transport{}, time.Second) if err != nil { t.Fatalf("error creating client: %v", err) } @@ -465,7 +465,7 @@ func (n *nilNilTransport) CancelRequest(req *http.Request) {} // Ensure that any request that somehow returns (nil, nil) propagates an actual error func TestNilNilRequestHTTP(t *testing.T) { - c := &client{[]url.URL{}, &nilNilTransport{}, DefaultActionTimeout} + c := &client{[]url.URL{}, &nilNilTransport{}, time.Second} cancel := make(chan bool) resp, body, err := c.requestHTTP(nil, cancel) if err == nil { @@ -499,7 +499,7 @@ func (r *respAndErrTransport) CancelRequest(req *http.Request) {} // Ensure that the body of a response is closed even when an error is returned func TestRespAndErrRequestHTTP(t *testing.T) { - c := &client{[]url.URL{}, &respAndErrTransport{}, DefaultActionTimeout} + c := &client{[]url.URL{}, &respAndErrTransport{}, time.Second} cancel := make(chan bool) resp, body, err := c.requestHTTP(nil, cancel) if err == nil { diff --git a/fleet.conf.sample b/fleet.conf.sample index 0d0ef9013..3c6034bef 100644 --- a/fleet.conf.sample +++ b/fleet.conf.sample @@ -8,6 +8,9 @@ # by the underlying go-etcd library. # etcd_servers=["http://127.0.0.1:4001"] +# Amount of time in seconds to allow a single etcd request before considering it failed. +# etcd_request_timeout=1.0 + # Provide TLS configuration when SSL certificate authentication is enabled in etcd endpoints # etcd_cafile=/path/to/CAfile # etcd_keyfile=/path/to/keyfile diff --git a/fleet.go b/fleet.go index 4fbdb7bac..39a8f0e89 100644 --- a/fleet.go +++ b/fleet.go @@ -51,6 +51,7 @@ func main() { cfgset.String("etcd_certfile", "", "SSL certification file used to secure etcd communication") cfgset.String("etcd_cafile", "", "SSL Certificate Authority file used to secure etcd communication") cfgset.String("etcd_key_prefix", registry.DefaultKeyPrefix, "Keyspace for fleet data in etcd") + cfgset.Float64("etcd_request_timeout", 1.0, "Amount of time in seconds to allow a single etcd request before considering it failed.") cfgset.String("public_ip", "", "IP address that fleet machine should publish") cfgset.String("metadata", "", "List of key-value metadata to assign to the fleet machine") cfgset.String("agent_ttl", agent.DefaultTTL, "TTL in seconds of fleet machine state in etcd") @@ -161,6 +162,7 @@ func getConfig(flagset *flag.FlagSet, userCfgFile string) (*config.Config, error EtcdKeyFile: (*flagset.Lookup("etcd_keyfile")).Value.(flag.Getter).Get().(string), EtcdCertFile: (*flagset.Lookup("etcd_certfile")).Value.(flag.Getter).Get().(string), EtcdCAFile: (*flagset.Lookup("etcd_cafile")).Value.(flag.Getter).Get().(string), + EtcdRequestTimeout: (*flagset.Lookup("etcd_request_timeout")).Value.(flag.Getter).Get().(float64), PublicIP: (*flagset.Lookup("public_ip")).Value.(flag.Getter).Get().(string), RawMetadata: (*flagset.Lookup("metadata")).Value.(flag.Getter).Get().(string), AgentTTL: (*flagset.Lookup("agent_ttl")).Value.(flag.Getter).Get().(string), diff --git a/server/server.go b/server/server.go index 32c8db126..46f228bad 100644 --- a/server/server.go +++ b/server/server.go @@ -64,7 +64,8 @@ func New(cfg config.Config) (*Server, error) { } eTrans := http.Transport{TLSClientConfig: tlsConfig} - eClient, err := etcd.NewClient(cfg.EtcdServers, eTrans, etcd.DefaultActionTimeout) + timeout := time.Duration(cfg.EtcdRequestTimeout*1000) * time.Millisecond + eClient, err := etcd.NewClient(cfg.EtcdServers, eTrans, timeout) if err != nil { return nil, err }