Skip to content

Commit

Permalink
Merge pull request #8277 from heyitsanthony/test-e2e-grpcproxy
Browse files Browse the repository at this point in the history
e2e grpcproxy tests
  • Loading branch information
heyitsanthony committed Jul 21, 2017
2 parents a64d15e + 954ec4d commit 2eb9353
Show file tree
Hide file tree
Showing 31 changed files with 1,311 additions and 904 deletions.
21 changes: 21 additions & 0 deletions e2e/cluster_direct_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build !cluster_proxy

package e2e

func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
return newEtcdServerProcess(cfg)
}
278 changes: 278 additions & 0 deletions e2e/cluster_proxy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// +build cluster_proxy

package e2e

import (
"fmt"
"net"
"net/url"
"os"
"strconv"
"strings"

"github.com/coreos/etcd/pkg/expect"
)

type proxyEtcdProcess struct {
etcdProc etcdProcess
proxyV2 *proxyV2Proc
proxyV3 *proxyV3Proc
}

func newEtcdProcess(cfg *etcdServerProcessConfig) (etcdProcess, error) {
return newProxyEtcdProcess(cfg)
}

func newProxyEtcdProcess(cfg *etcdServerProcessConfig) (*proxyEtcdProcess, error) {
ep, err := newEtcdServerProcess(cfg)
if err != nil {
return nil, err
}
pep := &proxyEtcdProcess{
etcdProc: ep,
proxyV2: newProxyV2Proc(cfg),
proxyV3: newProxyV3Proc(cfg),
}
return pep, nil
}

func (p *proxyEtcdProcess) Config() *etcdServerProcessConfig { return p.etcdProc.Config() }

func (p *proxyEtcdProcess) EndpointsV2() []string { return p.proxyV2.endpoints() }
func (p *proxyEtcdProcess) EndpointsV3() []string { return p.proxyV3.endpoints() }

func (p *proxyEtcdProcess) Start() error {
if err := p.etcdProc.Start(); err != nil {
return err
}
if err := p.proxyV2.Start(); err != nil {
return err
}
return p.proxyV3.Start()
}

func (p *proxyEtcdProcess) Restart() error {
if err := p.etcdProc.Restart(); err != nil {
return err
}
if err := p.proxyV2.Restart(); err != nil {
return err
}
return p.proxyV3.Restart()
}

func (p *proxyEtcdProcess) Stop() error {
err := p.proxyV2.Stop()
if v3err := p.proxyV3.Stop(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Stop(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
}
}
return err
}

func (p *proxyEtcdProcess) Close() error {
err := p.proxyV2.Close()
if v3err := p.proxyV3.Close(); err == nil {
err = v3err
}
if eerr := p.etcdProc.Close(); eerr != nil && err == nil {
// fails on go-grpc issue #1384
if !strings.Contains(eerr.Error(), "exit status 2") {
err = eerr
}
}
return err
}

func (p *proxyEtcdProcess) WithStopSignal(sig os.Signal) os.Signal {
p.proxyV3.WithStopSignal(sig)
p.proxyV3.WithStopSignal(sig)
return p.etcdProc.WithStopSignal(sig)
}

type proxyProc struct {
execPath string
args []string
ep string
donec chan struct{}

proc *expect.ExpectProcess
}

func (pp *proxyProc) endpoints() []string { return []string{pp.ep} }

func (pp *proxyProc) start() error {
if pp.proc != nil {
panic("already started")
}
proc, err := spawnCmd(append([]string{pp.execPath}, pp.args...))
if err != nil {
return err
}
pp.proc = proc
return nil
}

func (pp *proxyProc) waitReady(readyStr string) error {
defer close(pp.donec)
return waitReadyExpectProc(pp.proc, []string{readyStr})
}

func (pp *proxyProc) Stop() error {
if pp.proc == nil {
return nil
}
if err := pp.proc.Stop(); err != nil && !strings.Contains(err.Error(), "exit status 1") {
// v2proxy exits with status 1 on auto tls; not sure why
return err
}
pp.proc = nil
<-pp.donec
pp.donec = make(chan struct{})
return nil
}

func (pp *proxyProc) WithStopSignal(sig os.Signal) os.Signal {
ret := pp.proc.StopSignal
pp.proc.StopSignal = sig
return ret
}

func (pp *proxyProc) Close() error { return pp.Stop() }

type proxyV2Proc struct {
proxyProc
dataDir string
}

func proxyListenURL(cfg *etcdServerProcessConfig, portOffset int) string {
u, err := url.Parse(cfg.acurl)
if err != nil {
panic(err)
}
host, port, _ := net.SplitHostPort(u.Host)
p, _ := strconv.ParseInt(port, 10, 16)
u.Host = fmt.Sprintf("%s:%d", host, int(p)+portOffset)
return u.String()
}

func newProxyV2Proc(cfg *etcdServerProcessConfig) *proxyV2Proc {
listenAddr := proxyListenURL(cfg, 2)
name := fmt.Sprintf("testname-proxy-%p", cfg)
args := []string{
"--name", name,
"--proxy", "on",
"--listen-client-urls", listenAddr,
"--initial-cluster", cfg.name + "=" + cfg.purl.String(),
}
return &proxyV2Proc{
proxyProc{
execPath: cfg.execPath,
args: append(args, cfg.tlsArgs...),
ep: listenAddr,
donec: make(chan struct{}),
},
name + ".etcd",
}
}

func (v2p *proxyV2Proc) Start() error {
os.RemoveAll(v2p.dataDir)
if err := v2p.start(); err != nil {
return err
}
return v2p.waitReady("httpproxy: endpoints found")
}

func (v2p *proxyV2Proc) Restart() error {
if err := v2p.Stop(); err != nil {
return err
}
return v2p.Start()
}

func (v2p *proxyV2Proc) Stop() error {
if err := v2p.proxyProc.Stop(); err != nil {
return err
}
// v2 proxy caches members; avoid reuse of directory
return os.RemoveAll(v2p.dataDir)
}

type proxyV3Proc struct {
proxyProc
}

func newProxyV3Proc(cfg *etcdServerProcessConfig) *proxyV3Proc {
listenAddr := proxyListenURL(cfg, 3)
args := []string{
"grpc-proxy",
"start",
"--listen-addr", strings.Split(listenAddr, "/")[2],
"--endpoints", cfg.acurl,
// pass-through member RPCs
"--advertise-client-url", "",
}
tlsArgs := []string{}
for i := 0; i < len(cfg.tlsArgs); i++ {
switch cfg.tlsArgs[i] {
case "--cert-file":
tlsArgs = append(tlsArgs, "--cert", cfg.tlsArgs[i+1], "--cert-file", cfg.tlsArgs[i+1])
i++
case "--key-file":
tlsArgs = append(tlsArgs, "--key", cfg.tlsArgs[i+1], "--key-file", cfg.tlsArgs[i+1])
i++
case "--ca-file":
tlsArgs = append(tlsArgs, "--cacert", cfg.tlsArgs[i+1], "--trusted-ca-file", cfg.tlsArgs[i+1])
i++
case "--auto-tls":
tlsArgs = append(tlsArgs, "--auto-tls", "--insecure-skip-tls-verify")
case "--peer-ca-file", "--peer-cert-file", "--peer-key-file":
i++ // skip arg
case "--client-cert-auth", "--peer-auto-tls":
default:
tlsArgs = append(tlsArgs, cfg.tlsArgs[i])
}
}
return &proxyV3Proc{
proxyProc{
execPath: cfg.execPath,
args: append(args, tlsArgs...),
ep: listenAddr,
donec: make(chan struct{}),
},
}
}

func (v3p *proxyV3Proc) Restart() error {
if err := v3p.Stop(); err != nil {
return err
}
return v3p.Start()
}

func (v3p *proxyV3Proc) Start() error {
if err := v3p.start(); err != nil {
return err
}
return v3p.waitReady("listening for grpc-proxy client requests")
}
Loading

0 comments on commit 2eb9353

Please sign in to comment.