From 5c6a6bdc5a91a2d46e0ed60dd3e6deb8dc29e3ec Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Fri, 14 Jul 2017 16:03:04 -0700 Subject: [PATCH 01/11] e2e: refactor to support -tags cluster_proxy --- e2e/cluster_direct_test.go | 21 ++ e2e/cluster_proxy_test.go | 278 +++++++++++++++ e2e/cluster_test.go | 359 +++++++++++++++++++ e2e/ctl_v2_test.go | 30 +- e2e/ctl_v3_alarm_test.go | 2 +- e2e/ctl_v3_auth_test.go | 99 ++++++ e2e/ctl_v3_defrag_test.go | 26 +- e2e/ctl_v3_elect_test.go | 4 +- e2e/ctl_v3_endpoint_test.go | 34 +- e2e/ctl_v3_lock_test.go | 4 +- e2e/ctl_v3_migrate_test.go | 10 +- e2e/ctl_v3_move_leader_test.go | 6 +- e2e/ctl_v3_snapshot_test.go | 59 +-- e2e/ctl_v3_test.go | 8 +- e2e/etcd_config_test.go | 2 +- e2e/etcd_process.go | 134 +++++++ e2e/etcd_release_upgrade_test.go | 8 +- e2e/etcd_spawn_cov.go | 41 ++- e2e/etcd_test.go | 593 ------------------------------- e2e/gateway_test.go | 4 +- e2e/main_test.go | 25 +- e2e/util.go | 91 +++++ e2e/v2_curl_test.go | 19 +- 23 files changed, 1083 insertions(+), 774 deletions(-) create mode 100644 e2e/cluster_direct_test.go create mode 100644 e2e/cluster_proxy_test.go create mode 100644 e2e/cluster_test.go create mode 100644 e2e/etcd_process.go delete mode 100644 e2e/etcd_test.go create mode 100644 e2e/util.go diff --git a/e2e/cluster_direct_test.go b/e2e/cluster_direct_test.go new file mode 100644 index 00000000000..15a16c9257a --- /dev/null +++ b/e2e/cluster_direct_test.go @@ -0,0 +1,21 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build !cluster_proxy + +package e2e + +func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) { + return newEtcdServerProcess(cfg) +} diff --git a/e2e/cluster_proxy_test.go b/e2e/cluster_proxy_test.go new file mode 100644 index 00000000000..a2bab6f587e --- /dev/null +++ b/e2e/cluster_proxy_test.go @@ -0,0 +1,278 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// +build cluster_proxy + +package e2e + +import ( + "fmt" + "net" + "net/url" + "os" + "strconv" + "strings" + + "github.com/coreos/etcd/pkg/expect" +) + +type proxyEtcdProcess struct { + etcdProc etcdProcess + proxyV2 *proxyV2Proc + proxyV3 *proxyV3Proc +} + +func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) { + return newProxyEtcdProcess(cfg) +} + +func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) { + ep, err := newEtcdServerProcess(cfg) + if err != nil { + return nil, err + } + pep := &proxyEtcdProcess{ + etcdProc: ep, + proxyV2: newProxyV2Proc(cfg), + proxyV3: newProxyV3Proc(cfg), + } + return pep, nil +} + +func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() } + +func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() } +func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() } + +func (p *proxyEtcdProcess) Start() error { + if err := p.etcdProc.Start(); err != nil { + return err + } + if err := p.proxyV2.Start(); err != nil { + return err + } + return p.proxyV3.Start() +} + +func (p *proxyEtcdProcess) Restart() error { + if err := p.etcdProc.Restart(); err != nil { + return err + } + if err := p.proxyV2.Restart(); err != nil { + return err + } + return p.proxyV3.Restart() +} + +func (p *proxyEtcdProcess) Stop() error { + err := p.proxyV2.Stop() + if v3err := p.proxyV3.Stop(); err == nil { + err = v3err + } + if eerr := p.etcdProc.Stop(); eerr != nil && err == nil { + // fails on go-grpc issue #1384 + if !strings.Contains(eerr.Error(), "exit status 2") { + err = eerr + } + } + return err +} + +func (p *proxyEtcdProcess) Close() error { + err := p.proxyV2.Close() + if v3err := p.proxyV3.Close(); err == nil { + err = v3err + } + if eerr := p.etcdProc.Close(); eerr != nil && err == nil { + // fails on go-grpc issue #1384 + if !strings.Contains(eerr.Error(), "exit status 2") { + err = eerr + } + } + return err +} + +func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal { + p.proxyV3.WithStopSignal(sig) + p.proxyV3.WithStopSignal(sig) + return p.etcdProc.WithStopSignal(sig) +} + +type proxyProc struct { + execPath string + args []string + ep string + donec chan struct{} + + proc *expect.ExpectProcess +} + +func (pp *proxyProc) endpoints() []string { return []string{pp.ep} } + +func (pp *proxyProc) start() error { + if pp.proc != nil { + panic("already started") + } + proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...)) + if err != nil { + return err + } + pp.proc = proc + return nil +} + +func (pp *proxyProc) waitReady(readyStr string) error { + defer close(pp.donec) + return waitReadyExpectProc(pp.proc, []string{readyStr}) +} + +func (pp *proxyProc) Stop() error { + if pp.proc == nil { + return nil + } + if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") { + // v2proxy exits with status 1 on auto tls; not sure why + return err + } + pp.proc = nil + <-pp.donec + pp.donec = make(chan struct{}) + return nil +} + +func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal { + ret := pp.proc.StopSignal + pp.proc.StopSignal = sig + return ret +} + +func (pp *proxyProc) Close() error { return pp.Stop() } + +type proxyV2Proc struct { + proxyProc + dataDir string +} + +func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string { + u, err := url.Parse(cfg.acurl) + if err != nil { + panic(err) + } + host, port, _ := net.SplitHostPort(u.Host) + p, _ := strconv.ParseInt(port, 10, 16) + u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset) + return u.String() +} + +func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc { + listenAddr := proxyListenURL(cfg, 2) + name := fmt.Sprintf("testname-proxy-%p", cfg) + args := []string{ + "--name", name, + "--proxy", "on", + "--listen-client-urls", listenAddr, + "--initial-cluster", cfg.name + "=" + cfg.purl.String(), + } + return &proxyV2Proc{ + proxyProc{ + execPath: cfg.execPath, + args: append(args, cfg.tlsArgs...), + ep: listenAddr, + donec: make(chan struct{}), + }, + name + ".etcd", + } +} + +func (v2p *proxyV2Proc) Start() error { + os.RemoveAll(v2p.dataDir) + if err := v2p.start(); err != nil { + return err + } + return v2p.waitReady("httpproxy: endpoints found") +} + +func (v2p *proxyV2Proc) Restart() error { + if err := v2p.Stop(); err != nil { + return err + } + return v2p.Start() +} + +func (v2p *proxyV2Proc) Stop() error { + if err := v2p.proxyProc.Stop(); err != nil { + return err + } + // v2 proxy caches members; avoid reuse of directory + return os.RemoveAll(v2p.dataDir) +} + +type proxyV3Proc struct { + proxyProc +} + +func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc { + listenAddr := proxyListenURL(cfg, 3) + args := []string{ + "grpc-proxy", + "start", + "--listen-addr", strings.Split(listenAddr, "/")[2], + "--endpoints", cfg.acurl, + // pass-through member RPCs + "--advertise-client-url", "", + } + tlsArgs := []string{} + for i := 0; i < len(cfg.tlsArgs); i++ { + switch cfg.tlsArgs[i] { + case "--cert-file": + tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1]) + i++ + case "--key-file": + tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1]) + i++ + case "--ca-file": + tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1]) + i++ + case "--auto-tls": + tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify") + case "--peer-ca-file", "--peer-cert-file", "--peer-key-file": + i++ // skip arg + case "--client-cert-auth", "--peer-auto-tls": + default: + tlsArgs = append(tlsArgs, cfg.tlsArgs[i]) + } + } + return &proxyV3Proc{ + proxyProc{ + execPath: cfg.execPath, + args: append(args, tlsArgs...), + ep: listenAddr, + donec: make(chan struct{}), + }, + } +} + +func (v3p *proxyV3Proc) Restart() error { + if err := v3p.Stop(); err != nil { + return err + } + return v3p.Start() +} + +func (v3p *proxyV3Proc) Start() error { + if err := v3p.start(); err != nil { + return err + } + return v3p.waitReady("listening for grpc-proxy client requests") +} diff --git a/e2e/cluster_test.go b/e2e/cluster_test.go new file mode 100644 index 00000000000..ebd2c265d7e --- /dev/null +++ b/e2e/cluster_test.go @@ -0,0 +1,359 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "io/ioutil" + "net/url" + "os" + "strings" + + "github.com/coreos/etcd/etcdserver" +) + +const etcdProcessBasePort = 20000 + +type clientConnType int + +const ( + clientNonTLS clientConnType = iota + clientTLS + clientTLSAndNonTLS +) + +var ( + configNoTLS = etcdProcessClusterConfig{ + clusterSize: 3, + initialToken: "new", + } + configAutoTLS = etcdProcessClusterConfig{ + clusterSize: 3, + isPeerTLS: true, + isPeerAutoTLS: true, + initialToken: "new", + } + configTLS = etcdProcessClusterConfig{ + clusterSize: 3, + clientTLS: clientTLS, + isPeerTLS: true, + initialToken: "new", + } + configClientTLS = etcdProcessClusterConfig{ + clusterSize: 3, + clientTLS: clientTLS, + initialToken: "new", + } + configClientBoth = etcdProcessClusterConfig{ + clusterSize: 1, + clientTLS: clientTLSAndNonTLS, + initialToken: "new", + } + configClientAutoTLS = etcdProcessClusterConfig{ + clusterSize: 1, + isClientAutoTLS: true, + clientTLS: clientTLS, + initialToken: "new", + } + configPeerTLS = etcdProcessClusterConfig{ + clusterSize: 3, + isPeerTLS: true, + initialToken: "new", + } + configClientTLSCertAuth = etcdProcessClusterConfig{ + clusterSize: 1, + clientTLS: clientTLS, + initialToken: "new", + clientCertAuthEnabled: true, + } +) + +func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { + ret := cfg + ret.clusterSize = 1 + return &ret +} + +type etcdProcessCluster struct { + cfg *etcdProcessClusterConfig + procs []etcdProcess +} + +type etcdProcessClusterConfig struct { + execPath string + dataDirPath string + keepDataDir bool + + clusterSize int + + baseScheme string + basePort int + + snapCount int // default is 10000 + + clientTLS clientConnType + clientCertAuthEnabled bool + isPeerTLS bool + isPeerAutoTLS bool + isClientAutoTLS bool + isClientCRL bool + + forceNewCluster bool + initialToken string + quotaBackendBytes int64 + noStrictReconfig bool +} + +// newEtcdProcessCluster launches a new cluster from etcd processes, returning +// a new etcdProcessCluster once all nodes are ready to accept client requests. +func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { + etcdCfgs := cfg.etcdServerProcessConfigs() + epc := &etcdProcessCluster{ + cfg: cfg, + procs: make([]etcdProcess, cfg.clusterSize), + } + + // launch etcd processes + for i := range etcdCfgs { + proc, err := newEtcdProcess(etcdCfgs[i]) + if err != nil { + epc.Close() + return nil, err + } + epc.procs[i] = proc + } + + if err := epc.Start(); err != nil { + return nil, err + } + return epc, nil +} + +func (cfg *etcdProcessClusterConfig) clientScheme() string { + if cfg.clientTLS == clientTLS { + return "https" + } + return "http" +} + +func (cfg *etcdProcessClusterConfig) peerScheme() string { + peerScheme := cfg.baseScheme + if peerScheme == "" { + peerScheme = "http" + } + if cfg.isPeerTLS { + peerScheme += "s" + } + return peerScheme +} + +func (cfg *etcdProcessClusterConfig) etcdServerProcessConfigs() []*etcdServerProcessConfig { + if cfg.basePort == 0 { + cfg.basePort = etcdProcessBasePort + } + if cfg.execPath == "" { + cfg.execPath = binPath + } + if cfg.snapCount == 0 { + cfg.snapCount = etcdserver.DefaultSnapCount + } + + etcdCfgs := make([]*etcdServerProcessConfig, cfg.clusterSize) + initialCluster := make([]string, cfg.clusterSize) + for i := 0; i < cfg.clusterSize; i++ { + var curls []string + var curl, curltls string + port := cfg.basePort + 4*i + curlHost := fmt.Sprintf("localhost:%d", port) + + switch cfg.clientTLS { + case clientNonTLS, clientTLS: + curl = (&url.URL{Scheme: cfg.clientScheme(), Host: curlHost}).String() + curls = []string{curl} + case clientTLSAndNonTLS: + curl = (&url.URL{Scheme: "http", Host: curlHost}).String() + curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() + curls = []string{curl, curltls} + } + + purl := url.URL{Scheme: cfg.peerScheme(), Host: fmt.Sprintf("localhost:%d", port+1)} + name := fmt.Sprintf("testname%d", i) + dataDirPath := cfg.dataDirPath + if cfg.dataDirPath == "" { + var derr error + dataDirPath, derr = ioutil.TempDir("", name+".etcd") + if derr != nil { + panic("could not get tempdir for datadir") + } + } + initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) + + args := []string{ + "--name", name, + "--listen-client-urls", strings.Join(curls, ","), + "--advertise-client-urls", strings.Join(curls, ","), + "--listen-peer-urls", purl.String(), + "--initial-advertise-peer-urls", purl.String(), + "--initial-cluster-token", cfg.initialToken, + "--data-dir", dataDirPath, + "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount), + } + if cfg.forceNewCluster { + args = append(args, "--force-new-cluster") + } + if cfg.quotaBackendBytes > 0 { + args = append(args, + "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), + ) + } + if cfg.noStrictReconfig { + args = append(args, "--strict-reconfig-check=false") + } + + args = append(args, cfg.tlsArgs()...) + etcdCfgs[i] = &etcdServerProcessConfig{ + execPath: cfg.execPath, + args: args, + tlsArgs: cfg.tlsArgs(), + dataDirPath: dataDirPath, + keepDataDir: cfg.keepDataDir, + name: name, + purl: purl, + acurl: curl, + initialToken: cfg.initialToken, + } + } + + initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} + for i := range etcdCfgs { + etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") + etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) + } + + return etcdCfgs +} + +func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { + if cfg.clientTLS != clientNonTLS { + if cfg.isClientAutoTLS { + args = append(args, "--auto-tls") + } else { + tlsClientArgs := []string{ + "--cert-file", certPath, + "--key-file", privateKeyPath, + "--ca-file", caPath, + } + args = append(args, tlsClientArgs...) + + if cfg.clientCertAuthEnabled { + args = append(args, "--client-cert-auth") + } + } + } + + if cfg.isPeerTLS { + if cfg.isPeerAutoTLS { + args = append(args, "--peer-auto-tls") + } else { + tlsPeerArgs := []string{ + "--peer-cert-file", certPath, + "--peer-key-file", privateKeyPath, + "--peer-ca-file", caPath, + } + args = append(args, tlsPeerArgs...) + } + } + + if cfg.isClientCRL { + args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") + } + + return args +} + +func (epc *etcdProcessCluster) EndpointsV2() []string { + return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV2() }) +} + +func (epc *etcdProcessCluster) EndpointsV3() []string { + return epc.endpoints(func(ep etcdProcess) []string { return ep.EndpointsV3() }) +} + +func (epc *etcdProcessCluster) endpoints(f func(ep etcdProcess) []string) (ret []string) { + for _, p := range epc.procs { + ret = append(ret, f(p)...) + } + return ret +} + +func (epc *etcdProcessCluster) Start() error { + return epc.start(func(ep etcdProcess) error { return ep.Start() }) +} + +func (epc *etcdProcessCluster) Restart() error { + return epc.start(func(ep etcdProcess) error { return ep.Restart() }) +} + +func (epc *etcdProcessCluster) start(f func(ep etcdProcess) error) error { + readyC := make(chan error, len(epc.procs)) + for i := range epc.procs { + go func(n int) { readyC <- f(epc.procs[n]) }(i) + } + for range epc.procs { + if err := <-readyC; err != nil { + epc.Close() + return err + } + } + return nil +} + +func (epc *etcdProcessCluster) Stop() (err error) { + for _, p := range epc.procs { + if p == nil { + continue + } + if curErr := p.Stop(); curErr != nil { + if err != nil { + err = fmt.Errorf("%v; %v", err, curErr) + } else { + err = curErr + } + } + } + return err +} + +func (epc *etcdProcessCluster) Close() error { + err := epc.Stop() + for _, p := range epc.procs { + // p is nil when newEtcdProcess fails in the middle + // Close still gets called to clean up test data + if p == nil { + continue + } + if cerr := p.Close(); cerr != nil { + err = cerr + } + } + return err +} + +func (epc *etcdProcessCluster) WithStopSignal(sig os.Signal) (ret os.Signal) { + for _, p := range epc.procs { + ret = p.WithStopSignal(sig) + } + return ret +} diff --git a/e2e/ctl_v2_test.go b/e2e/ctl_v2_test.go index c1635887dbe..f986eb1f311 100644 --- a/e2e/ctl_v2_test.go +++ b/e2e/ctl_v2_test.go @@ -128,10 +128,9 @@ func testCtlV2Ls(t *testing.T, cfg *etcdProcessClusterConfig, quorum bool) { } } -func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) } -func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) } -func TestCtlV2WatchWithProxy(t *testing.T) { testCtlV2Watch(t, &configWithProxy, false) } -func TestCtlV2WatchWithProxyNoSync(t *testing.T) { testCtlV2Watch(t, &configWithProxy, true) } +func TestCtlV2Watch(t *testing.T) { testCtlV2Watch(t, &configNoTLS, false) } +func TestCtlV2WatchTLS(t *testing.T) { testCtlV2Watch(t, &configTLS, false) } + func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { defer testutil.AfterTest(t) @@ -158,12 +157,10 @@ func testCtlV2Watch(t *testing.T, cfg *etcdProcessClusterConfig, noSync bool) { } } -func TestCtlV2GetRoleUser(t *testing.T) { testCtlV2GetRoleUser(t, &configNoTLS) } -func TestCtlV2GetRoleUserWithProxy(t *testing.T) { testCtlV2GetRoleUser(t, &configWithProxy) } -func testCtlV2GetRoleUser(t *testing.T, cfg *etcdProcessClusterConfig) { +func TestCtlV2GetRoleUser(t *testing.T) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, cfg, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -196,7 +193,7 @@ func TestCtlV2UserListRoot(t *testing.T) { testCtlV2UserList(t, "root") } func testCtlV2UserList(t *testing.T, username string) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, &configWithProxy, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -214,7 +211,7 @@ func testCtlV2UserList(t *testing.T, username string) { func TestCtlV2RoleList(t *testing.T) { defer testutil.AfterTest(t) - epc := setupEtcdctlTest(t, &configWithProxy, false) + epc := setupEtcdctlTest(t, &configNoTLS, false) defer func() { if err := epc.Close(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) @@ -243,7 +240,7 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue t.Fatal(err) } - if err := etcdctlBackup(epc1, epc1.procs[0].cfg.dataDirPath, backupDir); err != nil { + if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir); err != nil { t.Fatal(err) } @@ -350,16 +347,7 @@ func TestCtlV2ClusterHealth(t *testing.T) { } func etcdctlPrefixArgs(clus *etcdProcessCluster) []string { - endpoints := "" - if proxies := clus.proxies(); len(proxies) != 0 { - endpoints = proxies[0].cfg.acurl - } else if processes := clus.processes(); len(processes) != 0 { - es := []string{} - for _, b := range processes { - es = append(es, b.cfg.acurl) - } - endpoints = strings.Join(es, ",") - } + endpoints := strings.Join(clus.EndpointsV2(), ",") cmdArgs := []string{ctlBinPath, "--endpoints", endpoints} if clus.cfg.clientTLS == clientTLS { cmdArgs = append(cmdArgs, "--ca-file", caPath, "--cert-file", certPath, "--key-file", privateKeyPath) diff --git a/e2e/ctl_v3_alarm_test.go b/e2e/ctl_v3_alarm_test.go index 78dfeaddcdd..50baae5e9e7 100644 --- a/e2e/ctl_v3_alarm_test.go +++ b/e2e/ctl_v3_alarm_test.go @@ -64,7 +64,7 @@ func alarmTest(cx ctlCtx) { } } - eps := cx.epc.grpcEndpoints() + eps := cx.epc.EndpointsV3() // get latest revision to compact cli, err := clientv3.New(clientv3.Config{ diff --git a/e2e/ctl_v3_auth_test.go b/e2e/ctl_v3_auth_test.go index cf3043ccf09..e8fc31f81d5 100644 --- a/e2e/ctl_v3_auth_test.go +++ b/e2e/ctl_v3_auth_test.go @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +// Skip proxy tests for now since auth is broken on grpcproxy. +// +build !cluster_proxy + package e2e import ( "fmt" + "os" "testing" "github.com/coreos/etcd/clientv3" @@ -44,6 +48,12 @@ func TestCtlV3AuthRoleGet(t *testing.T) { testCtl(t, authTestRoleGet) } func TestCtlV3AuthUserGet(t *testing.T) { testCtl(t, authTestUserGet) } func TestCtlV3AuthRoleList(t *testing.T) { testCtl(t, authTestRoleList) } +func TestCtlV3AuthDefrag(t *testing.T) { testCtl(t, authTestDefrag) } +func TestCtlV3AuthEndpointHealth(t *testing.T) { + testCtl(t, authTestEndpointHealth, withQuorum()) +} +func TestCtlV3AuthSnapshot(t *testing.T) { testCtl(t, authTestSnapshot) } + func authEnableTest(cx ctlCtx) { if err := authEnable(cx); err != nil { cx.t.Fatal(err) @@ -816,3 +826,92 @@ func authTestRoleList(cx ctlCtx) { cx.t.Fatal(err) } } + +func authTestDefrag(cx ctlCtx) { + maintenanceInitKeys(cx) + + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + // ordinary user cannot defrag + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3Defrag(cx); err == nil { + cx.t.Fatal("ordinary user should not be able to issue a defrag request") + } + + // root can defrag + cx.user, cx.pass = "root", "root" + if err := ctlV3Defrag(cx); err != nil { + cx.t.Fatal(err) + } +} + +func authTestSnapshot(cx ctlCtx) { + maintenanceInitKeys(cx) + + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + fpath := "test.snapshot" + defer os.RemoveAll(fpath) + + // ordinary user cannot save a snapshot + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3SnapshotSave(cx, fpath); err == nil { + cx.t.Fatal("ordinary user should not be able to save a snapshot") + } + + // root can save a snapshot + cx.user, cx.pass = "root", "root" + if err := ctlV3SnapshotSave(cx, fpath); err != nil { + cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err) + } + + st, err := getSnapshotStatus(cx, fpath) + if err != nil { + cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err) + } + if st.Revision != 4 { + cx.t.Fatalf("expected 4, got %d", st.Revision) + } + if st.TotalKey < 3 { + cx.t.Fatalf("expected at least 3, got %d", st.TotalKey) + } +} + +func authTestEndpointHealth(cx ctlCtx) { + if err := authEnable(cx); err != nil { + cx.t.Fatal(err) + } + + cx.user, cx.pass = "root", "root" + authSetupTestUser(cx) + + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } + + // health checking with an ordinary user "succeeds" since permission denial goes through consensus + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } + + // succeed if permissions granted for ordinary user + cx.user, cx.pass = "root", "root" + if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil { + cx.t.Fatal(err) + } + cx.user, cx.pass = "test-user", "pass" + if err := ctlV3EndpointHealth(cx); err != nil { + cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) + } +} diff --git a/e2e/ctl_v3_defrag_test.go b/e2e/ctl_v3_defrag_test.go index cc197d36289..64c3bb9f0c3 100644 --- a/e2e/ctl_v3_defrag_test.go +++ b/e2e/ctl_v3_defrag_test.go @@ -16,8 +16,7 @@ package e2e import "testing" -func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) } -func TestCtlV3DefragWithAuth(t *testing.T) { testCtl(t, defragTestWithAuth) } +func TestCtlV3Defrag(t *testing.T) { testCtl(t, defragTest) } func maintenanceInitKeys(cx ctlCtx) { var kvs = []kv{{"key", "val1"}, {"key", "val2"}, {"key", "val3"}} @@ -40,29 +39,6 @@ func defragTest(cx ctlCtx) { } } -func defragTestWithAuth(cx ctlCtx) { - maintenanceInitKeys(cx) - - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - // ordinary user cannot defrag - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3Defrag(cx); err == nil { - cx.t.Fatal("ordinary user should not be able to issue a defrag request") - } - - // root can defrag - cx.user, cx.pass = "root", "root" - if err := ctlV3Defrag(cx); err != nil { - cx.t.Fatal(err) - } -} - func ctlV3Defrag(cx ctlCtx) error { cmdArgs := append(cx.PrefixArgs(), "defrag") lines := make([]string, cx.epc.cfg.clusterSize) diff --git a/e2e/ctl_v3_elect_test.go b/e2e/ctl_v3_elect_test.go index 2c9c986d3ac..410c00f813c 100644 --- a/e2e/ctl_v3_elect_test.go +++ b/e2e/ctl_v3_elect_test.go @@ -33,8 +33,8 @@ func TestCtlV3Elect(t *testing.T) { func testElect(cx ctlCtx) { // debugging for #6934 - sig := cx.epc.withStopSignal(debugLockSignal) - defer cx.epc.withStopSignal(sig) + sig := cx.epc.WithStopSignal(debugLockSignal) + defer cx.epc.WithStopSignal(sig) name := "a" diff --git a/e2e/ctl_v3_endpoint_test.go b/e2e/ctl_v3_endpoint_test.go index 3a42c1c9b4a..74a2ebb7a1d 100644 --- a/e2e/ctl_v3_endpoint_test.go +++ b/e2e/ctl_v3_endpoint_test.go @@ -21,9 +21,6 @@ import ( func TestCtlV3EndpointHealth(t *testing.T) { testCtl(t, endpointHealthTest, withQuorum()) } func TestCtlV3EndpointStatus(t *testing.T) { testCtl(t, endpointStatusTest, withQuorum()) } -func TestCtlV3EndpointHealthWithAuth(t *testing.T) { - testCtl(t, endpointHealthTestWithAuth, withQuorum()) -} func endpointHealthTest(cx ctlCtx) { if err := ctlV3EndpointHealth(cx); err != nil { @@ -49,38 +46,9 @@ func endpointStatusTest(cx ctlCtx) { func ctlV3EndpointStatus(cx ctlCtx) error { cmdArgs := append(cx.PrefixArgs(), "endpoint", "status") var eps []string - for _, ep := range cx.epc.endpoints() { + for _, ep := range cx.epc.EndpointsV3() { u, _ := url.Parse(ep) eps = append(eps, u.Host) } return spawnWithExpects(cmdArgs, eps...) } - -func endpointHealthTestWithAuth(cx ctlCtx) { - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } - - // health checking with an ordinary user "succeeds" since permission denial goes through consensus - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } - - // succeed if permissions granted for ordinary user - cx.user, cx.pass = "root", "root" - if err := ctlV3RoleGrantPermission(cx, "test-role", grantingPerm{true, true, "health", "", false}); err != nil { - cx.t.Fatal(err) - } - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3EndpointHealth(cx); err != nil { - cx.t.Fatalf("endpointStatusTest ctlV3EndpointHealth error (%v)", err) - } -} diff --git a/e2e/ctl_v3_lock_test.go b/e2e/ctl_v3_lock_test.go index 416f26d8a41..ddda3fc03cc 100644 --- a/e2e/ctl_v3_lock_test.go +++ b/e2e/ctl_v3_lock_test.go @@ -56,8 +56,8 @@ func TestCtlV3Lock(t *testing.T) { func testLock(cx ctlCtx) { // debugging for #6464 - sig := cx.epc.withStopSignal(debugLockSignal) - defer cx.epc.withStopSignal(sig) + sig := cx.epc.WithStopSignal(debugLockSignal) + defer cx.epc.WithStopSignal(sig) name := "a" diff --git a/e2e/ctl_v3_migrate_test.go b/e2e/ctl_v3_migrate_test.go index 3136c4920f8..7252fe0a3a0 100644 --- a/e2e/ctl_v3_migrate_test.go +++ b/e2e/ctl_v3_migrate_test.go @@ -48,8 +48,8 @@ func TestCtlV3Migrate(t *testing.T) { } } - dataDir := epc.procs[0].cfg.dataDirPath - if err := epc.StopAll(); err != nil { + dataDir := epc.procs[0].Config().dataDirPath + if err := epc.Stop(); err != nil { t.Fatalf("error closing etcd processes (%v)", err) } @@ -65,8 +65,8 @@ func TestCtlV3Migrate(t *testing.T) { t.Fatal(err) } - epc.procs[0].cfg.keepDataDir = true - if err := epc.RestartAll(); err != nil { + epc.procs[0].Config().keepDataDir = true + if err := epc.Restart(); err != nil { t.Fatal(err) } @@ -75,7 +75,7 @@ func TestCtlV3Migrate(t *testing.T) { t.Fatal(err) } cli, err := clientv3.New(clientv3.Config{ - Endpoints: epc.grpcEndpoints(), + Endpoints: epc.EndpointsV3(), DialTimeout: 3 * time.Second, }) if err != nil { diff --git a/e2e/ctl_v3_move_leader_test.go b/e2e/ctl_v3_move_leader_test.go index ec2f134d5e3..eb2afda5f0c 100644 --- a/e2e/ctl_v3_move_leader_test.go +++ b/e2e/ctl_v3_move_leader_test.go @@ -39,7 +39,7 @@ func TestCtlV3MoveLeader(t *testing.T) { var leadIdx int var leaderID uint64 var transferee uint64 - for i, ep := range epc.grpcEndpoints() { + for i, ep := range epc.EndpointsV3() { cli, err := clientv3.New(clientv3.Config{ Endpoints: []string{ep}, DialTimeout: 3 * time.Second, @@ -75,11 +75,11 @@ func TestCtlV3MoveLeader(t *testing.T) { expect string }{ { // request to non-leader - cx.prefixArgs([]string{cx.epc.grpcEndpoints()[(leadIdx+1)%3]}), + cx.prefixArgs([]string{cx.epc.EndpointsV3()[(leadIdx+1)%3]}), "no leader endpoint given at ", }, { // request to leader - cx.prefixArgs([]string{cx.epc.grpcEndpoints()[leadIdx]}), + cx.prefixArgs([]string{cx.epc.EndpointsV3()[leadIdx]}), fmt.Sprintf("Leadership transferred from %s to %s", types.ID(leaderID), types.ID(transferee)), }, } diff --git a/e2e/ctl_v3_snapshot_test.go b/e2e/ctl_v3_snapshot_test.go index d2394885d62..234d5b037de 100644 --- a/e2e/ctl_v3_snapshot_test.go +++ b/e2e/ctl_v3_snapshot_test.go @@ -152,7 +152,7 @@ func TestIssue6361(t *testing.T) { }() dialTimeout := 7 * time.Second - prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.grpcEndpoints(), ","), "--dial-timeout", dialTimeout.String()} + prefixArgs := []string{ctlBinPath, "--endpoints", strings.Join(epc.EndpointsV3(), ","), "--dial-timeout", dialTimeout.String()} // write some keys kvs := []kv{{"foo1", "val1"}, {"foo2", "val2"}, {"foo3", "val3"}} @@ -170,7 +170,7 @@ func TestIssue6361(t *testing.T) { t.Fatal(err) } - if err = epc.processes()[0].Stop(); err != nil { + if err = epc.procs[0].Stop(); err != nil { t.Fatal(err) } @@ -178,19 +178,19 @@ func TestIssue6361(t *testing.T) { defer os.RemoveAll(newDataDir) // etcdctl restore the snapshot - err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].cfg.name, "--initial-cluster", epc.procs[0].cfg.initialCluster, "--initial-cluster-token", epc.procs[0].cfg.initialToken, "--initial-advertise-peer-urls", epc.procs[0].cfg.purl.String(), "--data-dir", newDataDir}, "membership: added member") + err = spawnWithExpect([]string{ctlBinPath, "snapshot", "restore", fpath, "--name", epc.procs[0].Config().name, "--initial-cluster", epc.procs[0].Config().initialCluster, "--initial-cluster-token", epc.procs[0].Config().initialToken, "--initial-advertise-peer-urls", epc.procs[0].Config().purl.String(), "--data-dir", newDataDir}, "membership: added member") if err != nil { t.Fatal(err) } // start the etcd member using the restored snapshot - epc.procs[0].cfg.dataDirPath = newDataDir - for i := range epc.procs[0].cfg.args { - if epc.procs[0].cfg.args[i] == "--data-dir" { - epc.procs[0].cfg.args[i+1] = newDataDir + epc.procs[0].Config().dataDirPath = newDataDir + for i := range epc.procs[0].Config().args { + if epc.procs[0].Config().args[i] == "--data-dir" { + epc.procs[0].Config().args[i+1] = newDataDir } } - if err = epc.processes()[0].Restart(); err != nil { + if err = epc.procs[0].Restart(); err != nil { t.Fatal(err) } @@ -217,11 +217,11 @@ func TestIssue6361(t *testing.T) { defer os.RemoveAll(newDataDir2) name2 := "infra2" - initialCluster2 := epc.procs[0].cfg.initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL) + initialCluster2 := epc.procs[0].Config().initialCluster + fmt.Sprintf(",%s=%s", name2, peerURL) // start the new member var nepc *expect.ExpectProcess - nepc, err = spawnCmd([]string{epc.procs[0].cfg.execPath, "--name", name2, + nepc, err = spawnCmd([]string{epc.procs[0].Config().execPath, "--name", name2, "--listen-client-urls", clientURL, "--advertise-client-urls", clientURL, "--listen-peer-urls", peerURL, "--initial-advertise-peer-urls", peerURL, "--initial-cluster", initialCluster2, "--initial-cluster-state", "existing", "--data-dir", newDataDir2}) @@ -245,42 +245,3 @@ func TestIssue6361(t *testing.T) { t.Fatal(err) } } - -func TestCtlV3SnapshotWithAuth(t *testing.T) { testCtl(t, snapshotTestWithAuth) } - -func snapshotTestWithAuth(cx ctlCtx) { - maintenanceInitKeys(cx) - - if err := authEnable(cx); err != nil { - cx.t.Fatal(err) - } - - cx.user, cx.pass = "root", "root" - authSetupTestUser(cx) - - fpath := "test.snapshot" - defer os.RemoveAll(fpath) - - // ordinary user cannot save a snapshot - cx.user, cx.pass = "test-user", "pass" - if err := ctlV3SnapshotSave(cx, fpath); err == nil { - cx.t.Fatal("ordinary user should not be able to save a snapshot") - } - - // root can save a snapshot - cx.user, cx.pass = "root", "root" - if err := ctlV3SnapshotSave(cx, fpath); err != nil { - cx.t.Fatalf("snapshotTest ctlV3SnapshotSave error (%v)", err) - } - - st, err := getSnapshotStatus(cx, fpath) - if err != nil { - cx.t.Fatalf("snapshotTest getSnapshotStatus error (%v)", err) - } - if st.Revision != 4 { - cx.t.Fatalf("expected 4, got %d", st.Revision) - } - if st.TotalKey < 3 { - cx.t.Fatalf("expected at least 3, got %d", st.TotalKey) - } -} diff --git a/e2e/ctl_v3_test.go b/e2e/ctl_v3_test.go index a840aebc14a..45e2abeecb0 100644 --- a/e2e/ctl_v3_test.go +++ b/e2e/ctl_v3_test.go @@ -45,7 +45,7 @@ func TestCtlV3DialWithHTTPScheme(t *testing.T) { } func dialWithSchemeTest(cx ctlCtx) { - cmdArgs := append(cx.prefixArgs(cx.epc.endpoints()), "put", "foo", "bar") + cmdArgs := append(cx.prefixArgs(cx.epc.EndpointsV3()), "put", "foo", "bar") if err := spawnWithExpect(cmdArgs, "OK"); err != nil { cx.t.Fatal(err) } @@ -169,10 +169,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) { } func (cx *ctlCtx) prefixArgs(eps []string) []string { - if len(cx.epc.proxies()) > 0 { // TODO: add proxy check as in v2 - panic("v3 proxy not implemented") - } - fmap := make(map[string]string) fmap["endpoints"] = strings.Join(eps, ",") fmap["dial-timeout"] = cx.dialTimeout.String() @@ -212,7 +208,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string { // PrefixArgs prefixes etcdctl command. // Make sure to unset environment variables after tests. func (cx *ctlCtx) PrefixArgs() []string { - return cx.prefixArgs(cx.epc.grpcEndpoints()) + return cx.prefixArgs(cx.epc.EndpointsV3()) } func isGRPCTimedout(err error) bool { diff --git a/e2e/etcd_config_test.go b/e2e/etcd_config_test.go index 9e308c3bcb9..e7531866ade 100644 --- a/e2e/etcd_config_test.go +++ b/e2e/etcd_config_test.go @@ -25,7 +25,7 @@ func TestEtcdExampleConfig(t *testing.T) { if err != nil { t.Fatal(err) } - if err = waitReadyExpectProc(proc, false); err != nil { + if err = waitReadyExpectProc(proc, etcdServerReadyLines); err != nil { t.Fatal(err) } if err = proc.Stop(); err != nil { diff --git a/e2e/etcd_process.go b/e2e/etcd_process.go new file mode 100644 index 00000000000..cfde0255a6e --- /dev/null +++ b/e2e/etcd_process.go @@ -0,0 +1,134 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "net/url" + "os" + + "github.com/coreos/etcd/pkg/expect" + "github.com/coreos/etcd/pkg/fileutil" +) + +var etcdServerReadyLines = []string{"enabled capabilities for version", "published"} + +// etcdProcess is a process that serves etcd requests. +type etcdProcess interface { + EndpointsV2() []string + EndpointsV3() []string + + Start() error + Restart() error + Stop() error + Close() error + WithStopSignal(sig os.Signal) os.Signal + Config() *etcdServerProcessConfig +} + +type etcdServerProcess struct { + cfg *etcdServerProcessConfig + proc *expect.ExpectProcess + donec chan struct{} // closed when Interact() terminates +} + +type etcdServerProcessConfig struct { + execPath string + args []string + tlsArgs []string + + dataDirPath string + keepDataDir bool + + name string + + purl url.URL + + acurl string + + initialToken string + initialCluster string +} + +func newEtcdServerProcess(cfg *etcdServerProcessConfig) (*etcdServerProcess, error) { + if !fileutil.Exist(cfg.execPath) { + return nil, fmt.Errorf("could not find etcd binary") + } + if !cfg.keepDataDir { + if err := os.RemoveAll(cfg.dataDirPath); err != nil { + return nil, err + } + } + return &etcdServerProcess{cfg: cfg, donec: make(chan struct{})}, nil +} + +func (ep *etcdServerProcess) EndpointsV2() []string { return []string{ep.cfg.acurl} } +func (ep *etcdServerProcess) EndpointsV3() []string { return ep.EndpointsV2() } + +func (ep *etcdServerProcess) Start() error { + if ep.proc != nil { + panic("already started") + } + proc, err := spawnCmd(append([]string{ep.cfg.execPath}, ep.cfg.args...)) + if err != nil { + return err + } + ep.proc = proc + return ep.waitReady() +} + +func (ep *etcdServerProcess) Restart() error { + if err := ep.Stop(); err != nil { + return err + } + ep.donec = make(chan struct{}) + return ep.Start() +} + +func (ep *etcdServerProcess) Stop() error { + if ep == nil || ep.proc == nil { + return nil + } + if err := ep.proc.Stop(); err != nil { + return err + } + ep.proc = nil + <-ep.donec + ep.donec = make(chan struct{}) + if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" { + os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path) + } + return nil +} + +func (ep *etcdServerProcess) Close() error { + if err := ep.Stop(); err != nil { + return err + } + return os.RemoveAll(ep.cfg.dataDirPath) +} + +func (ep *etcdServerProcess) WithStopSignal(sig os.Signal) os.Signal { + ret := ep.proc.StopSignal + ep.proc.StopSignal = sig + return ret +} + +func (ep *etcdServerProcess) waitReady() error { + defer close(ep.donec) + return waitReadyExpectProc(ep.proc, etcdServerReadyLines) +} + +func (ep *etcdServerProcess) Config() *etcdServerProcessConfig { return ep.cfg } diff --git a/e2e/etcd_release_upgrade_test.go b/e2e/etcd_release_upgrade_test.go index af958e89213..6b1d42323e7 100644 --- a/e2e/etcd_release_upgrade_test.go +++ b/e2e/etcd_release_upgrade_test.go @@ -88,8 +88,8 @@ func TestReleaseUpgrade(t *testing.T) { if err := epc.procs[i].Stop(); err != nil { t.Fatalf("#%d: error closing etcd process (%v)", i, err) } - epc.procs[i].cfg.execPath = binDir + "/etcd" - epc.procs[i].cfg.keepDataDir = true + epc.procs[i].Config().execPath = binDir + "/etcd" + epc.procs[i].Config().keepDataDir = true if err := epc.procs[i].Restart(); err != nil { t.Fatalf("error restarting etcd process (%v)", err) @@ -155,8 +155,8 @@ func TestReleaseUpgradeWithRestart(t *testing.T) { wg.Add(len(epc.procs)) for i := range epc.procs { go func(i int) { - epc.procs[i].cfg.execPath = binDir + "/etcd" - epc.procs[i].cfg.keepDataDir = true + epc.procs[i].Config().execPath = binDir + "/etcd" + epc.procs[i].Config().keepDataDir = true if err := epc.procs[i].Restart(); err != nil { t.Fatalf("error restarting etcd process (%v)", err) } diff --git a/e2e/etcd_spawn_cov.go b/e2e/etcd_spawn_cov.go index 6032ef2018b..c198c5377fd 100644 --- a/e2e/etcd_spawn_cov.go +++ b/e2e/etcd_spawn_cov.go @@ -33,20 +33,7 @@ const noOutputLineCount = 2 // cov-enabled binaries emit PASS and coverage count func spawnCmd(args []string) (*expect.ExpectProcess, error) { if args[0] == binPath { - covArgs, err := getCovArgs() - if err != nil { - return nil, err - } - ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, args2env(args[1:])) - if err != nil { - return nil, err - } - // ep sends SIGTERM to etcd_test process on ep.close() - // allowing the process to exit gracefully in order to generate a coverage report. - // note: go runtime ignores SIGINT but not SIGTERM - // if e2e test is run as a background process. - ep.StopSignal = syscall.SIGTERM - return ep, nil + return spawnEtcd(args) } if args[0] == ctlBinPath { @@ -73,6 +60,32 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) { return expect.NewExpect(args[0], args[1:]...) } +func spawnEtcd(args []string) (*expect.ExpectProcess, error) { + covArgs, err := getCovArgs() + if err != nil { + return nil, err + } + + env := []string{} + if args[1] == "grpc-proxy" { + // avoid test flag conflicts in coverage enabled etcd by putting flags in ETCDCOV_ARGS + env = append(os.Environ(), "ETCDCOV_ARGS="+strings.Join(args, "\xe7\xcd")) + } else { + env = args2env(args[1:]) + } + + ep, err := expect.NewExpectWithEnv(binDir+"/etcd_test", covArgs, env) + if err != nil { + return nil, err + } + // ep sends SIGTERM to etcd_test process on ep.close() + // allowing the process to exit gracefully in order to generate a coverage report. + // note: go runtime ignores SIGINT but not SIGTERM + // if e2e test is run as a background process. + ep.StopSignal = syscall.SIGTERM + return ep, nil +} + func getCovArgs() ([]string, error) { coverPath := os.Getenv("COVERDIR") if !filepath.IsAbs(coverPath) { diff --git a/e2e/etcd_test.go b/e2e/etcd_test.go deleted file mode 100644 index 69e76985521..00000000000 --- a/e2e/etcd_test.go +++ /dev/null @@ -1,593 +0,0 @@ -// Copyright 2016 The etcd Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package e2e - -import ( - "fmt" - "io/ioutil" - "net/url" - "os" - "strings" - "time" - - "github.com/coreos/etcd/etcdserver" - "github.com/coreos/etcd/pkg/expect" - "github.com/coreos/etcd/pkg/fileutil" -) - -const etcdProcessBasePort = 20000 - -var ( - binPath string - ctlBinPath string - certPath string - privateKeyPath string - caPath string - - crlPath string - revokedCertPath string - revokedPrivateKeyPath string -) - -type clientConnType int - -const ( - clientNonTLS clientConnType = iota - clientTLS - clientTLSAndNonTLS -) - -var ( - configNoTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - initialToken: "new", - } - configAutoTLS = etcdProcessClusterConfig{ - clusterSize: 3, - isPeerTLS: true, - isPeerAutoTLS: true, - initialToken: "new", - } - configTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - clientTLS: clientTLS, - isPeerTLS: true, - initialToken: "new", - } - configClientTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - clientTLS: clientTLS, - initialToken: "new", - } - configClientBoth = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - clientTLS: clientTLSAndNonTLS, - initialToken: "new", - } - configClientAutoTLS = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - isClientAutoTLS: true, - clientTLS: clientTLS, - initialToken: "new", - } - configPeerTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 0, - isPeerTLS: true, - initialToken: "new", - } - configWithProxy = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - initialToken: "new", - } - configWithProxyTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - clientTLS: clientTLS, - isPeerTLS: true, - initialToken: "new", - } - configWithProxyPeerTLS = etcdProcessClusterConfig{ - clusterSize: 3, - proxySize: 1, - isPeerTLS: true, - initialToken: "new", - } - configClientTLSCertAuth = etcdProcessClusterConfig{ - clusterSize: 1, - proxySize: 0, - clientTLS: clientTLS, - initialToken: "new", - clientCertAuthEnabled: true, - } -) - -func configStandalone(cfg etcdProcessClusterConfig) *etcdProcessClusterConfig { - ret := cfg - ret.clusterSize = 1 - return &ret -} - -type etcdProcessCluster struct { - cfg *etcdProcessClusterConfig - procs []*etcdProcess -} - -type etcdProcess struct { - cfg *etcdProcessConfig - proc *expect.ExpectProcess - donec chan struct{} // closed when Interact() terminates -} - -type etcdProcessConfig struct { - execPath string - args []string - - dataDirPath string - keepDataDir bool - - name string - - purl url.URL - - acurl string - // additional url for tls connection when the etcd process - // serves both http and https - acurltls string - acurlHost string - - initialToken string - initialCluster string - - isProxy bool -} - -type etcdProcessClusterConfig struct { - execPath string - dataDirPath string - keepDataDir bool - - clusterSize int - - baseScheme string - basePort int - - proxySize int - - snapCount int // default is 10000 - - clientTLS clientConnType - clientCertAuthEnabled bool - isPeerTLS bool - isPeerAutoTLS bool - isClientAutoTLS bool - isClientCRL bool - - forceNewCluster bool - initialToken string - quotaBackendBytes int64 - noStrictReconfig bool -} - -// newEtcdProcessCluster launches a new cluster from etcd processes, returning -// a new etcdProcessCluster once all nodes are ready to accept client requests. -func newEtcdProcessCluster(cfg *etcdProcessClusterConfig) (*etcdProcessCluster, error) { - etcdCfgs := cfg.etcdProcessConfigs() - epc := &etcdProcessCluster{ - cfg: cfg, - procs: make([]*etcdProcess, cfg.clusterSize+cfg.proxySize), - } - - // launch etcd processes - for i := range etcdCfgs { - proc, err := newEtcdProcess(etcdCfgs[i]) - if err != nil { - epc.Close() - return nil, err - } - epc.procs[i] = proc - } - - return epc, epc.Start() -} - -func newEtcdProcess(cfg *etcdProcessConfig) (*etcdProcess, error) { - if !fileutil.Exist(cfg.execPath) { - return nil, fmt.Errorf("could not find etcd binary") - } - - if !cfg.keepDataDir { - if err := os.RemoveAll(cfg.dataDirPath); err != nil { - return nil, err - } - } - - child, err := spawnCmd(append([]string{cfg.execPath}, cfg.args...)) - if err != nil { - return nil, err - } - return &etcdProcess{cfg: cfg, proc: child, donec: make(chan struct{})}, nil -} - -func (cfg *etcdProcessClusterConfig) etcdProcessConfigs() []*etcdProcessConfig { - binPath = binDir + "/etcd" - ctlBinPath = binDir + "/etcdctl" - certPath = certDir + "/server.crt" - privateKeyPath = certDir + "/server.key.insecure" - caPath = certDir + "/ca.crt" - - revokedCertPath = certDir + "/server-revoked.crt" - revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure" - crlPath = certDir + "/revoke.crl" - - if cfg.basePort == 0 { - cfg.basePort = etcdProcessBasePort - } - - if cfg.execPath == "" { - cfg.execPath = binPath - } - if cfg.snapCount == 0 { - cfg.snapCount = etcdserver.DefaultSnapCount - } - - clientScheme := "http" - if cfg.clientTLS == clientTLS { - clientScheme = "https" - } - peerScheme := cfg.baseScheme - if peerScheme == "" { - peerScheme = "http" - } - if cfg.isPeerTLS { - peerScheme += "s" - } - - etcdCfgs := make([]*etcdProcessConfig, cfg.clusterSize+cfg.proxySize) - initialCluster := make([]string, cfg.clusterSize) - for i := 0; i < cfg.clusterSize; i++ { - var curls []string - var curl, curltls string - port := cfg.basePort + 2*i - curlHost := fmt.Sprintf("localhost:%d", port) - - switch cfg.clientTLS { - case clientNonTLS, clientTLS: - curl = (&url.URL{Scheme: clientScheme, Host: curlHost}).String() - curls = []string{curl} - case clientTLSAndNonTLS: - curl = (&url.URL{Scheme: "http", Host: curlHost}).String() - curltls = (&url.URL{Scheme: "https", Host: curlHost}).String() - curls = []string{curl, curltls} - } - - purl := url.URL{Scheme: peerScheme, Host: fmt.Sprintf("localhost:%d", port+1)} - name := fmt.Sprintf("testname%d", i) - dataDirPath := cfg.dataDirPath - if cfg.dataDirPath == "" { - var derr error - dataDirPath, derr = ioutil.TempDir("", name+".etcd") - if derr != nil { - panic("could not get tempdir for datadir") - } - } - initialCluster[i] = fmt.Sprintf("%s=%s", name, purl.String()) - - args := []string{ - "--name", name, - "--listen-client-urls", strings.Join(curls, ","), - "--advertise-client-urls", strings.Join(curls, ","), - "--listen-peer-urls", purl.String(), - "--initial-advertise-peer-urls", purl.String(), - "--initial-cluster-token", cfg.initialToken, - "--data-dir", dataDirPath, - "--snapshot-count", fmt.Sprintf("%d", cfg.snapCount), - } - if cfg.forceNewCluster { - args = append(args, "--force-new-cluster") - } - if cfg.quotaBackendBytes > 0 { - args = append(args, - "--quota-backend-bytes", fmt.Sprintf("%d", cfg.quotaBackendBytes), - ) - } - if cfg.noStrictReconfig { - args = append(args, "--strict-reconfig-check=false") - } - - args = append(args, cfg.tlsArgs()...) - etcdCfgs[i] = &etcdProcessConfig{ - execPath: cfg.execPath, - args: args, - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - purl: purl, - acurl: curl, - acurltls: curltls, - acurlHost: curlHost, - initialToken: cfg.initialToken, - } - } - for i := 0; i < cfg.proxySize; i++ { - port := cfg.basePort + 2*cfg.clusterSize + i + 1 - curlHost := fmt.Sprintf("localhost:%d", port) - curl := url.URL{Scheme: clientScheme, Host: curlHost} - name := fmt.Sprintf("testname-proxy%d", i) - dataDirPath, derr := ioutil.TempDir("", name+".etcd") - if derr != nil { - panic("could not get tempdir for datadir") - } - args := []string{ - "--name", name, - "--proxy", "on", - "--listen-client-urls", curl.String(), - "--data-dir", dataDirPath, - } - args = append(args, cfg.tlsArgs()...) - etcdCfgs[cfg.clusterSize+i] = &etcdProcessConfig{ - execPath: cfg.execPath, - args: args, - dataDirPath: dataDirPath, - keepDataDir: cfg.keepDataDir, - name: name, - acurl: curl.String(), - acurlHost: curlHost, - isProxy: true, - } - } - - initialClusterArgs := []string{"--initial-cluster", strings.Join(initialCluster, ",")} - for i := range etcdCfgs { - etcdCfgs[i].initialCluster = strings.Join(initialCluster, ",") - etcdCfgs[i].args = append(etcdCfgs[i].args, initialClusterArgs...) - } - - return etcdCfgs -} - -func (cfg *etcdProcessClusterConfig) tlsArgs() (args []string) { - if cfg.clientTLS != clientNonTLS { - if cfg.isClientAutoTLS { - args = append(args, "--auto-tls=true") - } else { - tlsClientArgs := []string{ - "--cert-file", certPath, - "--key-file", privateKeyPath, - "--ca-file", caPath, - } - args = append(args, tlsClientArgs...) - - if cfg.clientCertAuthEnabled { - args = append(args, "--client-cert-auth") - } - } - } - - if cfg.isPeerTLS { - if cfg.isPeerAutoTLS { - args = append(args, "--peer-auto-tls=true") - } else { - tlsPeerArgs := []string{ - "--peer-cert-file", certPath, - "--peer-key-file", privateKeyPath, - "--peer-ca-file", caPath, - } - args = append(args, tlsPeerArgs...) - } - } - - if cfg.isClientCRL { - args = append(args, "--client-crl-file", crlPath, "--client-cert-auth") - } - - return args -} - -func (epc *etcdProcessCluster) Start() (err error) { - readyC := make(chan error, epc.cfg.clusterSize+epc.cfg.proxySize) - for i := range epc.procs { - go func(n int) { readyC <- epc.procs[n].waitReady() }(i) - } - for range epc.procs { - if err := <-readyC; err != nil { - epc.Close() - return err - } - } - return nil -} - -func (epc *etcdProcessCluster) RestartAll() error { - for i := range epc.procs { - proc, err := newEtcdProcess(epc.procs[i].cfg) - if err != nil { - epc.Close() - return err - } - epc.procs[i] = proc - } - return epc.Start() -} - -func (epc *etcdProcessCluster) StopAll() (err error) { - for _, p := range epc.procs { - if p == nil { - continue - } - if curErr := p.Stop(); curErr != nil { - if err != nil { - err = fmt.Errorf("%v; %v", err, curErr) - } else { - err = curErr - } - } - } - return err -} - -func (epc *etcdProcessCluster) Close() error { - err := epc.StopAll() - for _, p := range epc.procs { - // p is nil when newEtcdProcess fails in the middle - // Close still gets called to clean up test data - if p == nil { - continue - } - os.RemoveAll(p.cfg.dataDirPath) - } - return err -} - -func (ep *etcdProcess) Restart() error { - newEp, err := newEtcdProcess(ep.cfg) - if err != nil { - ep.Stop() - return err - } - *ep = *newEp - if err = ep.waitReady(); err != nil { - ep.Stop() - return err - } - return nil -} - -func (ep *etcdProcess) Stop() error { - if ep == nil { - return nil - } - if err := ep.proc.Stop(); err != nil { - return err - } - <-ep.donec - - if ep.cfg.purl.Scheme == "unix" || ep.cfg.purl.Scheme == "unixs" { - os.Remove(ep.cfg.purl.Host + ep.cfg.purl.Path) - } - return nil -} - -func (ep *etcdProcess) waitReady() error { - defer close(ep.donec) - return waitReadyExpectProc(ep.proc, ep.cfg.isProxy) -} - -func waitReadyExpectProc(exproc *expect.ExpectProcess, isProxy bool) error { - readyStrs := []string{"enabled capabilities for version", "published"} - if isProxy { - readyStrs = []string{"httpproxy: endpoints found"} - } - c := 0 - matchSet := func(l string) bool { - for _, s := range readyStrs { - if strings.Contains(l, s) { - c++ - break - } - } - return c == len(readyStrs) - } - _, err := exproc.ExpectFunc(matchSet) - return err -} - -func spawnWithExpect(args []string, expected string) error { - return spawnWithExpects(args, []string{expected}...) -} - -func spawnWithExpects(args []string, xs ...string) error { - proc, err := spawnCmd(args) - if err != nil { - return err - } - // process until either stdout or stderr contains - // the expected string - var ( - lines []string - lineFunc = func(txt string) bool { return true } - ) - for _, txt := range xs { - for { - l, lerr := proc.ExpectFunc(lineFunc) - if lerr != nil { - proc.Close() - return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines) - } - lines = append(lines, l) - if strings.Contains(l, txt) { - break - } - } - } - perr := proc.Close() - if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output - return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount()) - } - return perr -} - -// proxies returns only the proxy etcdProcess. -func (epc *etcdProcessCluster) proxies() []*etcdProcess { - return epc.procs[epc.cfg.clusterSize:] -} - -func (epc *etcdProcessCluster) processes() []*etcdProcess { - return epc.procs[:epc.cfg.clusterSize] -} - -func (epc *etcdProcessCluster) endpoints() []string { - eps := make([]string, epc.cfg.clusterSize) - for i, ep := range epc.processes() { - eps[i] = ep.cfg.acurl - } - return eps -} - -func (epc *etcdProcessCluster) grpcEndpoints() []string { - eps := make([]string, epc.cfg.clusterSize) - for i, ep := range epc.processes() { - eps[i] = ep.cfg.acurlHost - } - return eps -} - -func (epc *etcdProcessCluster) withStopSignal(sig os.Signal) os.Signal { - ret := epc.procs[0].proc.StopSignal - for _, p := range epc.procs { - p.proc.StopSignal = sig - } - return ret -} - -func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error { - errc := make(chan error, 1) - go func() { errc <- p.Close() }() - select { - case err := <-errc: - return err - case <-time.After(d): - p.Stop() - // retry close after stopping to collect SIGQUIT data, if any - closeWithTimeout(p, time.Second) - } - return fmt.Errorf("took longer than %v to Close process %+v", d, p) -} diff --git a/e2e/gateway_test.go b/e2e/gateway_test.go index 9eee0170e35..6539e6f84cb 100644 --- a/e2e/gateway_test.go +++ b/e2e/gateway_test.go @@ -31,9 +31,9 @@ func TestGateway(t *testing.T) { if err != nil { t.Fatal(err) } - defer ec.StopAll() + defer ec.Stop() - eps := strings.Join(ec.grpcEndpoints(), ",") + eps := strings.Join(ec.EndpointsV3(), ",") p := startGateway(t, eps) defer p.Stop() diff --git a/e2e/main_test.go b/e2e/main_test.go index 59589507841..858018a26a8 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -13,8 +13,20 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -var binDir string -var certDir string +var ( + binDir string + certDir string + + binPath string + ctlBinPath string + certPath string + privateKeyPath string + caPath string + + crlPath string + revokedCertPath string + revokedPrivateKeyPath string +) func TestMain(m *testing.M) { os.Setenv("ETCD_UNSUPPORTED_ARCH", runtime.GOARCH) @@ -24,6 +36,15 @@ func TestMain(m *testing.M) { flag.StringVar(&certDir, "cert-dir", "../integration/fixtures", "The directory for store certificate files.") flag.Parse() + binPath = binDir + "/etcd" + ctlBinPath = binDir + "/etcdctl" + certPath = certDir + "/server.crt" + privateKeyPath = certDir + "/server.key.insecure" + caPath = certDir + "/ca.crt" + revokedCertPath = certDir + "/server-revoked.crt" + revokedPrivateKeyPath = certDir + "/server-revoked.key.insecure" + crlPath = certDir + "/revoke.crl" + v := m.Run() if v == 0 && testutil.CheckLeakedGoroutine() { os.Exit(1) diff --git a/e2e/util.go b/e2e/util.go new file mode 100644 index 00000000000..e4125137a69 --- /dev/null +++ b/e2e/util.go @@ -0,0 +1,91 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "strings" + "time" + + "github.com/coreos/etcd/pkg/expect" +) + +func waitReadyExpectProc(exproc *expect.ExpectProcess, readyStrs []string) error { + c := 0 + matchSet := func(l string) bool { + for _, s := range readyStrs { + if strings.Contains(l, s) { + c++ + break + } + } + return c == len(readyStrs) + } + _, err := exproc.ExpectFunc(matchSet) + return err +} + +func spawnWithExpect(args []string, expected string) error { + return spawnWithExpects(args, []string{expected}...) +} + +func spawnWithExpects(args []string, xs ...string) error { + proc, err := spawnCmd(args) + if err != nil { + return err + } + // process until either stdout or stderr contains + // the expected string + var ( + lines []string + lineFunc = func(txt string) bool { return true } + ) + for _, txt := range xs { + for { + l, lerr := proc.ExpectFunc(lineFunc) + if lerr != nil { + proc.Close() + return fmt.Errorf("%v (expected %q, got %q)", lerr, txt, lines) + } + lines = append(lines, l) + if strings.Contains(l, txt) { + break + } + } + } + perr := proc.Close() + if len(xs) == 0 && proc.LineCount() != noOutputLineCount { // expect no output + return fmt.Errorf("unexpected output (got lines %q, line count %d)", lines, proc.LineCount()) + } + return perr +} + +func closeWithTimeout(p *expect.ExpectProcess, d time.Duration) error { + errc := make(chan error, 1) + go func() { errc <- p.Close() }() + select { + case err := <-errc: + return err + case <-time.After(d): + p.Stop() + // retry close after stopping to collect SIGQUIT data, if any + closeWithTimeout(p, time.Second) + } + return fmt.Errorf("took longer than %v to Close process %+v", d, p) +} + +func toTLS(s string) string { + return strings.Replace(s, "http://", "https://", 1) +} diff --git a/e2e/v2_curl_test.go b/e2e/v2_curl_test.go index a44227ec6d5..2322a8549f6 100644 --- a/e2e/v2_curl_test.go +++ b/e2e/v2_curl_test.go @@ -23,15 +23,12 @@ import ( "github.com/coreos/etcd/pkg/testutil" ) -func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) } -func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) } -func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) } -func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) } -func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) } -func TestV2CurlProxyNoTLS(t *testing.T) { testCurlPutGet(t, &configWithProxy) } -func TestV2CurlProxyTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyTLS) } -func TestV2CurlProxyPeerTLS(t *testing.T) { testCurlPutGet(t, &configWithProxyPeerTLS) } -func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) } +func TestV2CurlNoTLS(t *testing.T) { testCurlPutGet(t, &configNoTLS) } +func TestV2CurlAutoTLS(t *testing.T) { testCurlPutGet(t, &configAutoTLS) } +func TestV2CurlAllTLS(t *testing.T) { testCurlPutGet(t, &configTLS) } +func TestV2CurlPeerTLS(t *testing.T) { testCurlPutGet(t, &configPeerTLS) } +func TestV2CurlClientTLS(t *testing.T) { testCurlPutGet(t, &configClientTLS) } +func TestV2CurlClientBoth(t *testing.T) { testCurlPutGet(t, &configClientBoth) } func testCurlPutGet(t *testing.T, cfg *etcdProcessClusterConfig) { defer testutil.AfterTest(t) @@ -135,14 +132,14 @@ type cURLReq struct { func cURLPrefixArgs(clus *etcdProcessCluster, method string, req cURLReq) []string { var ( cmdArgs = []string{"curl"} - acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurl + acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl ) if req.isTLS { if clus.cfg.clientTLS != clientTLSAndNonTLS { panic("should not use cURLPrefixArgsUseTLS when serving only TLS or non-TLS") } cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath) - acurl = clus.procs[rand.Intn(clus.cfg.clusterSize)].cfg.acurltls + acurl = toTLS(clus.procs[rand.Intn(clus.cfg.clusterSize)].Config().acurl) } else if clus.cfg.clientTLS == clientTLS { cmdArgs = append(cmdArgs, "--cacert", caPath, "--cert", certPath, "--key", privateKeyPath) } From 7c22d35dff51af9b563adc265a295e43b0d4d5f9 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 20 Jul 2017 10:44:14 -0700 Subject: [PATCH 02/11] etcdmain: support grpc-proxy/gateway compiled with -tags cov --- etcdmain/main.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/etcdmain/main.go b/etcdmain/main.go index fd4e7f69657..06bbae56b8d 100644 --- a/etcdmain/main.go +++ b/etcdmain/main.go @@ -17,6 +17,7 @@ package etcdmain import ( "fmt" "os" + "strings" "github.com/coreos/go-systemd/daemon" systemdutil "github.com/coreos/go-systemd/util" @@ -26,7 +27,13 @@ func Main() { checkSupportArch() if len(os.Args) > 1 { - switch os.Args[1] { + cmd := os.Args[1] + if covArgs := os.Getenv("ETCDCOV_ARGS"); len(covArgs) > 0 { + args := strings.Split(os.Getenv("ETCDCOV_ARGS"), "\xe7\xcd")[1:] + rootCmd.SetArgs(args) + cmd = "grpc-proxy" + } + switch cmd { case "gateway", "grpc-proxy": if err := rootCmd.Execute(); err != nil { fmt.Fprint(os.Stderr, err) From 426ad25924e1f125741ecc142d940b402eba1f9d Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 18 Jul 2017 13:36:42 -0700 Subject: [PATCH 03/11] transport: include InsecureSkipVerify in TLSInfo Some functions take a TLSInfo to generate a tls.Config and there was no way to force the InsecureSkipVerify flag. --- pkg/transport/listener.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/transport/listener.go b/pkg/transport/listener.go index 12120beaefd..33ba17fe12d 100644 --- a/pkg/transport/listener.go +++ b/pkg/transport/listener.go @@ -56,12 +56,13 @@ func wrapTLS(addr, scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listene } type TLSInfo struct { - CertFile string - KeyFile string - CAFile string - TrustedCAFile string - ClientCertAuth bool - CRLFile string + CertFile string + KeyFile string + CAFile string + TrustedCAFile string + ClientCertAuth bool + CRLFile string + InsecureSkipVerify bool // ServerName ensures the cert matches the given host in case of discovery / virtual hosting ServerName string @@ -236,6 +237,7 @@ func (info TLSInfo) ClientConfig() (*tls.Config, error) { } else { cfg = &tls.Config{ServerName: info.ServerName} } + cfg.InsecureSkipVerify = info.InsecureSkipVerify CAFiles := info.cafiles() if len(CAFiles) > 0 { From 5d6c6ad20e8fd144d0d22056009ffec0d2c66750 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 17 Jul 2017 13:26:12 -0700 Subject: [PATCH 04/11] etcdmain: use client tls info for v2 proxy client connections Was defaulting to PeerTLSInfo for client connections to the etcd cluster. Since proxy users may rely on this behavior, only use the client tls info if given, and fall back to peer tls otherwise. --- etcdmain/etcd.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 722df51f92f..7c4cef50350 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -199,7 +199,14 @@ func startEtcd(cfg *embed.Config) (<-chan struct{}, <-chan error, error) { func startProxy(cfg *config) error { plog.Notice("proxy: this proxy supports v2 API only!") - pt, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) + clientTLSInfo := cfg.ClientTLSInfo + if clientTLSInfo.Empty() { + // Support old proxy behavior of defaulting to PeerTLSInfo + // for both client and peer connections. + clientTLSInfo = cfg.PeerTLSInfo + } + + pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err } From d5a0d4d696dec06304ca6389275127b71b313e11 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 17 Jul 2017 14:34:59 -0700 Subject: [PATCH 05/11] etcdmain, embed: --auto-peer-tls and --auto-tls for v2 proxy Fixes #7930 --- embed/config.go | 29 +++++++++++++++++++++++++++++ embed/etcd.go | 29 ++++------------------------- etcdmain/etcd.go | 5 +++++ 3 files changed, 38 insertions(+), 25 deletions(-) diff --git a/embed/config.go b/embed/config.go index ee5fcce4f08..2fb2a3280e7 100644 --- a/embed/config.go +++ b/embed/config.go @@ -20,6 +20,7 @@ import ( "net" "net/http" "net/url" + "path/filepath" "strings" "github.com/coreos/etcd/etcdserver" @@ -393,6 +394,34 @@ func (cfg Config) defaultClientHost() bool { return len(cfg.ACUrls) == 1 && cfg.ACUrls[0].String() == DefaultAdvertiseClientURLs } +func (cfg *Config) ClientSelfCert() (err error) { + if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() { + chosts := make([]string, len(cfg.LCUrls)) + for i, u := range cfg.LCUrls { + chosts[i] = u.Host + } + cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts) + return err + } else if cfg.ClientAutoTLS { + plog.Warningf("ignoring client auto TLS since certs given") + } + return nil +} + +func (cfg *Config) PeerSelfCert() (err error) { + if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() { + phosts := make([]string, len(cfg.LPUrls)) + for i, u := range cfg.LPUrls { + phosts[i] = u.Host + } + cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) + return err + } else if cfg.PeerAutoTLS { + plog.Warningf("ignoring peer auto TLS since certs given") + } + return nil +} + // UpdateDefaultClusterFromName updates cluster advertise URLs with, if available, default host, // if advertise URLs are default values(localhost:2379,2380) AND if listen URL is 0.0.0.0. // e.g. advertise peer URL localhost:2380 or listen peer URL 0.0.0.0:2380 diff --git a/embed/etcd.go b/embed/etcd.go index c5c0d2d699f..b48caa8989e 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -22,7 +22,6 @@ import ( "net" "net/http" "net/url" - "path/filepath" "sync" "time" @@ -248,19 +247,9 @@ func (e *Etcd) Close() { func (e *Etcd) Err() <-chan error { return e.errc } func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { - if cfg.PeerAutoTLS && cfg.PeerTLSInfo.Empty() { - phosts := make([]string, len(cfg.LPUrls)) - for i, u := range cfg.LPUrls { - phosts[i] = u.Host - } - cfg.PeerTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "peer"), phosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.PeerAutoTLS { - plog.Warningf("ignoring peer auto TLS since certs given") + if err = cfg.PeerSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) } - if !cfg.PeerTLSInfo.Empty() { plog.Infof("peerTLS: %s", cfg.PeerTLSInfo) } @@ -302,19 +291,9 @@ func startPeerListeners(cfg *Config) (peers []*peerListener, err error) { } func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) { - if cfg.ClientAutoTLS && cfg.ClientTLSInfo.Empty() { - chosts := make([]string, len(cfg.LCUrls)) - for i, u := range cfg.LCUrls { - chosts[i] = u.Host - } - cfg.ClientTLSInfo, err = transport.SelfCert(filepath.Join(cfg.Dir, "fixtures", "client"), chosts) - if err != nil { - plog.Fatalf("could not get certs (%v)", err) - } - } else if cfg.ClientAutoTLS { - plog.Warningf("ignoring client auto TLS since certs given") + if err = cfg.ClientSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) } - if cfg.EnablePprof { plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) } diff --git a/etcdmain/etcd.go b/etcdmain/etcd.go index 7c4cef50350..f04eb4f8a12 100644 --- a/etcdmain/etcd.go +++ b/etcdmain/etcd.go @@ -205,6 +205,8 @@ func startProxy(cfg *config) error { // for both client and peer connections. clientTLSInfo = cfg.PeerTLSInfo } + clientTLSInfo.InsecureSkipVerify = cfg.ClientAutoTLS + cfg.PeerTLSInfo.InsecureSkipVerify = cfg.PeerAutoTLS pt, err := transport.NewTimeoutTransport(clientTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { @@ -212,6 +214,9 @@ func startProxy(cfg *config) error { } pt.MaxIdleConnsPerHost = httpproxy.DefaultMaxIdleConnsPerHost + if err = cfg.PeerSelfCert(); err != nil { + plog.Fatalf("could not get certs (%v)", err) + } tr, err := transport.NewTimeoutTransport(cfg.PeerTLSInfo, time.Duration(cfg.ProxyDialTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyReadTimeoutMs)*time.Millisecond, time.Duration(cfg.ProxyWriteTimeoutMs)*time.Millisecond) if err != nil { return err From 1365f87d4046469bcc94ad03048a52b8b4367bb0 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 17 Jul 2017 15:36:19 -0700 Subject: [PATCH 06/11] etcdmain: cleanup grpcproxy; support different certs for proxy/etcd Enables TLS termination in grpcproxy. --- etcdmain/grpc_proxy.go | 230 ++++++++++++++++++++++++----------------- 1 file changed, 135 insertions(+), 95 deletions(-) diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 800ab712d33..3e64f885757 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -17,6 +17,7 @@ package etcdmain import ( "crypto/tls" "fmt" + "math" "net" "net/http" "net/url" @@ -45,9 +46,18 @@ var ( grpcProxyEndpoints []string grpcProxyDNSCluster string grpcProxyInsecureDiscovery bool - grpcProxyCert string - grpcProxyKey string - grpcProxyCA string + + // tls for connecting to etcd + + grpcProxyCA string + grpcProxyCert string + grpcProxyKey string + + // tls for clients connecting to proxy + + grpcProxyListenCA string + grpcProxyListenCert string + grpcProxyListenKey string grpcProxyAdvertiseClientURL string grpcProxyResolverPrefix string @@ -85,19 +95,64 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface") cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records") cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints") - cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") - cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") - cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)") cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)") cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints") cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests") cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`) + // client TLS for connecting to server + cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") + cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") + cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") + + // client TLS for connecting to proxy + cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file") + cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file") + cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle") + return &cmd } func startGRPCProxy(cmd *cobra.Command, args []string) { + checkArgs() + + tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey) + if tlsinfo != nil { + plog.Infof("ServerTLS: %s", tlsinfo) + } + m := mustListenCMux(tlsinfo) + + grpcl := m.Match(cmux.HTTP2()) + defer func() { + grpcl.Close() + plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + }() + + client := mustNewClient() + + srvhttp, httpl := mustHTTPListener(m, tlsinfo) + errc := make(chan error) + go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }() + go func() { errc <- srvhttp.Serve(httpl) }() + go func() { errc <- m.Serve() }() + if len(grpcProxyMetricsListenAddr) > 0 { + mhttpl := mustMetricsListener(tlsinfo) + go func() { + mux := http.NewServeMux() + mux.Handle("/metrics", prometheus.Handler()) + plog.Fatal(http.Serve(mhttpl, mux)) + }() + } + + // grpc-proxy is initialized, ready to serve + notifySystemd() + + fmt.Fprintln(os.Stderr, <-errc) + os.Exit(1) +} + +func checkArgs() { if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 { fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL)) os.Exit(1) @@ -110,40 +165,76 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL)) os.Exit(1) } +} +func mustNewClient() *clientv3.Client { srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery) - if len(srvs.Endpoints) != 0 { - grpcProxyEndpoints = srvs.Endpoints + eps := srvs.Endpoints + if len(eps) == 0 { + eps = grpcProxyEndpoints } - - l, err := net.Listen("tcp", grpcProxyListenAddr) + cfg, err := newClientCfg(eps) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil { + client, err := clientv3.New(*cfg) + if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) - defer func() { - l.Close() - plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr) - }() - m := cmux.New(l) + return client +} + +func newClientCfg(eps []string) (*clientv3.Config, error) { + // set tls if any one tls option set + cfg := clientv3.Config{ + Endpoints: eps, + DialTimeout: 5 * time.Second, + } + if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil { + clientTLS, err := tls.ClientConfig() + if err != nil { + return nil, err + } + cfg.TLS = clientTLS + plog.Infof("ClientTLS: %s", tls) + } + // TODO: support insecure tls + return &cfg, nil +} - cfg, cfgtls, err := newClientCfg() +func newTLS(ca, cert, key string) *transport.TLSInfo { + if ca == "" && cert == "" && key == "" { + return nil + } + return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key} +} + +func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { + l, err := net.Listen("tcp", grpcProxyListenAddr) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } - client, err := clientv3.New(*cfg) - if err != nil { + var tlscfg *tls.Config + scheme := "http" + if tlsinfo != nil { + if tlscfg, err = tlsinfo.ServerConfig(); err != nil { + plog.Fatal(err) + } + scheme = "https" + } + if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } + plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) + return cmux.New(l) +} +func newGRPCProxyServer(client *clientv3.Client) *grpc.Server { if len(grpcProxyNamespace) > 0 { client.KV = namespace.NewKV(client.KV, grpcProxyNamespace) client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace) @@ -165,7 +256,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { server := grpc.NewServer( grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), + grpc.MaxConcurrentStreams(math.MaxUint32), ) + pb.RegisterKVServer(server, kvp) pb.RegisterWatchServer(server, watchp) pb.RegisterClusterServer(server, clusterp) @@ -174,12 +267,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { pb.RegisterAuthServer(server, authp) v3electionpb.RegisterElectionServer(server, electionp) v3lockpb.RegisterLockServer(server, lockp) + return server +} - errc := make(chan error) - - grpcl := m.Match(cmux.HTTP2()) - go func() { errc <- server.Serve(grpcl) }() - +func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) { httpmux := http.NewServeMux() httpmux.HandleFunc("/", http.NotFound) httpmux.Handle("/metrics", prometheus.Handler()) @@ -189,82 +280,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { } plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf) } + srvhttp := &http.Server{Handler: httpmux} - srvhttp := &http.Server{ - Handler: httpmux, - } - - var httpl net.Listener - if cfg.TLS != nil { - srvhttp.TLSConfig = cfg.TLS - httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS) - } else { - httpl = m.Match(cmux.HTTP1()) + if tlsinfo == nil { + return srvhttp, m.Match(cmux.HTTP1()) } - go func() { errc <- srvhttp.Serve(httpl) }() - - go func() { errc <- m.Serve() }() - - if len(grpcProxyMetricsListenAddr) > 0 { - murl, err := url.Parse(grpcProxyMetricsListenAddr) - if err != nil { - fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr) - os.Exit(1) - } - ml, err := transport.NewListener(murl.Host, murl.Scheme, cfgtls) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - - mux := http.NewServeMux() - mux.Handle("/metrics", prometheus.Handler()) - go func() { - plog.Info("grpc-proxy: listening for metrics on ", murl.String()) - plog.Fatal(http.Serve(ml, mux)) - }() + srvTLS, err := tlsinfo.ServerConfig() + if err != nil { + plog.Fatalf("could not setup TLS (%v)", err) } - - // grpc-proxy is initialized, ready to serve - notifySystemd() - - fmt.Fprintln(os.Stderr, <-errc) - os.Exit(1) + srvhttp.TLSConfig = srvTLS + return srvhttp, m.Match(cmux.Any()) } -func newClientCfg() (*clientv3.Config, *transport.TLSInfo, error) { - // set tls if any one tls option set - var cfgtls *transport.TLSInfo - tlsinfo := transport.TLSInfo{} - if grpcProxyCert != "" { - tlsinfo.CertFile = grpcProxyCert - cfgtls = &tlsinfo - } - - if grpcProxyKey != "" { - tlsinfo.KeyFile = grpcProxyKey - cfgtls = &tlsinfo - } - - if grpcProxyCA != "" { - tlsinfo.CAFile = grpcProxyCA - cfgtls = &tlsinfo - } - - cfg := clientv3.Config{ - Endpoints: grpcProxyEndpoints, - DialTimeout: 5 * time.Second, +func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener { + murl, err := url.Parse(grpcProxyMetricsListenAddr) + if err != nil { + fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr) + os.Exit(1) } - if cfgtls != nil { - clientTLS, err := cfgtls.ClientConfig() - if err != nil { - return nil, nil, err - } - cfg.TLS = clientTLS + ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo) + if err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) } - - // TODO: support insecure tls - - return &cfg, cfgtls, nil + plog.Info("grpc-proxy: listening for metrics on ", murl.String()) + return ml } From efbee9d8c788e434cae8da03b03ad55f5a6f67c5 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 18 Jul 2017 14:09:32 -0700 Subject: [PATCH 07/11] etcdmain: support --auto-tls and --insecure-skip-verify in grpcproxy --- etcdmain/grpc_proxy.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 3e64f885757..027a61a5d4c 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -22,6 +22,7 @@ import ( "net/http" "net/url" "os" + "path/filepath" "time" "github.com/coreos/etcd/clientv3" @@ -46,18 +47,21 @@ var ( grpcProxyEndpoints []string grpcProxyDNSCluster string grpcProxyInsecureDiscovery bool + grpcProxyDataDir string // tls for connecting to etcd - grpcProxyCA string - grpcProxyCert string - grpcProxyKey string + grpcProxyCA string + grpcProxyCert string + grpcProxyKey string + grpcProxyInsecureSkipTLSVerify bool // tls for clients connecting to proxy - grpcProxyListenCA string - grpcProxyListenCert string - grpcProxyListenKey string + grpcProxyListenCA string + grpcProxyListenCert string + grpcProxyListenKey string + grpcProxyListenAutoTLS bool grpcProxyAdvertiseClientURL string grpcProxyResolverPrefix string @@ -100,16 +104,19 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints") cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests") cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`) + cmd.Flags().StringVar(&grpcProxyDataDir, "data-dir", "default.proxy", "Data directory for persistent data") // client TLS for connecting to server cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file") cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file") cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle") + cmd.Flags().BoolVar(&grpcProxyInsecureSkipTLSVerify, "insecure-skip-tls-verify", false, "skip authentication of etcd server TLS certificates") // client TLS for connecting to proxy cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file") cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file") cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle") + cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates") return &cmd } @@ -118,6 +125,15 @@ func startGRPCProxy(cmd *cobra.Command, args []string) { checkArgs() tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey) + if tlsinfo == nil && grpcProxyListenAutoTLS { + host := []string{"https://" + grpcProxyListenAddr} + dir := filepath.Join(grpcProxyDataDir, "fixtures", "proxy") + autoTLS, err := transport.SelfCert(dir, host) + if err != nil { + plog.Fatal(err) + } + tlsinfo = &autoTLS + } if tlsinfo != nil { plog.Infof("ServerTLS: %s", tlsinfo) } @@ -192,15 +208,19 @@ func newClientCfg(eps []string) (*clientv3.Config, error) { Endpoints: eps, DialTimeout: 5 * time.Second, } - if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil { + tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey) + if tls == nil && grpcProxyInsecureSkipTLSVerify { + tls = &transport.TLSInfo{} + } + if tls != nil { clientTLS, err := tls.ClientConfig() if err != nil { return nil, err } + clientTLS.InsecureSkipVerify = grpcProxyInsecureSkipTLSVerify cfg.TLS = clientTLS plog.Infof("ClientTLS: %s", tls) } - // TODO: support insecure tls return &cfg, nil } From c5447c2ec91bc349117327de9f7f019cf6cdf1b5 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 18 Jul 2017 14:59:35 -0700 Subject: [PATCH 08/11] etcdmain: support crl in grpcproxy --- etcdmain/grpc_proxy.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 027a61a5d4c..0fdf69ef514 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -15,7 +15,6 @@ package etcdmain import ( - "crypto/tls" "fmt" "math" "net" @@ -62,6 +61,7 @@ var ( grpcProxyListenCert string grpcProxyListenKey string grpcProxyListenAutoTLS bool + grpcProxyListenCRL string grpcProxyAdvertiseClientURL string grpcProxyResolverPrefix string @@ -117,6 +117,7 @@ func newGRPCProxyStartCommand() *cobra.Command { cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file") cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle") cmd.Flags().BoolVar(&grpcProxyListenAutoTLS, "auto-tls", false, "proxy TLS using generated certificates") + cmd.Flags().StringVar(&grpcProxyListenCRL, "client-crl-file", "", "proxy client certificate revocation list file.") return &cmd } @@ -238,18 +239,17 @@ func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux { os.Exit(1) } - var tlscfg *tls.Config - scheme := "http" + if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } if tlsinfo != nil { - if tlscfg, err = tlsinfo.ServerConfig(); err != nil { + tlsinfo.CRLFile = grpcProxyListenCRL + if l, err = transport.NewTLSListener(l, tlsinfo); err != nil { plog.Fatal(err) } - scheme = "https" - } - if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) } + plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr) return cmux.New(l) } From 1dcae41b20d52384020a3e9b39d7a1204c281ac3 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 18 Jul 2017 15:04:41 -0700 Subject: [PATCH 09/11] grpcproxy: return nil on receiving snapshot EOF Gets "code = OutOfRange desc = EOF" errors otherwise. --- proxy/grpcproxy/maintenance.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/proxy/grpcproxy/maintenance.go b/proxy/grpcproxy/maintenance.go index 0a852c35089..2f57cbb307b 100644 --- a/proxy/grpcproxy/maintenance.go +++ b/proxy/grpcproxy/maintenance.go @@ -15,6 +15,8 @@ package grpcproxy import ( + "io" + "golang.org/x/net/context" "github.com/coreos/etcd/clientv3" @@ -49,6 +51,9 @@ func (mp *maintenanceProxy) Snapshot(sr *pb.SnapshotRequest, stream pb.Maintenan for { rr, err := sc.Recv() if err != nil { + if err == io.EOF { + return nil + } return err } err = stream.Send(rr) From 107828d7779d16be21fbcb39d3329818ca839076 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 18 Jul 2017 15:12:07 -0700 Subject: [PATCH 10/11] test: support -tags cluster_proxy for e2e tests --- test | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test b/test index ebd4689065b..d933020a7f8 100755 --- a/test +++ b/test @@ -170,7 +170,10 @@ function cov_pass { # use 30m timeout because e2e coverage takes longer # due to many tests cause etcd process to wait # on leadership transfer timeout during gracefully shutdown + echo Testing e2e without proxy... go test -tags cov -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e" + echo Testing e2e with proxy... + go test -tags "cov cluster_proxy" -timeout 30m -v ${REPO_PATH}"/e2e" || failed="$failed e2e-proxy" # incrementally merge to get coverage data even if some coverage files are corrupted # optimistically assume etcdserver package's coverage file is OK since gocovmerge @@ -217,6 +220,7 @@ function integration_e2e_pass { function grpcproxy_pass { go test -timeout 20m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/integration go test -timeout 15m -v ${RACE} -tags cluster_proxy -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration + go test -timeout 15m -v -tags cluster_proxy $@ ${REPO_PATH}/e2e } function release_pass { From 954ec4d1a55570c33bc7104140fab38006546132 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Thu, 20 Jul 2017 16:14:52 -0700 Subject: [PATCH 11/11] e2e: fix range indexing for args2env conversion Was dropping the last argument in the slice. --- e2e/etcd_spawn_cov.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/e2e/etcd_spawn_cov.go b/e2e/etcd_spawn_cov.go index c198c5377fd..ca45a571efc 100644 --- a/e2e/etcd_spawn_cov.go +++ b/e2e/etcd_spawn_cov.go @@ -105,7 +105,7 @@ func getCovArgs() ([]string, error) { func args2env(args []string) []string { var covEnvs []string - for i := range args[1:] { + for i := range args { if !strings.HasPrefix(args[i], "--") { continue }