diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go index a8f180ba595..105d6c0f337 100644 --- a/e2e/etcd_test.go +++ b/e2e/etcd_test.go @@ -145,10 +145,12 @@ type etcdProcessConfig struct { args []string dataDirPath string acurl url.URL + isProxy bool } type etcdProcessClusterConfig struct { clusterSize int + proxySize int isClientTLS bool isPeerTLS bool initialToken string @@ -160,7 +162,7 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, etcdCfgs := cfg.etcdProcessConfigs() epc := &etcdProcessCluster{ cfg: cfg, - procs: make([]*etcdProcess, cfg.clusterSize), + procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize), } // launch etcd processes @@ -174,11 +176,15 @@ func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, } // wait for cluster to start - readyC := make(chan error, cfg.clusterSize) + readyC := make(chan error, cfg.clusterSize+cfg.proxySize) readyStr := "set the initial cluster version" for i := range etcdCfgs { go func(etcdp *etcdProcess) { - _, err := etcdp.proc.ExpectRegex(readyStr) + rs := readyStr + if etcdp.cfg.isProxy { + rs = "listening for client requests" + } + _, err := etcdp.proc.ExpectRegex(rs) readyC <- err etcdp.proc.ReadLine() etcdp.proc.Interact() // this blocks(leaks) if another goroutine is reading @@ -220,7 +226,7 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { peerScheme = "https" } - etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize) + etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize) initialCluster := make([]string, cfg.clusterSize) for i := 0; i < cfg.clusterSize; i++ { port := etcdProcessBasePort + 2*i @@ -262,6 +268,24 @@ func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { acurl: curl, } } + for i := 0; i < cfg.proxySize; i++ { + port := etcdProcessBasePort + 2*cfg.clusterSize + i + 1 + curl := url.URL{Scheme: clientScheme, Host: fmt.Sprintf("localhost:%d", port)} + name := fmt.Sprintf("testname-proxy%d", i) + dataDirPath := name + ".etcd" + args := []string{ + "--name", name, + "--proxy", "on", + "--listen-client-urls", curl.String(), + "--data-dir", dataDirPath, + } + etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ + args: args, + dataDirPath: dataDirPath, + acurl: curl, + isProxy: true, + } + } initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} for i := range etcdCfgs { @@ -289,6 +313,15 @@ func (epc *etcdProcessCluster) Close() (err error) { return err } +// proxies returns only the proxy etcdProcess. +func (epc *etcdProcessCluster) proxies() []*etcdProcess { + return epc.procs[epc.cfg.clusterSize:] +} + +func (epc *etcdProcessCluster) backends() []*etcdProcess { + return epc.procs[:epc.cfg.clusterSize] +} + func spawnCmd(args []string) (*gexpect.ExpectSubprocess, error) { // redirect stderr to stdout since gexpect only uses stdout cmd := `/bin/sh -c "` + strings.Join(args, " ") + ` 2>&1 "` diff --git a/e2e/etcdctl_test.go b/e2e/etcdctl_test.go new file mode 100644 index 00000000000..f8618cd62f0 --- /dev/null +++ b/e2e/etcdctl_test.go @@ -0,0 +1,125 @@ +// Copyright 2016 CoreOS, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "testing" + "time" + + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/pkg/testutil" +) + +func TestBasicOpsV2CtlWatchWithProxy(t *testing.T) { + defer testutil.AfterTest(t) + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + false, + ) +} + +func TestBasicOpsV2CtlWatchWithProxyNoSync(t *testing.T) { + defer testutil.AfterTest(t) + testProcessClusterV2CtlWatch( + t, + &etcdProcessClusterConfig{ + clusterSize: 3, + proxySize: 1, + isClientTLS: false, + isPeerTLS: false, + initialToken: "new", + }, + true, + ) +} + +func etcdctlSet(epc *etcdProcessCluster, key, value string, noSync bool) error { + endpoint := "" + if proxies := epc.proxies(); len(proxies) != 0 { + endpoint = proxies[0].cfg.acurl.String() + } else if backends := epc.backends(); len(backends) != 0 { + endpoint = backends[0].cfg.acurl.String() + } + + putArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + putArgs = append(putArgs, "--no-sync") + } + putArgs = append(putArgs, "set", key, value) + + return spawnWithExpect(putArgs, value) +} + +func etcdctlWatch(epc *etcdProcessCluster, key, value string, noSync bool, done chan struct{}, errChan chan error) { + endpoint := "" + if proxies := epc.proxies(); len(proxies) != 0 { + endpoint = proxies[0].cfg.acurl.String() + } else if backends := epc.backends(); len(backends) != 0 { + endpoint = backends[0].cfg.acurl.String() + } + + watchArgs := []string{"../bin/etcdctl", "--endpoint", endpoint} + if noSync { + watchArgs = append(watchArgs, "--no-sync") + } + watchArgs = append(watchArgs, "watch", key) + + if err := spawnWithExpect(watchArgs, value); err != nil { + errChan <- err + return + } + done <- struct{}{} +} + +func testProcessClusterV2CtlWatch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { + if fileutil.Exist("../bin/etcdctl") == false { + t.Fatalf("could not find etcdctl binary") + } + + epc, errC := newEtcdProcessCluster(cfg) + if errC != nil { + t.Fatalf("could not start etcd process cluster (%v)", errC) + } + defer func() { + if errC := epc.Close(); errC != nil { + t.Fatalf("error closing etcd processes (%v)", errC) + } + }() + + key, value := "foo", "bar" + done, errChan := make(chan struct{}), make(chan error) + + go etcdctlWatch(epc, key, value, noSync, done, errChan) + + if err := etcdctlSet(epc, key, value, noSync); err != nil { + t.Fatalf("failed set (%v)", err) + } + + select { + case <-done: + return + case err := <-errChan: + t.Fatalf("failed watch (%v)", err) + case <-time.After(2 * time.Second): + t.Fatalf("watch timed out!") + } +}