Skip to content

Commit

Permalink
client: do not timeout when wait is true
Browse files Browse the repository at this point in the history
Current V2 watch waits by encoding URL with wait=true.
When a client sets 'no-sync', it requests directly to
proxy and the proxy redirects it by cloning the request
object, which leads to cancel the original request when
it times out and the cloned request gets closed prematurely.

This fixes etcd-io#3894 by querying
the original client request in order to not use context timeout
when 'wait=true'.
  • Loading branch information
gyuho committed Jan 21, 2016
1 parent 1db0148 commit 982c901
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 11 deletions.
8 changes: 7 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"reflect"
"sort"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -442,9 +443,14 @@ func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Respon
return nil, nil, err
}

isWait := false
if req != nil && req.URL != nil {
isWait, _ = strconv.ParseBool(req.URL.Query().Get("wait"))
}

var hctx context.Context
var hcancel context.CancelFunc
if c.headerTimeout > 0 {
if !isWait && c.headerTimeout > 0 {
hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
} else {
hctx, hcancel = context.WithCancel(ctx)
Expand Down
173 changes: 163 additions & 10 deletions e2e/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"net/url"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/gexpect"
"github.com/coreos/etcd/pkg/fileutil"
Expand All @@ -42,6 +44,7 @@ func TestBasicOpsNoTLS(t *testing.T) {
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort,
)
}

Expand All @@ -54,6 +57,7 @@ func TestBasicOpsAllTLS(t *testing.T) {
isPeerTLS: true,
initialToken: "new",
},
etcdProcessBasePort+10,
)
}

Expand All @@ -66,6 +70,7 @@ func TestBasicOpsPeerTLS(t *testing.T) {
isPeerTLS: true,
initialToken: "new",
},
etcdProcessBasePort+20,
)
}

Expand All @@ -78,11 +83,12 @@ func TestBasicOpsClientTLS(t *testing.T) {
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+30,
)
}

func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
epc, err := newEtcdProcessCluster(cfg)
func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig, basePort int) {
epc, err := newEtcdProcessCluster(cfg, basePort)
if err != nil {
t.Fatalf("could not start etcd process cluster (%v)", err)
}
Expand All @@ -103,6 +109,108 @@ func testProcessClusterPutGet(t *testing.T, cfg *etcdProcessClusterConfig) {
}
}

func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) {
testProcessClusterV2CtlWatch(
t,
&etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
isClientTLS: false,
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+40,
false,
"foo", "bar",
)
}

func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) {
testProcessClusterV2CtlWatch(
t,
&etcdProcessClusterConfig{
clusterSize: 3,
proxySize: 1,
isClientTLS: false,
isPeerTLS: false,
initialToken: "new",
},
etcdProcessBasePort+50,
true,
"foo", "bar",
)
}

func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, basePort int, noSync bool, key, value string) {
if fileutil.Exist("../bin/etcdctl") == false {
t.Fatalf("could not find etcdctl binary")
}

epc, errC := newEtcdProcessCluster(cfg, basePort)
if errC != nil {
t.Fatalf("could not start etcd process cluster (%v)", errC)
}
defer func() {
if errC := epc.Close(); errC != nil {
t.Fatalf("error closing etcd processes (%v)", errC)
}
}()

endpoint, be := "", ""
for _, p := range epc.procs {
if p.cfg.isProxy {
endpoint = p.cfg.acurl.String()
time.Sleep(5 * time.Second) // give some time for proxy to find cluster
break
} else {
be = p.cfg.acurl.String()
}
}
if endpoint == "" {
endpoint = be
}

args := []string{"../bin/etcdctl", "--endpoint", endpoint}
if noSync {
args = append(args, "--no-sync")
}

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
args = append(args, "watch", key)
proc, err := spawnCmd(args)
if err != nil {
t.Fatalf("failed watch (%v)", err)
}

s, _ := proc.ReadLine()
if strings.Contains(s, "client: etcd cluster is unavailable or misconfigured") {
t.Fatalf("failed watch (%v)", s)
}
}()

time.Sleep(time.Second)

putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint}
if noSync {
args = append(args, "--no-sync")
}
putArgs = append(putArgs, "set", key, value)
proc, err := spawnCmd(putArgs)
if err != nil {
t.Fatalf("failed put (%v)", err)
}

s, _ := proc.ReadLine()
if !strings.Contains(s, value) {
t.Fatalf("failed put (%v)", s)
}

wg.Wait()
}

// cURLPrefixArgs builds the beginning of a curl command for a given key
// addressed to a random URL in the given cluster.
func cURLPrefixArgs(clus *etcdProcessCluster, key string) []string {
Expand Down Expand Up @@ -140,22 +248,24 @@ type etcdProcessConfig struct {
args []string
dataDirPath string
acurl url.URL
isProxy bool
}

type etcdProcessClusterConfig struct {
clusterSize int
proxySize int
isClientTLS bool
isPeerTLS bool
initialToken string
}

// newEtcdProcessCluster launches a new cluster from etcd processes, returning
// a new etcdProcessCluster once all nodes are ready to accept client requests.
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdProcessConfigs()
func newEtcdProcessCluster(cfg *etcdProcessClusterConfig, basePort int) (*etcdProcessCluster, error) {
etcdCfgs := cfg.etcdProcessConfigs(basePort)
epc := &etcdProcessCluster{
cfg: cfg,
procs: make([]*etcdProcess, cfg.clusterSize),
procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize),
}

// launch etcd processes
Expand All @@ -169,11 +279,17 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster,
}

// wait for cluster to start
readyC := make(chan error, cfg.clusterSize)
readyStr := "set the initial cluster version"
readyC := make(chan error, cfg.clusterSize+cfg.proxySize)
readyStr, readyStrProxy := "set the initial cluster version", "proxy: listening for client requests on"

for i := range etcdCfgs {
go func(etcdp *etcdProcess) {
_, err := etcdp.proc.ExpectRegex(readyStr)
var err error
if !etcdp.cfg.isProxy {
_, err = etcdp.proc.ExpectRegex(readyStr)
} else {
_, err = etcdp.proc.ExpectRegex(readyStrProxy)
}
readyC <- err
etcdp.proc.ReadUntil('\n') // don't display rest of line
etcdp.proc.Interact()
Expand Down Expand Up @@ -204,7 +320,7 @@ func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) {
return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil
}

func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
func (cfg *etcdProcessClusterConfig) etcdProcessConfigs(basePort int) []*etcdProcessConfig {
clientScheme := "http"
if cfg.isClientTLS {
clientScheme = "https"
Expand All @@ -217,7 +333,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize)
initialCluster := make([]string, cfg.clusterSize)
for i := 0; i < cfg.clusterSize; i++ {
port := etcdProcessBasePort + 2*i
port := basePort + 2*i
curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)}
name := fmt.Sprintf("testname%d", i)
Expand Down Expand Up @@ -257,6 +373,43 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig {
}
}

for i := 0; i < cfg.proxySize; i++ {
port := basePort + 2*cfg.clusterSize + i
curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)}
name := fmt.Sprintf("testname-proxy%d", i)
dataDirPath := name + ".etcd"

args := []string{
"--name", name,
"--proxy", "on",
"--listen-client-urls", curl.String(),
"--data-dir", dataDirPath,
}
if cfg.isClientTLS {
tlsClientArgs := []string{
"--cert-file", certPath,
"--key-file", privateKeyPath,
"--ca-file", caPath,
}
args = append(args, tlsClientArgs...)
}
if cfg.isPeerTLS {
tlsPeerArgs := []string{
"--peer-cert-file", certPath,
"--peer-key-file", privateKeyPath,
"--peer-ca-file", caPath,
}
args = append(args, tlsPeerArgs...)
}

etcdCfgs = append(etcdCfgs, &etcdProcessConfig{
args: args,
dataDirPath: dataDirPath,
acurl: curl,
isProxy: true,
})
}

initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")}
for i := range etcdCfgs {
etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...)
Expand Down

0 comments on commit 982c901

Please sign in to comment.