diff --git a/e2e/cluster_direct_test.go b/e2e/cluster_direct_test.go new file mode 100644 index 00000000000..15a16c9257a --- /dev/null +++ b/e2e/cluster_direct_test.go @@ -0,0 +1,21 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !cluster_proxy + +package e2e + +func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) { + return newEtcdServerProcess(cfg) +} diff --git a/e2e/cluster_proxy_test.go b/e2e/cluster_proxy_test.go new file mode 100644 index 00000000000..a2bab6f587e --- /dev/null +++ b/e2e/cluster_proxy_test.go @@ -0,0 +1,278 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build cluster_proxy + +package e2e + +import ( + "fmt" + "net" + "net/url" + "os" + "strconv" + "strings" + + "github.com/coreos/etcd/pkg/expect" +) + +type proxyEtcdProcess struct { + etcdProc etcdProcess + proxyV2 *proxyV2Proc + proxyV3 *proxyV3Proc +} + +func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) { + return newProxyEtcdProcess(cfg) +} + +func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) { + ep, err := newEtcdServerProcess(cfg) + if err != nil { + return nil, err + } + pep := &proxyEtcdProcess{ + etcdProc: ep, + proxyV2: newProxyV2Proc(cfg), + proxyV3: newProxyV3Proc(cfg), + } + return pep, nil +} + +func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } + +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } + +func (p *proxyEtcdProcess) Start() error { + if err := p.etcdProc.Start(); err != nil { + return err + } + if err := p.proxyV2.Start(); err != nil { + return err + } + return p.proxyV3.Start() +} + +func (p *proxyEtcdProcess) Restart() error { + if err := p.etcdProc.Restart(); err != nil { + return err + } + if err := p.proxyV2.Restart(); err != nil { + return err + } + return p.proxyV3.Restart() +} + +func (p *proxyEtcdProcess) Stop() error { + err := p.proxyV2.Stop() + if v3err := p.proxyV3.Stop(); err == nil { + err = v3err + } + if eerr := p.etcdProc.Stop(); eerr != nil && err == nil { + // fails on go-grpc issue #1384 + if !strings.Contains(eerr.Error(), "exit status 2") { + err = eerr + } + } + return err +} + +func (p *proxyEtcdProcess) Close() error { + err := p.proxyV2.Close() + if v3err := p.proxyV3.Close(); err == nil { + err = v3err + } + if eerr := p.etcdProc.Close(); eerr != nil && err == nil { + // fails on go-grpc issue #1384 + if !strings.Contains(eerr.Error(), "exit status 2") { + err = eerr + } + } + return err +} + +func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { + p.proxyV3.WithStopSignal(sig) + p.proxyV3.WithStopSignal(sig) + return p.etcdProc.WithStopSignal(sig) +} + +type proxyProc struct { + execPath string + args []string + ep string + donec chan struct{} + + proc *expect.ExpectProcess +} + +func (pp *proxyProc) endpoints() []string { return []string{pp.ep} } + +func (pp *proxyProc) start() error { + if pp.proc != nil { + panic("already started") + } + proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...)) + if err != nil { + return err + } + pp.proc = proc + return nil +} + +func (pp *proxyProc) waitReady(readyStr string) error { + defer close(pp.donec) + return waitReadyExpectProc(pp.proc, []string{readyStr}) +} + +func (pp *proxyProc) Stop() error { + if pp.proc == nil { + return nil + } + if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") { + // v2proxy exits with status 1 on auto tls; not sure why + return err + } + pp.proc = nil + <-pp.donec + pp.donec = make(chan struct{}) + return nil +} + +func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal { + ret := pp.proc.StopSignal + pp.proc.StopSignal = sig + return ret +} + +func (pp *proxyProc) Close() error { return pp.Stop() } + +type proxyV2Proc struct { + proxyProc + dataDir string +} + +func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string { + u, err := url.Parse(cfg.acurl) + if err != nil { + panic(err) + } + host, port, _ := net.SplitHostPort(u.Host) + p, _ := strconv.ParseInt(port, 10, 16) + u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset) + return u.String() +} + +func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc { + listenAddr := proxyListenURL(cfg, 2) + name := fmt.Sprintf("testname-proxy-%p", cfg) + args := []string{ + "--name", name, + "--proxy", "on", + "--listen-client-urls", listenAddr, + "--initial-cluster", cfg.name + "=" + cfg.purl.String(), + } + return &proxyV2Proc{ + proxyProc{ + execPath: cfg.execPath, + args: append(args, cfg.tlsArgs...), + ep: listenAddr, + donec: make(chan struct{}), + }, + name + ".etcd", + } +} + +func (v2p *proxyV2Proc) Start() error { + os.RemoveAll(v2p.dataDir) + if err := v2p.start(); err != nil { + return err + } + return v2p.waitReady("httpproxy: endpoints found") +} + +func (v2p *proxyV2Proc) Restart() error { + if err := v2p.Stop(); err != nil { + return err + } + return v2p.Start() +} + +func (v2p *proxyV2Proc) Stop() error { + if err := v2p.proxyProc.Stop(); err != nil { + return err + } + // v2 proxy caches members; avoid reuse of directory + return os.RemoveAll(v2p.dataDir) +} + +type proxyV3Proc struct { + proxyProc +} + +func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc { + listenAddr := proxyListenURL(cfg, 3) + args := []string{ + "grpc-proxy", + "start", + "--listen-addr", strings.Split(listenAddr, "/")[2], + "--endpoints", cfg.acurl, + // pass-through member RPCs + "--advertise-client-url", "", + } + tlsArgs := []string{} + for i := 0; i < len(cfg.tlsArgs); i++ { + switch cfg.tlsArgs[i] { + case "--cert-file": + tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1]) + i++ + case "--key-file": + tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1]) + i++ + case "--ca-file": + tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1]) + i++ + case "--auto-tls": + tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify") + case "--peer-ca-file", "--peer-cert-file", "--peer-key-file": + i++ // skip arg + case "--client-cert-auth", "--peer-auto-tls": + default: + tlsArgs = append(tlsArgs, cfg.tlsArgs[i]) + } + } + return &proxyV3Proc{ + proxyProc{ + execPath: cfg.execPath, + args: append(args, tlsArgs...), + ep: listenAddr, + donec: make(chan struct{}), + }, + } +} + +func (v3p *proxyV3Proc) Restart() error { + if err := v3p.Stop(); err != nil { + return err + } + return v3p.Start() +} + +func (v3p *proxyV3Proc) Start() error { + if err := v3p.start(); err != nil { + return err + } + return v3p.waitReady("listening for grpc-proxy client requests") +} diff --git a/e2e/cluster_test.go b/e2e/cluster_test.go new file mode 100644 index 00000000000..ebd2c265d7e --- /dev/null +++ b/e2e/cluster_test.go @@ -0,0 +1,359 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "strings" + + "github.com/coreos/etcd/etcdserver" +) + +const etcdProcessBasePort = 20000 + +type clientConnType int + +const ( + clientNonTLS clientConnType = iota + clientTLS + clientTLSAndNonTLS +) + +var ( + configNoTLS = etcdProcessClusterConfig{ + clusterSize: 3, + initialToken: "new", + } + configAutoTLS = etcdProcessClusterConfig{ + clusterSize: 3, + isPeerTLS: true, + isPeerAutoTLS: true, + initialToken: "new", + } + configTLS = etcdProcessClusterConfig{ + clusterSize: 3, + clientTLS: clientTLS, + isPeerTLS: true, + initialToken: "new", + } + configClientTLS = etcdProcessClusterConfig{ + clusterSize: 3, + clientTLS: clientTLS, + initialToken: "new", + } + configClientBoth = etcdProcessClusterConfig{ + clusterSize: 1, + clientTLS: clientTLSAndNonTLS, + initialToken: "new", + } + configClientAutoTLS = etcdProcessClusterConfig{ + clusterSize: 1, + isClientAutoTLS: true, + clientTLS: clientTLS, + initialToken: "new", + } + configPeerTLS = etcdProcessClusterConfig{ + clusterSize: 3, + isPeerTLS: true, + initialToken: "new", + } + configClientTLSCertAuth = etcdProcessClusterConfig{ + clusterSize: 1, + clientTLS: clientTLS, + initialToken: "new", + clientCertAuthEnabled: true, + } +) + +func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { + ret := cfg + ret.clusterSize = 1 + return &ret +} + +type etcdProcessCluster struct { + cfg *etcdProcessClusterConfig + procs []etcdProcess +} + +type etcdProcessClusterConfig struct { + execPath string + dataDirPath string + keepDataDir bool + + clusterSize int + + baseScheme string + basePort int + + snapCount int // default is 10000 + + clientTLS clientConnType + clientCertAuthEnabled bool + isPeerTLS bool + isPeerAutoTLS bool + isClientAutoTLS bool + isClientCRL bool + + forceNewCluster bool + initialToken string + quotaBackendBytes int64 + noStrictReconfig bool +} + +// newEtcdProcessCluster launches a new cluster from etcd processes, returning +// a new etcdProcessCluster once all nodes are ready to accept client requests. +func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { + etcdCfgs := cfg.etcdServerProcessConfigs() + epc := &etcdProcessCluster{ + cfg: cfg, + procs: make([]etcdProcess, cfg.clusterSize), + } + + // launch etcd processes + for i := range etcdCfgs { + proc, err := newEtcdProcess(etcdCfgs[i]) + if err != nil { + epc.Close() + return nil, err + } + epc.procs[i] = proc + } + + if err := epc.Start(); err != nil { + return nil, err + } + return epc, nil +} + +func (cfg *etcdProcessClusterConfig) clientScheme() string { + if cfg.clientTLS == clientTLS { + return "https" + } + return "http" +} + +func (cfg *etcdProcessClusterConfig) peerScheme() string { + peerScheme := cfg.baseScheme + if peerScheme == "" { + peerScheme = "http" + } + if cfg.isPeerTLS { + peerScheme += "s" + } + return peerScheme +} + +func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig { + if cfg.basePort == 0 { + cfg.basePort = etcdProcessBasePort + } + if cfg.execPath == "" { + cfg.execPath = binPath + } + if cfg.snapCount == 0 { + cfg.snapCount = etcdserver.DefaultSnapCount + } + + etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize) + initialCluster := make([]string, cfg.clusterSize) + for i := 0; i < cfg.clusterSize; i++ { + var curls []string + var curl, curltls string + port := cfg.basePort + 4*i + curlHost := fmt.Sprintf("localhost:%d", port) + + switch cfg.clientTLS { + case clientNonTLS, clientTLS: + curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + curls = []string{curl} + case clientTLSAndNonTLS: + curl = (&url.URL{Scheme: "http", Host: curlHost}).String() + curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() + curls = []string{curl, curltls} + } + + purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} + name := fmt.Sprintf("testname%d", i) + dataDirPath := cfg.dataDirPath + if cfg.dataDirPath == "" { + var derr error + dataDirPath, derr = ioutil.TempDir("", name+".etcd") + if derr != nil { + panic("could not get tempdir for datadir") + } + } + initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) + + args := []string{ + "--name", name, + "--listen-client-urls", strings.Join(curls, ","), + "--advertise-client-urls", strings.Join(curls, ","), + "--listen-peer-urls", purl.String(), + "--initial-advertise-peer-urls", purl.String(), + "--initial-cluster-token", cfg.initialToken, + "--data-dir", dataDirPath, + "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount), + } + if cfg.forceNewCluster { + args = append(args, "--force-new-cluster") + } + if cfg.quotaBackendBytes > 0 { + args = append(args, + "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), + ) + } + if cfg.noStrictReconfig { + args = append(args, "--strict-reconfig-check=false") + } + + args = append(args, cfg.tlsArgs()...) + etcdCfgs[i] = &etcdServerProcessConfig{ + execPath: cfg.execPath, + args: args, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + initialToken: cfg.initialToken, + } + } + + initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} + for i := range etcdCfgs { + etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") + etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) + } + + return etcdCfgs +} + +func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { + if cfg.clientTLS != clientNonTLS { + if cfg.isClientAutoTLS { + args = append(args, "--auto-tls") + } else { + tlsClientArgs := []string{ + "--cert-file", certPath, + "--key-file", privateKeyPath, + "--ca-file", caPath, + } + args = append(args, tlsClientArgs...) + + if cfg.clientCertAuthEnabled { + args = append(args, "--client-cert-auth") + } + } + } + + if cfg.isPeerTLS { + if cfg.isPeerAutoTLS { + args = append(args, "--peer-auto-tls") + } else { + tlsPeerArgs := []string{ + "--peer-cert-file", certPath, + "--peer-key-file", privateKeyPath, + "--peer-ca-file", caPath, + } + args = append(args, tlsPeerArgs...) + } + } + + if cfg.isClientCRL { + args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") + } + + return args +} + +func (epc *etcdProcessCluster) EndpointsV2() []string { + return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() }) +} + +func (epc *etcdProcessCluster) EndpointsV3() []string { + return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() }) +} + +func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) { + for _, p := range epc.procs { + ret = append(ret, f(p)...) + } + return ret +} + +func (epc *etcdProcessCluster) Start() error { + return epc.start(func(ep etcdProcess) error { return ep.Start() }) +} + +func (epc *etcdProcessCluster) Restart() error { + return epc.start(func(ep etcdProcess) error { return ep.Restart() }) +} + +func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error { + readyC := make(chan error, len(epc.procs)) + for i := range epc.procs { + go func(n int) { readyC <- f(epc.procs[n]) }(i) + } + for range epc.procs { + if err := <-readyC; err != nil { + epc.Close() + return err + } + } + return nil +} + +func (epc *etcdProcessCluster) Stop() (err error) { + for _, p := range epc.procs { + if p == nil { + continue + } + if curErr := p.Stop(); curErr != nil { + if err != nil { + err = fmt.Errorf("%v; %v", err, curErr) + } else { + err = curErr + } + } + } + return err +} + +func (epc *etcdProcessCluster) Close() error { + err := epc.Stop() + for _, p := range epc.procs { + // p is nil when newEtcdProcess fails in the middle + // Close still gets called to clean up test data + if p == nil { + continue + } + if cerr := p.Close(); cerr != nil { + err = cerr + } + } + return err +} + +func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { + for _, p := range epc.procs { + ret = p.WithStopSignal(sig) + } + return ret +} diff --git a/e2e/ctl_v2_test.go b/e2e/ctl_v2_test.go index c1635887dbe..f986eb1f311 100644 --- a/e2e/ctl_v2_test.go +++ b/e2e/ctl_v2_test.go @@ -128,10 +128,9 @@ func testCtlV2Ls(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) { } } -func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) } -func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) } -func TestCtlV2WatchWithProxy(t *testing.T) { testCtlV2Watch(t, &configWithProxy, false) } -func TestCtlV2WatchWithProxyNoSync(t *testing.T) { testCtlV2Watch(t, &configWithProxy, true) } +func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) } +func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) } + func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { defer testutil.AfterTest(t) @@ -158,12 +157,10 @@ func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { } } -func TestCtlV2GetRoleUser(t *testing.T) { testCtlV2GetRoleUser(t, &configNoTLS) } -func TestCtlV2GetRoleUserWithProxy(t *testing.T) { testCtlV2GetRoleUser(t, &configWithProxy) } -func testCtlV2GetRoleUser(t *testing.T, cfg *etcdProcessClusterConfig) { +func TestCtlV2GetRoleUser(t *testing.T) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, cfg, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -196,7 +193,7 @@ func TestCtlV2UserListRoot(t *testing.T) { testCtlV2UserList(t, "root") } func testCtlV2UserList(t *testing.T, username string) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, &configWithProxy, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -214,7 +211,7 @@ func testCtlV2UserList(t *testing.T, username string) { func TestCtlV2RoleList(t *testing.T) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, &configWithProxy, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -243,7 +240,7 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue t.Fatal(err) } - if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil { + if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir); err != nil { t.Fatal(err) } @@ -350,16 +347,7 @@ func TestCtlV2ClusterHealth(t *testing.T) { } func etcdctlPrefixArgs(clus *etcdProcessCluster) []string { - endpoints := "" - if proxies := clus.proxies(); len(proxies) != 0 { - endpoints = proxies[0].cfg.acurl - } else if processes := clus.processes(); len(processes) != 0 { - es := []string{} - for _, b := range processes { - es = append(es, b.cfg.acurl) - } - endpoints = strings.Join(es, ",") - } + endpoints := strings.Join(clus.EndpointsV2(), ",") cmdArgs := []string{ctlBinPath, "--endpoints", endpoints} if clus.cfg.clientTLS == clientTLS { cmdArgs = append(cmdArgs, "--ca-file", caPath, "--cert-file", certPath, "--key-file", privateKeyPath) diff --git a/e2e/ctl_v3_alarm_test.go b/e2e/ctl_v3_alarm_test.go index 78dfeaddcdd..50baae5e9e7 100644 --- a/e2e/ctl_v3_alarm_test.go +++ b/e2e/ctl_v3_alarm_test.go @@ -64,7 +64,7 @@ func alarmTest(cx ctlCtx) { } } - eps := cx.epc.grpcEndpoints() + eps := cx.epc.EndpointsV3() // get latest revision to compact cli, err := clientv3.New(clientv3.Config{ diff --git a/e2e/ctl_v3_auth_test.go b/e2e/ctl_v3_auth_test.go index cf3043ccf09..e8fc31f81d5 100644 --- a/e2e/ctl_v3_auth_test.go +++ b/e2e/ctl_v3_auth_test.go @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Skip proxy tests for now since auth is broken on grpcproxy. +// +build !cluster_proxy + package e2e import ( "fmt" + "os" "testing" "github.com/coreos/etcd/clientv3" @@ -44,6 +48,12 @@ func TestCtlV3AuthRoleGet(t *testing.T) { testCtl(t, authTestRoleGet) } func TestCtlV3AuthUserGet(t *testing.T) { testCtl(t, authTestUserGet) } func TestCtlV3AuthRoleList(t *testing.T) { testCtl(t, authTestRoleList) } +func TestCtlV3AuthDefrag(t *testing.T) { testCtl(t, authTestDefrag) } +func TestCtlV3AuthEndpointHealth(t *testing.T) { + testCtl(t, authTestEndpointHealth, withQuorum()) +} +func TestCtlV3AuthSnapshot(t *testing.T) { testCtl(t, authTestSnapshot) } + func authEnableTest(cx ctlCtx) { if err := authEnable(cx); err != nil { cx.t.Fatal(err) @@ -816,3 +826,92 @@ func authTestRoleList(cx ctlCtx) { cx.t.Fatal(err) } } + +func authTestDefrag(cx ctlCtx) { + maintenanceInitKeys(cx) + + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + // ordinary user cannot defrag + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3Defrag(cx); err == nil { + cx.t.Fatal("ordinary user should not be able to issue a defrag request") + } + + // root can defrag + cx.user, cx.pass = "root", "root" + if err := ctlV3Defrag(cx); err != nil { + cx.t.Fatal(err) + } +} + +func authTestSnapshot(cx ctlCtx) { + maintenanceInitKeys(cx) + + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + fpath := "test.snapshot" + defer os.RemoveAll(fpath) + + // ordinary user cannot save a snapshot + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3SnapshotSave(cx, fpath); err == nil { + cx.t.Fatal("ordinary user should not be able to save a snapshot") + } + + // root can save a snapshot + cx.user, cx.pass = "root", "root" + if err := ctlV3SnapshotSave(cx, fpath); err != nil { + cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err) + } + + st, err := getSnapshotStatus(cx, fpath) + if err != nil { + cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err) + } + if st.Revision != 4 { + cx.t.Fatalf("expected 4, got %d", st.Revision) + } + if st.TotalKey < 3 { + cx.t.Fatalf("expected at least 3, got %d", st.TotalKey) + } +} + +func authTestEndpointHealth(cx ctlCtx) { + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } + + // health checking with an ordinary user "succeeds" since permission denial goes through consensus + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } + + // succeed if permissions granted for ordinary user + cx.user, cx.pass = "root", "root" + if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil { + cx.t.Fatal(err) + } + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } +} diff --git a/e2e/ctl_v3_defrag_test.go b/e2e/ctl_v3_defrag_test.go index cc197d36289..64c3bb9f0c3 100644 --- a/e2e/ctl_v3_defrag_test.go +++ b/e2e/ctl_v3_defrag_test.go @@ -16,8 +16,7 @@ package e2e import "testing" -func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) } -func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) } +func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) } func maintenanceInitKeys(cx ctlCtx) { var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}} @@ -40,29 +39,6 @@ func defragTest(cx ctlCtx) { } } -func defragTestWithAuth(cx ctlCtx) { - maintenanceInitKeys(cx) - - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - // ordinary user cannot defrag - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3Defrag(cx); err == nil { - cx.t.Fatal("ordinary user should not be able to issue a defrag request") - } - - // root can defrag - cx.user, cx.pass = "root", "root" - if err := ctlV3Defrag(cx); err != nil { - cx.t.Fatal(err) - } -} - func ctlV3Defrag(cx ctlCtx) error { cmdArgs := append(cx.PrefixArgs(), "defrag") lines := make([]string, cx.epc.cfg.clusterSize) diff --git a/e2e/ctl_v3_elect_test.go b/e2e/ctl_v3_elect_test.go index 2c9c986d3ac..410c00f813c 100644 --- a/e2e/ctl_v3_elect_test.go +++ b/e2e/ctl_v3_elect_test.go @@ -33,8 +33,8 @@ func TestCtlV3Elect(t *testing.T) { func testElect(cx ctlCtx) { // debugging for #6934 - sig := cx.epc.withStopSignal(debugLockSignal) - defer cx.epc.withStopSignal(sig) + sig := cx.epc.WithStopSignal(debugLockSignal) + defer cx.epc.WithStopSignal(sig) name := "a" diff --git a/e2e/ctl_v3_endpoint_test.go b/e2e/ctl_v3_endpoint_test.go index 3a42c1c9b4a..74a2ebb7a1d 100644 --- a/e2e/ctl_v3_endpoint_test.go +++ b/e2e/ctl_v3_endpoint_test.go @@ -21,9 +21,6 @@ import ( func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) } func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) } -func TestCtlV3EndpointHealthWithAuth(t *testing.T) { - testCtl(t, endpointHealthTestWithAuth, withQuorum()) -} func endpointHealthTest(cx ctlCtx) { if err := ctlV3EndpointHealth(cx); err != nil { @@ -49,38 +46,9 @@ func endpointStatusTest(cx ctlCtx) { func ctlV3EndpointStatus(cx ctlCtx) error { cmdArgs := append(cx.PrefixArgs(), "endpoint", "status") var eps []string - for _, ep := range cx.epc.endpoints() { + for _, ep := range cx.epc.EndpointsV3() { u, _ := url.Parse(ep) eps = append(eps, u.Host) } return spawnWithExpects(cmdArgs, eps...) } - -func endpointHealthTestWithAuth(cx ctlCtx) { - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } - - // health checking with an ordinary user "succeeds" since permission denial goes through consensus - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } - - // succeed if permissions granted for ordinary user - cx.user, cx.pass = "root", "root" - if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil { - cx.t.Fatal(err) - } - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } -} diff --git a/e2e/ctl_v3_lock_test.go b/e2e/ctl_v3_lock_test.go index 416f26d8a41..ddda3fc03cc 100644 --- a/e2e/ctl_v3_lock_test.go +++ b/e2e/ctl_v3_lock_test.go @@ -56,8 +56,8 @@ func TestCtlV3Lock(t *testing.T) { func testLock(cx ctlCtx) { // debugging for #6464 - sig := cx.epc.withStopSignal(debugLockSignal) - defer cx.epc.withStopSignal(sig) + sig := cx.epc.WithStopSignal(debugLockSignal) + defer cx.epc.WithStopSignal(sig) name := "a" diff --git a/e2e/ctl_v3_migrate_test.go b/e2e/ctl_v3_migrate_test.go index 3136c4920f8..7252fe0a3a0 100644 --- a/e2e/ctl_v3_migrate_test.go +++ b/e2e/ctl_v3_migrate_test.go @@ -48,8 +48,8 @@ func TestCtlV3Migrate(t *testing.T) { } } - dataDir := epc.procs[0].cfg.dataDirPath - if err := epc.StopAll(); err != nil { + dataDir := epc.procs[0].Config().dataDirPath + if err := epc.Stop(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) } @@ -65,8 +65,8 @@ func TestCtlV3Migrate(t *testing.T) { t.Fatal(err) } - epc.procs[0].cfg.keepDataDir = true - if err := epc.RestartAll(); err != nil { + epc.procs[0].Config().keepDataDir = true + if err := epc.Restart(); err != nil { t.Fatal(err) } @@ -75,7 +75,7 @@ func TestCtlV3Migrate(t *testing.T) { t.Fatal(err) } cli, err := clientv3.New(clientv3.Config{ - Endpoints: epc.grpcEndpoints(), + Endpoints: epc.EndpointsV3(), DialTimeout: 3 * time.Second, }) if err != nil { diff --git a/e2e/ctl_v3_move_leader_test.go b/e2e/ctl_v3_move_leader_test.go index ec2f134d5e3..eb2afda5f0c 100644 --- a/e2e/ctl_v3_move_leader_test.go +++ b/e2e/ctl_v3_move_leader_test.go @@ -39,7 +39,7 @@ func TestCtlV3MoveLeader(t *testing.T) { var leadIdx int var leaderID uint64 var transferee uint64 - for i, ep := range epc.grpcEndpoints() { + for i, ep := range epc.EndpointsV3() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{ep}, DialTimeout: 3 * time.Second, @@ -75,11 +75,11 @@ func TestCtlV3MoveLeader(t *testing.T) { expect string }{ { // request to non-leader - cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}), + cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}), "no leader endpoint given at ", }, { // request to leader - cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}), + cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}), fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)), }, } diff --git a/e2e/ctl_v3_snapshot_test.go b/e2e/ctl_v3_snapshot_test.go index d2394885d62..234d5b037de 100644 --- a/e2e/ctl_v3_snapshot_test.go +++ b/e2e/ctl_v3_snapshot_test.go @@ -152,7 +152,7 @@ func TestIssue6361(t *testing.T) { }() dialTimeout := 7 * time.Second - prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()} + prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()} // write some keys kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}} @@ -170,7 +170,7 @@ func TestIssue6361(t *testing.T) { t.Fatal(err) } - if err = epc.processes()[0].Stop(); err != nil { + if err = epc.procs[0].Stop(); err != nil { t.Fatal(err) } @@ -178,19 +178,19 @@ func TestIssue6361(t *testing.T) { defer os.RemoveAll(newDataDir) // etcdctl restore the snapshot - err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member") + err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "membership: added member") if err != nil { t.Fatal(err) } // start the etcd member using the restored snapshot - epc.procs[0].cfg.dataDirPath = newDataDir - for i := range epc.procs[0].cfg.args { - if epc.procs[0].cfg.args[i] == "--data-dir" { - epc.procs[0].cfg.args[i+1] = newDataDir + epc.procs[0].Config().dataDirPath = newDataDir + for i := range epc.procs[0].Config().args { + if epc.procs[0].Config().args[i] == "--data-dir" { + epc.procs[0].Config().args[i+1] = newDataDir } } - if err = epc.processes()[0].Restart(); err != nil { + if err = epc.procs[0].Restart(); err != nil { t.Fatal(err) } @@ -217,11 +217,11 @@ func TestIssue6361(t *testing.T) { defer os.RemoveAll(newDataDir2) name2 := "infra2" - initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL) + initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL) // start the new member var nepc *expect.ExpectProcess - nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2, + nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2, "--listen-client-urls", clientURL, "--advertise-client-urls", clientURL, "--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL, "--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}) @@ -245,42 +245,3 @@ func TestIssue6361(t *testing.T) { t.Fatal(err) } } - -func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) } - -func snapshotTestWithAuth(cx ctlCtx) { - maintenanceInitKeys(cx) - - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - fpath := "test.snapshot" - defer os.RemoveAll(fpath) - - // ordinary user cannot save a snapshot - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3SnapshotSave(cx, fpath); err == nil { - cx.t.Fatal("ordinary user should not be able to save a snapshot") - } - - // root can save a snapshot - cx.user, cx.pass = "root", "root" - if err := ctlV3SnapshotSave(cx, fpath); err != nil { - cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err) - } - - st, err := getSnapshotStatus(cx, fpath) - if err != nil { - cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err) - } - if st.Revision != 4 { - cx.t.Fatalf("expected 4, got %d", st.Revision) - } - if st.TotalKey < 3 { - cx.t.Fatalf("expected at least 3, got %d", st.TotalKey) - } -} diff --git a/e2e/ctl_v3_test.go b/e2e/ctl_v3_test.go index a840aebc14a..45e2abeecb0 100644 --- a/e2e/ctl_v3_test.go +++ b/e2e/ctl_v3_test.go @@ -45,7 +45,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) { } func dialWithSchemeTest(cx ctlCtx) { - cmdArgs := append(cx.prefixArgs(cx.epc.endpoints()), "put", "foo", "bar") + cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar") if err := spawnWithExpect(cmdArgs, "OK"); err != nil { cx.t.Fatal(err) } @@ -169,10 +169,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { } func (cx *ctlCtx) prefixArgs(eps []string) []string { - if len(cx.epc.proxies()) > 0 { // TODO: add proxy check as in v2 - panic("v3 proxy not implemented") - } - fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") fmap["dial-timeout"] = cx.dialTimeout.String() @@ -212,7 +208,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string { // PrefixArgs prefixes etcdctl command. // Make sure to unset environment variables after tests. func (cx *ctlCtx) PrefixArgs() []string { - return cx.prefixArgs(cx.epc.grpcEndpoints()) + return cx.prefixArgs(cx.epc.EndpointsV3()) } func isGRPCTimedout(err error) bool { diff --git a/e2e/etcd_config_test.go b/e2e/etcd_config_test.go index 9e308c3bcb9..e7531866ade 100644 --- a/e2e/etcd_config_test.go +++ b/e2e/etcd_config_test.go @@ -25,7 +25,7 @@ func TestEtcdExampleConfig(t *testing.T) { if err != nil { t.Fatal(err) } - if err = waitReadyExpectProc(proc, false); err != nil { + if err = waitReadyExpectProc(proc, etcdServerReadyLines); err != nil { t.Fatal(err) } if err = proc.Stop(); err != nil { diff --git a/e2e/etcd_process.go b/e2e/etcd_process.go new file mode 100644 index 00000000000..cfde0255a6e --- /dev/null +++ b/e2e/etcd_process.go @@ -0,0 +1,134 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "net/url" + "os" + + "github.com/coreos/etcd/pkg/expect" + "github.com/coreos/etcd/pkg/fileutil" +) + +var etcdServerReadyLines = []string{"enabled capabilities for version", "published"} + +// etcdProcess is a process that serves etcd requests. +type etcdProcess interface { + EndpointsV2() []string + EndpointsV3() []string + + Start() error + Restart() error + Stop() error + Close() error + WithStopSignal(sig os.Signal) os.Signal + Config() *etcdServerProcessConfig +} + +type etcdServerProcess struct { + cfg *etcdServerProcessConfig + proc *expect.ExpectProcess + donec chan struct{} // closed when Interact() terminates +} + +type etcdServerProcessConfig struct { + execPath string + args []string + tlsArgs []string + + dataDirPath string + keepDataDir bool + + name string + + purl url.URL + + acurl string + + initialToken string + initialCluster string +} + +func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) { + if !fileutil.Exist(cfg.execPath) { + return nil, fmt.Errorf("could not find etcd binary") + } + if !cfg.keepDataDir { + if err := os.RemoveAll(cfg.dataDirPath); err != nil { + return nil, err + } + } + return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil +} + +func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } + +func (ep *etcdServerProcess) Start() error { + if ep.proc != nil { + panic("already started") + } + proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...)) + if err != nil { + return err + } + ep.proc = proc + return ep.waitReady() +} + +func (ep *etcdServerProcess) Restart() error { + if err := ep.Stop(); err != nil { + return err + } + ep.donec = make(chan struct{}) + return ep.Start() +} + +func (ep *etcdServerProcess) Stop() error { + if ep == nil || ep.proc == nil { + return nil + } + if err := ep.proc.Stop(); err != nil { + return err + } + ep.proc = nil + <-ep.donec + ep.donec = make(chan struct{}) + if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" { + os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path) + } + return nil +} + +func (ep *etcdServerProcess) Close() error { + if err := ep.Stop(); err != nil { + return err + } + return os.RemoveAll(ep.cfg.dataDirPath) +} + +func (ep *etcdServerProcess) WithStopSignal(sig os.Signal) os.Signal { + ret := ep.proc.StopSignal + ep.proc.StopSignal = sig + return ret +} + +func (ep *etcdServerProcess) waitReady() error { + defer close(ep.donec) + return waitReadyExpectProc(ep.proc, etcdServerReadyLines) +} + +func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } diff --git a/e2e/etcd_release_upgrade_test.go b/e2e/etcd_release_upgrade_test.go index af958e89213..6b1d42323e7 100644 --- a/e2e/etcd_release_upgrade_test.go +++ b/e2e/etcd_release_upgrade_test.go @@ -88,8 +88,8 @@ func TestReleaseUpgrade(t *testing.T) { if err := epc.procs[i].Stop(); err != nil { t.Fatalf("#%d: error closing etcd process (%v)", i, err) } - epc.procs[i].cfg.execPath = binDir + "/etcd" - epc.procs[i].cfg.keepDataDir = true + epc.procs[i].Config().execPath = binDir + "/etcd" + epc.procs[i].Config().keepDataDir = true if err := epc.procs[i].Restart(); err != nil { t.Fatalf("error restarting etcd process (%v)", err) @@ -155,8 +155,8 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { wg.Add(len(epc.procs)) for i := range epc.procs { go func(i int) { - epc.procs[i].cfg.execPath = binDir + "/etcd" - epc.procs[i].cfg.keepDataDir = true + epc.procs[i].Config().execPath = binDir + "/etcd" + epc.procs[i].Config().keepDataDir = true if err := epc.procs[i].Restart(); err != nil { t.Fatalf("error restarting etcd process (%v)", err) } diff --git a/e2e/etcd_spawn_cov.go b/e2e/etcd_spawn_cov.go index 6032ef2018b..ca45a571efc 100644 --- a/e2e/etcd_spawn_cov.go +++ b/e2e/etcd_spawn_cov.go @@ -33,20 +33,7 @@ const noOutputLineCount = 2 // cov-enabled binaries emit PASS and coverage count func spawnCmd(args []string) (*expect.ExpectProcess, error) { if args[0] == binPath { - covArgs, err := getCovArgs() - if err != nil { - return nil, err - } - ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, args2env(args[1:])) - if err != nil { - return nil, err - } - // ep sends SIGTERM to etcd_test process on ep.close() - // allowing the process to exit gracefully in order to generate a coverage report. - // note: go runtime ignores SIGINT but not SIGTERM - // if e2e test is run as a background process. - ep.StopSignal = syscall.SIGTERM - return ep, nil + return spawnEtcd(args) } if args[0] == ctlBinPath { @@ -73,6 +60,32 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) { return expect.NewExpect(args[0], args[1:]...) } +func spawnEtcd(args []string) (*expect.ExpectProcess, error) { + covArgs, err := getCovArgs() + if err != nil { + return nil, err + } + + env := []string{} + if args[1] == "grpc-proxy" { + // avoid test flag conflicts in coverage enabled etcd by putting flags in ETCDCOV_ARGS + env = append(os.Environ(), "ETCDCOV_ARGS="+strings.Join(args, "\xe7\xcd")) + } else { + env = args2env(args[1:]) + } + + ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, env) + if err != nil { + return nil, err + } + // ep sends SIGTERM to etcd_test process on ep.close() + // allowing the process to exit gracefully in order to generate a coverage report. + // note: go runtime ignores SIGINT but not SIGTERM + // if e2e test is run as a background process. + ep.StopSignal = syscall.SIGTERM + return ep, nil +} + func getCovArgs() ([]string, error) { coverPath := os.Getenv("COVERDIR") if !filepath.IsAbs(coverPath) { @@ -92,7 +105,7 @@ func getCovArgs() ([]string, error) { func args2env(args []string) []string { var covEnvs []string - for i := range args[1:] { + for i := range args { if !strings.HasPrefix(args[i], "--") { continue } diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go deleted file mode 100644 index 69e76985521..00000000000 --- a/e2e/etcd_test.go +++ /dev/null @@ -1,593 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package e2e - -import ( - "fmt" - "io/ioutil" - "net/url" - "os" - "strings" - "time" - - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/expect" - "github.com/coreos/etcd/pkg/fileutil" -) - -const etcdProcessBasePort = 20000 - -var ( - binPath string - ctlBinPath string - certPath string - privateKeyPath string - caPath string - - crlPath string - revokedCertPath string - revokedPrivateKeyPath string -) - -type clientConnType int - -const ( - clientNonTLS clientConnType = iota - clientTLS - clientTLSAndNonTLS -) - -var ( - configNoTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - initialToken: "new", - } - configAutoTLS = etcdProcessClusterConfig{ - clusterSize: 3, - isPeerTLS: true, - isPeerAutoTLS: true, - initialToken: "new", - } - configTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - clientTLS: clientTLS, - isPeerTLS: true, - initialToken: "new", - } - configClientTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - clientTLS: clientTLS, - initialToken: "new", - } - configClientBoth = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - clientTLS: clientTLSAndNonTLS, - initialToken: "new", - } - configClientAutoTLS = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - isClientAutoTLS: true, - clientTLS: clientTLS, - initialToken: "new", - } - configPeerTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - isPeerTLS: true, - initialToken: "new", - } - configWithProxy = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - initialToken: "new", - } - configWithProxyTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - clientTLS: clientTLS, - isPeerTLS: true, - initialToken: "new", - } - configWithProxyPeerTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - isPeerTLS: true, - initialToken: "new", - } - configClientTLSCertAuth = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - clientTLS: clientTLS, - initialToken: "new", - clientCertAuthEnabled: true, - } -) - -func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { - ret := cfg - ret.clusterSize = 1 - return &ret -} - -type etcdProcessCluster struct { - cfg *etcdProcessClusterConfig - procs []*etcdProcess -} - -type etcdProcess struct { - cfg *etcdProcessConfig - proc *expect.ExpectProcess - donec chan struct{} // closed when Interact() terminates -} - -type etcdProcessConfig struct { - execPath string - args []string - - dataDirPath string - keepDataDir bool - - name string - - purl url.URL - - acurl string - // additional url for tls connection when the etcd process - // serves both http and https - acurltls string - acurlHost string - - initialToken string - initialCluster string - - isProxy bool -} - -type etcdProcessClusterConfig struct { - execPath string - dataDirPath string - keepDataDir bool - - clusterSize int - - baseScheme string - basePort int - - proxySize int - - snapCount int // default is 10000 - - clientTLS clientConnType - clientCertAuthEnabled bool - isPeerTLS bool - isPeerAutoTLS bool - isClientAutoTLS bool - isClientCRL bool - - forceNewCluster bool - initialToken string - quotaBackendBytes int64 - noStrictReconfig bool -} - -// newEtcdProcessCluster launches a new cluster from etcd processes, returning -// a new etcdProcessCluster once all nodes are ready to accept client requests. -func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { - etcdCfgs := cfg.etcdProcessConfigs() - epc := &etcdProcessCluster{ - cfg: cfg, - procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize), - } - - // launch etcd processes - for i := range etcdCfgs { - proc, err := newEtcdProcess(etcdCfgs[i]) - if err != nil { - epc.Close() - return nil, err - } - epc.procs[i] = proc - } - - return epc, epc.Start() -} - -func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) { - if !fileutil.Exist(cfg.execPath) { - return nil, fmt.Errorf("could not find etcd binary") - } - - if !cfg.keepDataDir { - if err := os.RemoveAll(cfg.dataDirPath); err != nil { - return nil, err - } - } - - child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...)) - if err != nil { - return nil, err - } - return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil -} - -func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { - binPath = binDir + "/etcd" - ctlBinPath = binDir + "/etcdctl" - certPath = certDir + "/server.crt" - privateKeyPath = certDir + "/server.key.insecure" - caPath = certDir + "/ca.crt" - - revokedCertPath = certDir + "/server-revoked.crt" - revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure" - crlPath = certDir + "/revoke.crl" - - if cfg.basePort == 0 { - cfg.basePort = etcdProcessBasePort - } - - if cfg.execPath == "" { - cfg.execPath = binPath - } - if cfg.snapCount == 0 { - cfg.snapCount = etcdserver.DefaultSnapCount - } - - clientScheme := "http" - if cfg.clientTLS == clientTLS { - clientScheme = "https" - } - peerScheme := cfg.baseScheme - if peerScheme == "" { - peerScheme = "http" - } - if cfg.isPeerTLS { - peerScheme += "s" - } - - etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize) - initialCluster := make([]string, cfg.clusterSize) - for i := 0; i < cfg.clusterSize; i++ { - var curls []string - var curl, curltls string - port := cfg.basePort + 2*i - curlHost := fmt.Sprintf("localhost:%d", port) - - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String() - curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} - } - - purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)} - name := fmt.Sprintf("testname%d", i) - dataDirPath := cfg.dataDirPath - if cfg.dataDirPath == "" { - var derr error - dataDirPath, derr = ioutil.TempDir("", name+".etcd") - if derr != nil { - panic("could not get tempdir for datadir") - } - } - initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) - - args := []string{ - "--name", name, - "--listen-client-urls", strings.Join(curls, ","), - "--advertise-client-urls", strings.Join(curls, ","), - "--listen-peer-urls", purl.String(), - "--initial-advertise-peer-urls", purl.String(), - "--initial-cluster-token", cfg.initialToken, - "--data-dir", dataDirPath, - "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount), - } - if cfg.forceNewCluster { - args = append(args, "--force-new-cluster") - } - if cfg.quotaBackendBytes > 0 { - args = append(args, - "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), - ) - } - if cfg.noStrictReconfig { - args = append(args, "--strict-reconfig-check=false") - } - - args = append(args, cfg.tlsArgs()...) - etcdCfgs[i] = &etcdProcessConfig{ - execPath: cfg.execPath, - args: args, - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - acurltls: curltls, - acurlHost: curlHost, - initialToken: cfg.initialToken, - } - } - for i := 0; i < cfg.proxySize; i++ { - port := cfg.basePort + 2*cfg.clusterSize + i + 1 - curlHost := fmt.Sprintf("localhost:%d", port) - curl := url.URL{Scheme: clientScheme, Host: curlHost} - name := fmt.Sprintf("testname-proxy%d", i) - dataDirPath, derr := ioutil.TempDir("", name+".etcd") - if derr != nil { - panic("could not get tempdir for datadir") - } - args := []string{ - "--name", name, - "--proxy", "on", - "--listen-client-urls", curl.String(), - "--data-dir", dataDirPath, - } - args = append(args, cfg.tlsArgs()...) - etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ - execPath: cfg.execPath, - args: args, - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - acurl: curl.String(), - acurlHost: curlHost, - isProxy: true, - } - } - - initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} - for i := range etcdCfgs { - etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") - etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) - } - - return etcdCfgs -} - -func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { - if cfg.clientTLS != clientNonTLS { - if cfg.isClientAutoTLS { - args = append(args, "--auto-tls=true") - } else { - tlsClientArgs := []string{ - "--cert-file", certPath, - "--key-file", privateKeyPath, - "--ca-file", caPath, - } - args = append(args, tlsClientArgs...) - - if cfg.clientCertAuthEnabled { - args = append(args, "--client-cert-auth") - } - } - } - - if cfg.isPeerTLS { - if cfg.isPeerAutoTLS { - args = append(args, "--peer-auto-tls=true") - } else { - tlsPeerArgs := []string{ - "--peer-cert-file", certPath, - "--peer-key-file", privateKeyPath, - "--peer-ca-file", caPath, - } - args = append(args, tlsPeerArgs...) - } - } - - if cfg.isClientCRL { - args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") - } - - return args -} - -func (epc *etcdProcessCluster) Start() (err error) { - readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize) - for i := range epc.procs { - go func(n int) { readyC <- epc.procs[n].waitReady() }(i) - } - for range epc.procs { - if err := <-readyC; err != nil { - epc.Close() - return err - } - } - return nil -} - -func (epc *etcdProcessCluster) RestartAll() error { - for i := range epc.procs { - proc, err := newEtcdProcess(epc.procs[i].cfg) - if err != nil { - epc.Close() - return err - } - epc.procs[i] = proc - } - return epc.Start() -} - -func (epc *etcdProcessCluster) StopAll() (err error) { - for _, p := range epc.procs { - if p == nil { - continue - } - if curErr := p.Stop(); curErr != nil { - if err != nil { - err = fmt.Errorf("%v; %v", err, curErr) - } else { - err = curErr - } - } - } - return err -} - -func (epc *etcdProcessCluster) Close() error { - err := epc.StopAll() - for _, p := range epc.procs { - // p is nil when newEtcdProcess fails in the middle - // Close still gets called to clean up test data - if p == nil { - continue - } - os.RemoveAll(p.cfg.dataDirPath) - } - return err -} - -func (ep *etcdProcess) Restart() error { - newEp, err := newEtcdProcess(ep.cfg) - if err != nil { - ep.Stop() - return err - } - *ep = *newEp - if err = ep.waitReady(); err != nil { - ep.Stop() - return err - } - return nil -} - -func (ep *etcdProcess) Stop() error { - if ep == nil { - return nil - } - if err := ep.proc.Stop(); err != nil { - return err - } - <-ep.donec - - if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" { - os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path) - } - return nil -} - -func (ep *etcdProcess) waitReady() error { - defer close(ep.donec) - return waitReadyExpectProc(ep.proc, ep.cfg.isProxy) -} - -func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error { - readyStrs := []string{"enabled capabilities for version", "published"} - if isProxy { - readyStrs = []string{"httpproxy: endpoints found"} - } - c := 0 - matchSet := func(l string) bool { - for _, s := range readyStrs { - if strings.Contains(l, s) { - c++ - break - } - } - return c == len(readyStrs) - } - _, err := exproc.ExpectFunc(matchSet) - return err -} - -func spawnWithExpect(args []string, expected string) error { - return spawnWithExpects(args, []string{expected}...) -} - -func spawnWithExpects(args []string, xs ...string) error { - proc, err := spawnCmd(args) - if err != nil { - return err - } - // process until either stdout or stderr contains - // the expected string - var ( - lines []string - lineFunc = func(txt string) bool { return true } - ) - for _, txt := range xs { - for { - l, lerr := proc.ExpectFunc(lineFunc) - if lerr != nil { - proc.Close() - return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines) - } - lines = append(lines, l) - if strings.Contains(l, txt) { - break - } - } - } - perr := proc.Close() - if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output - return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount()) - } - return perr -} - -// proxies returns only the proxy etcdProcess. -func (epc *etcdProcessCluster) proxies() []*etcdProcess { - return epc.procs[epc.cfg.clusterSize:] -} - -func (epc *etcdProcessCluster) processes() []*etcdProcess { - return epc.procs[:epc.cfg.clusterSize] -} - -func (epc *etcdProcessCluster) endpoints() []string { - eps := make([]string, epc.cfg.clusterSize) - for i, ep := range epc.processes() { - eps[i] = ep.cfg.acurl - } - return eps -} - -func (epc *etcdProcessCluster) grpcEndpoints() []string { - eps := make([]string, epc.cfg.clusterSize) - for i, ep := range epc.processes() { - eps[i] = ep.cfg.acurlHost - } - return eps -} - -func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal { - ret := epc.procs[0].proc.StopSignal - for _, p := range epc.procs { - p.proc.StopSignal = sig - } - return ret -} - -func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error { - errc := make(chan error, 1) - go func() { errc <- p.Close() }() - select { - case err := <-errc: - return err - case <-time.After(d): - p.Stop() - // retry close after stopping to collect SIGQUIT data, if any - closeWithTimeout(p, time.Second) - } - return fmt.Errorf("took longer than %v to Close process %+v", d, p) -} diff --git a/e2e/gateway_test.go b/e2e/gateway_test.go index 9eee0170e35..6539e6f84cb 100644 --- a/e2e/gateway_test.go +++ b/e2e/gateway_test.go @@ -31,9 +31,9 @@ func TestGateway(t *testing.T) { if err != nil { t.Fatal(err) } - defer ec.StopAll() + defer ec.Stop() - eps := strings.Join(ec.grpcEndpoints(), ",") + eps := strings.Join(ec.EndpointsV3(), ",") p := startGateway(t, eps) defer p.Stop() diff --git a/e2e/main_test.go b/e2e/main_test.go index 59589507841..858018a26a8 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -13,8 +13,20 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -var binDir string -var certDir string +var ( + binDir string + certDir string + + binPath string + ctlBinPath string + certPath string + privateKeyPath string + caPath string + + crlPath string + revokedCertPath string + revokedPrivateKeyPath string +) func TestMain(m *testing.M) { os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH) @@ -24,6 +36,15 @@ func TestMain(m *testing.M) { flag.StringVar(&certDir, "cert-dir", "../integration/fixtures", "The directory for store certificate files.") flag.Parse() + binPath = binDir + "/etcd" + ctlBinPath = binDir + "/etcdctl" + certPath = certDir + "/server.crt" + privateKeyPath = certDir + "/server.key.insecure" + caPath = certDir + "/ca.crt" + revokedCertPath = certDir + "/server-revoked.crt" + revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure" + crlPath = certDir + "/revoke.crl" + v := m.Run() if v == 0 && testutil.CheckLeakedGoroutine() { os.Exit(1) diff --git a/e2e/util.go b/e2e/util.go new file mode 100644 index 00000000000..e4125137a69 --- /dev/null +++ b/e2e/util.go @@ -0,0 +1,91 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "strings" + "time" + + "github.com/coreos/etcd/pkg/expect" +) + +func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error { + c := 0 + matchSet := func(l string) bool { + for _, s := range readyStrs { + if strings.Contains(l, s) { + c++ + break + } + } + return c == len(readyStrs) + } + _, err := exproc.ExpectFunc(matchSet) + return err +} + +func spawnWithExpect(args []string, expected string) error { + return spawnWithExpects(args, []string{expected}...) +} + +func spawnWithExpects(args []string, xs ...string) error { + proc, err := spawnCmd(args) + if err != nil { + return err + } + // process until either stdout or stderr contains + // the expected string + var ( + lines []string + lineFunc = func(txt string) bool { return true } + ) + for _, txt := range xs { + for { + l, lerr := proc.ExpectFunc(lineFunc) + if lerr != nil { + proc.Close() + return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines) + } + lines = append(lines, l) + if strings.Contains(l, txt) { + break + } + } + } + perr := proc.Close() + if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output + return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount()) + } + return perr +} + +func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error { + errc := make(chan error, 1) + go func() { errc <- p.Close() }() + select { + case err := <-errc: + return err + case <-time.After(d): + p.Stop() + // retry close after stopping to collect SIGQUIT data, if any + closeWithTimeout(p, time.Second) + } + return fmt.Errorf("took longer than %v to Close process %+v", d, p) +} + +func toTLS(s string) string { + return strings.Replace(s, "http://", "https://", 1) +} diff --git a/e2e/v2_curl_test.go b/e2e/v2_curl_test.go index a44227ec6d5..2322a8549f6 100644 --- a/e2e/v2_curl_test.go +++ b/e2e/v2_curl_test.go @@ -23,15 +23,12 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) } -func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) } -func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) } -func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) } -func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) } -func TestV2CurlProxyNoTLS(t *testing.T) { testCurlPutGet(t, &configWithProxy) } -func TestV2CurlProxyTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyTLS) } -func TestV2CurlProxyPeerTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyPeerTLS) } -func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) } +func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) } +func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) } +func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) } +func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) } +func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) } +func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) } func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) { defer testutil.AfterTest(t) @@ -135,14 +132,14 @@ type cURLReq struct { func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []string { var ( cmdArgs = []string{"curl"} - acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurl + acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl ) if req.isTLS { if clus.cfg.clientTLS != clientTLSAndNonTLS { panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS") } cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath) - acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurltls + acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl) } else if clus.cfg.clientTLS == clientTLS { cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath) } diff --git a/embed/config.go b/embed/config.go index ee5fcce4f08..2fb2a3280e7 100644 --- a/embed/config.go +++ b/embed/config.go @@ -20,6 +20,7 @@ import ( "net" "net/http" "net/url" + "path/filepath" "strings" "github.com/coreos/etcd/etcdserver" @@ -393,6 +394,34 @@ func (cfg Config) defaultClientHost() bool { return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs } +func (cfg *Config) ClientSelfCert() (err error) { + if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() { + chosts := make([]string, len(cfg.LCUrls)) + for i, u := range cfg.LCUrls { + chosts[i] = u.Host + } + cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts) + return err + } else if cfg.ClientAutoTLS { + plog.Warningf("ignoring client auto TLS since certs given") + } + return nil +} + +func (cfg *Config) PeerSelfCert() (err error) { + if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() { + phosts := make([]string, len(cfg.LPUrls)) + for i, u := range cfg.LPUrls { + phosts[i] = u.Host + } + cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) + return err + } else if cfg.PeerAutoTLS { + plog.Warningf("ignoring peer auto TLS since certs given") + } + return nil +} + // UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host, // if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0. // e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380 diff --git a/embed/etcd.go b/embed/etcd.go index c5c0d2d699f..b48caa8989e 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -22,7 +22,6 @@ import ( "net" "net/http" "net/url" - "path/filepath" "sync" "time" @@ -248,19 +247,9 @@ func (e *Etcd) Close() { func (e *Etcd) Err() <-chan error { return e.errc } func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { - if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() { - phosts := make([]string, len(cfg.LPUrls)) - for i, u := range cfg.LPUrls { - phosts[i] = u.Host - } - cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.PeerAutoTLS { - plog.Warningf("ignoring peer auto TLS since certs given") + if err = cfg.PeerSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) } - if !cfg.PeerTLSInfo.Empty() { plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) } @@ -302,19 +291,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { } func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { - if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() { - chosts := make([]string, len(cfg.LCUrls)) - for i, u := range cfg.LCUrls { - chosts[i] = u.Host - } - cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.ClientAutoTLS { - plog.Warningf("ignoring client auto TLS since certs given") + if err = cfg.ClientSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) } - if cfg.EnablePprof { plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) } diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 722df51f92f..f04eb4f8a12 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -199,12 +199,24 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) { func startProxy(cfg *config) error { plog.Notice("proxy: this proxy supports v2 API only!") - pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) + clientTLSInfo := cfg.ClientTLSInfo + if clientTLSInfo.Empty() { + // Support old proxy behavior of defaulting to PeerTLSInfo + // for both client and peer connections. + clientTLSInfo = cfg.PeerTLSInfo + } + clientTLSInfo.InsecureSkipVerify = cfg.ClientAutoTLS + cfg.PeerTLSInfo.InsecureSkipVerify = cfg.PeerAutoTLS + + pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err } pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost + if err = cfg.PeerSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) + } tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 800ab712d33..0fdf69ef514 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -15,12 +15,13 @@ package etcdmain import ( - "crypto/tls" "fmt" + "math" "net" "net/http" "net/url" "os" + "path/filepath" "time" "github.com/coreos/etcd/clientv3" @@ -45,9 +46,22 @@ var ( grpcProxyEndpoints []string grpcProxyDNSCluster string grpcProxyInsecureDiscovery bool - grpcProxyCert string - grpcProxyKey string - grpcProxyCA string + grpcProxyDataDir string + + // tls for connecting to etcd + + grpcProxyCA string + grpcProxyCert string + grpcProxyKey string + grpcProxyInsecureSkipTLSVerify bool + + // tls for clients connecting to proxy + + grpcProxyListenCA string + grpcProxyListenCert string + grpcProxyListenKey string + grpcProxyListenAutoTLS bool + grpcProxyListenCRL string grpcProxyAdvertiseClientURL string grpcProxyResolverPrefix string @@ -85,19 +99,77 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface") cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records") cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints") - cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") - cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") - cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)") cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)") cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints") cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests") cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`) + cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data") + + // client TLS for connecting to server + cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") + cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") + cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") + cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates") + + // client TLS for connecting to proxy + cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file") + cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file") + cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle") + cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates") + cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.") return &cmd } func startGRPCProxy(cmd *cobra.Command, args []string) { + checkArgs() + + tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey) + if tlsinfo == nil && grpcProxyListenAutoTLS { + host := []string{"https://" + grpcProxyListenAddr} + dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy") + autoTLS, err := transport.SelfCert(dir, host) + if err != nil { + plog.Fatal(err) + } + tlsinfo = &autoTLS + } + if tlsinfo != nil { + plog.Infof("ServerTLS: %s", tlsinfo) + } + m := mustListenCMux(tlsinfo) + + grpcl := m.Match(cmux.HTTP2()) + defer func() { + grpcl.Close() + plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + }() + + client := mustNewClient() + + srvhttp, httpl := mustHTTPListener(m, tlsinfo) + errc := make(chan error) + go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }() + go func() { errc <- srvhttp.Serve(httpl) }() + go func() { errc <- m.Serve() }() + if len(grpcProxyMetricsListenAddr) > 0 { + mhttpl := mustMetricsListener(tlsinfo) + go func() { + mux := http.NewServeMux() + mux.Handle("/metrics", prometheus.Handler()) + plog.Fatal(http.Serve(mhttpl, mux)) + }() + } + + // grpc-proxy is initialized, ready to serve + notifySystemd() + + fmt.Fprintln(os.Stderr, <-errc) + os.Exit(1) +} + +func checkArgs() { if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 { fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL)) os.Exit(1) @@ -110,40 +182,79 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL)) os.Exit(1) } +} +func mustNewClient() *clientv3.Client { srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery) - if len(srvs.Endpoints) != 0 { - grpcProxyEndpoints = srvs.Endpoints + eps := srvs.Endpoints + if len(eps) == 0 { + eps = grpcProxyEndpoints } - - l, err := net.Listen("tcp", grpcProxyListenAddr) + cfg, err := newClientCfg(eps) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil { + client, err := clientv3.New(*cfg) + if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) - defer func() { - l.Close() - plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) - }() - m := cmux.New(l) + return client +} - cfg, cfgtls, err := newClientCfg() +func newClientCfg(eps []string) (*clientv3.Config, error) { + // set tls if any one tls option set + cfg := clientv3.Config{ + Endpoints: eps, + DialTimeout: 5 * time.Second, + } + tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey) + if tls == nil && grpcProxyInsecureSkipTLSVerify { + tls = &transport.TLSInfo{} + } + if tls != nil { + clientTLS, err := tls.ClientConfig() + if err != nil { + return nil, err + } + clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify + cfg.TLS = clientTLS + plog.Infof("ClientTLS: %s", tls) + } + return &cfg, nil +} + +func newTLS(ca, cert, key string) *transport.TLSInfo { + if ca == "" && cert == "" && key == "" { + return nil + } + return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key} +} + +func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { + l, err := net.Listen("tcp", grpcProxyListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - client, err := clientv3.New(*cfg) - if err != nil { + if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + if tlsinfo != nil { + tlsinfo.CRLFile = grpcProxyListenCRL + if l, err = transport.NewTLSListener(l, tlsinfo); err != nil { + plog.Fatal(err) + } + } + plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + return cmux.New(l) +} + +func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { if len(grpcProxyNamespace) > 0 { client.KV = namespace.NewKV(client.KV, grpcProxyNamespace) client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace) @@ -165,7 +276,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { server := grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + grpc.MaxConcurrentStreams(math.MaxUint32), ) + pb.RegisterKVServer(server, kvp) pb.RegisterWatchServer(server, watchp) pb.RegisterClusterServer(server, clusterp) @@ -174,12 +287,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { pb.RegisterAuthServer(server, authp) v3electionpb.RegisterElectionServer(server, electionp) v3lockpb.RegisterLockServer(server, lockp) + return server +} - errc := make(chan error) - - grpcl := m.Match(cmux.HTTP2()) - go func() { errc <- server.Serve(grpcl) }() - +func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) { httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) httpmux.Handle("/metrics", prometheus.Handler()) @@ -189,82 +300,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) } + srvhttp := &http.Server{Handler: httpmux} - srvhttp := &http.Server{ - Handler: httpmux, + if tlsinfo == nil { + return srvhttp, m.Match(cmux.HTTP1()) } - var httpl net.Listener - if cfg.TLS != nil { - srvhttp.TLSConfig = cfg.TLS - httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS) - } else { - httpl = m.Match(cmux.HTTP1()) - } - go func() { errc <- srvhttp.Serve(httpl) }() - - go func() { errc <- m.Serve() }() - - if len(grpcProxyMetricsListenAddr) > 0 { - murl, err := url.Parse(grpcProxyMetricsListenAddr) - if err != nil { - fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr) - os.Exit(1) - } - ml, err := transport.NewListener(murl.Host, murl.Scheme, cfgtls) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - - mux := http.NewServeMux() - mux.Handle("/metrics", prometheus.Handler()) - - go func() { - plog.Info("grpc-proxy: listening for metrics on ", murl.String()) - plog.Fatal(http.Serve(ml, mux)) - }() + srvTLS, err := tlsinfo.ServerConfig() + if err != nil { + plog.Fatalf("could not setup TLS (%v)", err) } - - // grpc-proxy is initialized, ready to serve - notifySystemd() - - fmt.Fprintln(os.Stderr, <-errc) - os.Exit(1) + srvhttp.TLSConfig = srvTLS + return srvhttp, m.Match(cmux.Any()) } -func newClientCfg() (*clientv3.Config, *transport.TLSInfo, error) { - // set tls if any one tls option set - var cfgtls *transport.TLSInfo - tlsinfo := transport.TLSInfo{} - if grpcProxyCert != "" { - tlsinfo.CertFile = grpcProxyCert - cfgtls = &tlsinfo - } - - if grpcProxyKey != "" { - tlsinfo.KeyFile = grpcProxyKey - cfgtls = &tlsinfo - } - - if grpcProxyCA != "" { - tlsinfo.CAFile = grpcProxyCA - cfgtls = &tlsinfo - } - - cfg := clientv3.Config{ - Endpoints: grpcProxyEndpoints, - DialTimeout: 5 * time.Second, +func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener { + murl, err := url.Parse(grpcProxyMetricsListenAddr) + if err != nil { + fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr) + os.Exit(1) } - if cfgtls != nil { - clientTLS, err := cfgtls.ClientConfig() - if err != nil { - return nil, nil, err - } - cfg.TLS = clientTLS + ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } - - // TODO: support insecure tls - - return &cfg, cfgtls, nil + plog.Info("grpc-proxy: listening for metrics on ", murl.String()) + return ml } diff --git a/etcdmain/main.go b/etcdmain/main.go index fd4e7f69657..06bbae56b8d 100644 --- a/etcdmain/main.go +++ b/etcdmain/main.go @@ -17,6 +17,7 @@ package etcdmain import ( "fmt" "os" + "strings" "github.com/coreos/go-systemd/daemon" systemdutil "github.com/coreos/go-systemd/util" @@ -26,7 +27,13 @@ func Main() { checkSupportArch() if len(os.Args) > 1 { - switch os.Args[1] { + cmd := os.Args[1] + if covArgs := os.Getenv("ETCDCOV_ARGS"); len(covArgs) > 0 { + args := strings.Split(os.Getenv("ETCDCOV_ARGS"), "\xe7\xcd")[1:] + rootCmd.SetArgs(args) + cmd = "grpc-proxy" + } + switch cmd { case "gateway", "grpc-proxy": if err := rootCmd.Execute(); err != nil { fmt.Fprint(os.Stderr, err) diff --git a/pkg/transport/listener.go b/pkg/transport/listener.go index 12120beaefd..33ba17fe12d 100644 --- a/pkg/transport/listener.go +++ b/pkg/transport/listener.go @@ -56,12 +56,13 @@ func wrapTLS(addr, scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listene } type TLSInfo struct { - CertFile string - KeyFile string - CAFile string - TrustedCAFile string - ClientCertAuth bool - CRLFile string + CertFile string + KeyFile string + CAFile string + TrustedCAFile string + ClientCertAuth bool + CRLFile string + InsecureSkipVerify bool // ServerName ensures the cert matches the given host in case of discovery / virtual hosting ServerName string @@ -236,6 +237,7 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { } else { cfg = &tls.Config{ServerName: info.ServerName} } + cfg.InsecureSkipVerify = info.InsecureSkipVerify CAFiles := info.cafiles() if len(CAFiles) > 0 { diff --git a/proxy/grpcproxy/maintenance.go b/proxy/grpcproxy/maintenance.go index 0a852c35089..2f57cbb307b 100644 --- a/proxy/grpcproxy/maintenance.go +++ b/proxy/grpcproxy/maintenance.go @@ -15,6 +15,8 @@ package grpcproxy import ( + "io" + "golang.org/x/net/context" "github.com/coreos/etcd/clientv3" @@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan for { rr, err := sc.Recv() if err != nil { + if err == io.EOF { + return nil + } return err } err = stream.Send(rr) diff --git a/test b/test index ebd4689065b..d933020a7f8 100755 --- a/test +++ b/test @@ -170,7 +170,10 @@ function cov_pass { # use 30m timeout because e2e coverage takes longer # due to many tests cause etcd process to wait # on leadership transfer timeout during gracefully shutdown + echo Testing e2e without proxy... go test -tags cov -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e" + echo Testing e2e with proxy... + go test -tags "cov cluster_proxy" -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e-proxy" # incrementally merge to get coverage data even if some coverage files are corrupted # optimistically assume etcdserver package's coverage file is OK since gocovmerge @@ -217,6 +220,7 @@ function integration_e2e_pass { function grpcproxy_pass { go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration + go test -timeout 15m -v -tags cluster_proxy $@ ${REPO_PATH}/e2e } function release_pass {