Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdmain: backport support for different certs for etcd-gRPC proxy #9894

Merged
merged 2 commits into from
Jul 2, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 138 additions & 74 deletions etcdmain/grpc_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package etcdmain
import (
"crypto/tls"
"fmt"
"math"
"net"
"net/http"
"net/url"
"os"
"time"

Expand All @@ -40,12 +42,22 @@ import (

var (
grpcProxyListenAddr string
grpcProxyMetricsListenAddr string
grpcProxyEndpoints []string
grpcProxyDNSCluster string
grpcProxyInsecureDiscovery bool
grpcProxyCert string
grpcProxyKey string
grpcProxyCA string

// tls for connecting to etcd

grpcProxyCA string
grpcProxyCert string
grpcProxyKey string

// tls for clients connecting to proxy

grpcProxyListenCA string
grpcProxyListenCert string
grpcProxyListenKey string

grpcProxyAdvertiseClientURL string
grpcProxyResolverPrefix string
Expand Down Expand Up @@ -80,21 +92,67 @@ func newGRPCProxyStartCommand() *cobra.Command {

cmd.Flags().StringVar(&grpcProxyListenAddr, "listen-addr", "127.0.0.1:23790", "listen address")
cmd.Flags().StringVar(&grpcProxyDNSCluster, "discovery-srv", "", "DNS domain used to bootstrap initial cluster")
cmd.Flags().StringVar(&grpcProxyMetricsListenAddr, "metrics-addr", "", "listen for /metrics requests on an additional interface")
cmd.Flags().BoolVar(&grpcProxyInsecureDiscovery, "insecure-discovery", false, "accept insecure SRV records")
cmd.Flags().StringSliceVar(&grpcProxyEndpoints, "endpoints", []string{"127.0.0.1:2379"}, "comma separated etcd cluster endpoints")
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")
cmd.Flags().StringVar(&grpcProxyAdvertiseClientURL, "advertise-client-url", "127.0.0.1:23790", "advertise address to register (must be reachable by client)")
cmd.Flags().StringVar(&grpcProxyResolverPrefix, "resolver-prefix", "", "prefix to use for registering proxy (must be shared with other grpc-proxy members)")
cmd.Flags().IntVar(&grpcProxyResolverTTL, "resolver-ttl", 0, "specify TTL, in seconds, when registering proxy endpoints")
cmd.Flags().StringVar(&grpcProxyNamespace, "namespace", "", "string to prefix to all keys for namespacing requests")
cmd.Flags().BoolVar(&grpcProxyEnablePprof, "enable-pprof", false, `Enable runtime profiling data via HTTP server. Address is at client URL + "/debug/pprof/"`)

// client TLS for connecting to server
cmd.Flags().StringVar(&grpcProxyCert, "cert", "", "identify secure connections with etcd servers using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyKey, "key", "", "identify secure connections with etcd servers using this TLS key file")
cmd.Flags().StringVar(&grpcProxyCA, "cacert", "", "verify certificates of TLS-enabled secure etcd servers using this CA bundle")

// client TLS for connecting to proxy
cmd.Flags().StringVar(&grpcProxyListenCert, "cert-file", "", "identify secure connections to the proxy using this TLS certificate file")
cmd.Flags().StringVar(&grpcProxyListenKey, "key-file", "", "identify secure connections to the proxy using this TLS key file")
cmd.Flags().StringVar(&grpcProxyListenCA, "trusted-ca-file", "", "verify certificates of TLS-enabled secure proxy using this CA bundle")

return &cmd
}

func startGRPCProxy(cmd *cobra.Command, args []string) {
checkArgs()

tlsinfo := newTLS(grpcProxyListenCA, grpcProxyListenCert, grpcProxyListenKey)
if tlsinfo != nil {
plog.Infof("ServerTLS: %s", tlsinfo)
}
m := mustListenCMux(tlsinfo)

grpcl := m.Match(cmux.HTTP2())
defer func() {
grpcl.Close()
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
}()

client := mustNewClient()

srvhttp, httpl := mustHTTPListener(m, tlsinfo)
errc := make(chan error)
go func() { errc <- newGRPCProxyServer(client).Serve(grpcl) }()
go func() { errc <- srvhttp.Serve(httpl) }()
go func() { errc <- m.Serve() }()
if len(grpcProxyMetricsListenAddr) > 0 {
mhttpl := mustMetricsListener(tlsinfo)
go func() {
mux := http.NewServeMux()
mux.Handle("/metrics", prometheus.Handler())
plog.Fatal(http.Serve(mhttpl, mux))
}()
}

// grpc-proxy is initialized, ready to serve
notifySystemd()

fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
}

func checkArgs() {
if grpcProxyResolverPrefix != "" && grpcProxyResolverTTL < 1 {
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid resolver-ttl %d", grpcProxyResolverTTL))
os.Exit(1)
Expand All @@ -107,40 +165,76 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
fmt.Fprintln(os.Stderr, fmt.Errorf("invalid advertise-client-url %q", grpcProxyAdvertiseClientURL))
os.Exit(1)
}
}

func mustNewClient() *clientv3.Client {
srvs := discoverEndpoints(grpcProxyDNSCluster, grpcProxyCA, grpcProxyInsecureDiscovery)
if len(srvs.Endpoints) != 0 {
grpcProxyEndpoints = srvs.Endpoints
eps := srvs.Endpoints
if len(eps) == 0 {
eps = grpcProxyEndpoints
}

l, err := net.Listen("tcp", grpcProxyListenAddr)
cfg, err := newClientCfg(eps)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
if l, err = transport.NewKeepAliveListener(l, "tcp", nil); err != nil {
client, err := clientv3.New(*cfg)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
defer func() {
l.Close()
plog.Infof("stopping listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
}()
m := cmux.New(l)
return client
}

func newClientCfg(eps []string) (*clientv3.Config, error) {
// set tls if any one tls option set
cfg := clientv3.Config{
Endpoints: eps,
DialTimeout: 5 * time.Second,
}
if tls := newTLS(grpcProxyCA, grpcProxyCert, grpcProxyKey); tls != nil {
clientTLS, err := tls.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = clientTLS
plog.Infof("ClientTLS: %s", tls)
}
// TODO: support insecure tls
return &cfg, nil
}

func newTLS(ca, cert, key string) *transport.TLSInfo {
if ca == "" && cert == "" && key == "" {
return nil
}
return &transport.TLSInfo{CAFile: ca, CertFile: cert, KeyFile: key}
}

cfg, err := newClientCfg()
func mustListenCMux(tlsinfo *transport.TLSInfo) cmux.CMux {
l, err := net.Listen("tcp", grpcProxyListenAddr)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

client, err := clientv3.New(*cfg)
if err != nil {
var tlscfg *tls.Config
scheme := "http"
if tlsinfo != nil {
if tlscfg, err = tlsinfo.ServerConfig(); err != nil {
plog.Fatal(err)
}
scheme = "https"
}
if l, err = transport.NewKeepAliveListener(l, scheme, tlscfg); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
plog.Infof("listening for grpc-proxy client requests on %s", grpcProxyListenAddr)
return cmux.New(l)
}

func newGRPCProxyServer(client *clientv3.Client) *grpc.Server {
if len(grpcProxyNamespace) > 0 {
client.KV = namespace.NewKV(client.KV, grpcProxyNamespace)
client.Watcher = namespace.NewWatcher(client.Watcher, grpcProxyNamespace)
Expand All @@ -162,7 +256,9 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
server := grpc.NewServer(
grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),
grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor),
grpc.MaxConcurrentStreams(math.MaxUint32),
)

pb.RegisterKVServer(server, kvp)
pb.RegisterWatchServer(server, watchp)
pb.RegisterClusterServer(server, clusterp)
Expand All @@ -171,12 +267,10 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
pb.RegisterAuthServer(server, authp)
v3electionpb.RegisterElectionServer(server, electionp)
v3lockpb.RegisterLockServer(server, lockp)
return server
}

errc := make(chan error)

grpcl := m.Match(cmux.HTTP2())
go func() { errc <- server.Serve(grpcl) }()

func mustHTTPListener(m cmux.CMux, tlsinfo *transport.TLSInfo) (*http.Server, net.Listener) {
httpmux := http.NewServeMux()
httpmux.HandleFunc("/", http.NotFound)
httpmux.Handle("/metrics", prometheus.Handler())
Expand All @@ -186,61 +280,31 @@ func startGRPCProxy(cmd *cobra.Command, args []string) {
}
plog.Infof("pprof is enabled under %s", debugutil.HTTPPrefixPProf)
}
srvhttp := &http.Server{Handler: httpmux}

srvhttp := &http.Server{
Handler: httpmux,
if tlsinfo == nil {
return srvhttp, m.Match(cmux.HTTP1())
}

var httpl net.Listener
if cfg.TLS != nil {
srvhttp.TLSConfig = cfg.TLS
httpl = tls.NewListener(m.Match(cmux.Any()), cfg.TLS)
} else {
httpl = m.Match(cmux.HTTP1())
srvTLS, err := tlsinfo.ServerConfig()
if err != nil {
plog.Fatalf("could not setup TLS (%v)", err)
}
go func() { errc <- srvhttp.Serve(httpl) }()

go func() { errc <- m.Serve() }()

// grpc-proxy is initialized, ready to serve
notifySystemd()

fmt.Fprintln(os.Stderr, <-errc)
os.Exit(1)
srvhttp.TLSConfig = srvTLS
return srvhttp, m.Match(cmux.Any())
}

func newClientCfg() (*clientv3.Config, error) {
// set tls if any one tls option set
var cfgtls *transport.TLSInfo
tlsinfo := transport.TLSInfo{}
if grpcProxyCert != "" {
tlsinfo.CertFile = grpcProxyCert
cfgtls = &tlsinfo
}

if grpcProxyKey != "" {
tlsinfo.KeyFile = grpcProxyKey
cfgtls = &tlsinfo
}

if grpcProxyCA != "" {
tlsinfo.CAFile = grpcProxyCA
cfgtls = &tlsinfo
}

cfg := clientv3.Config{
Endpoints: grpcProxyEndpoints,
DialTimeout: 5 * time.Second,
func mustMetricsListener(tlsinfo *transport.TLSInfo) net.Listener {
murl, err := url.Parse(grpcProxyMetricsListenAddr)
if err != nil {
fmt.Fprintf(os.Stderr, "cannot parse %q", grpcProxyMetricsListenAddr)
os.Exit(1)
}
if cfgtls != nil {
clientTLS, err := cfgtls.ClientConfig()
if err != nil {
return nil, err
}
cfg.TLS = clientTLS
ml, err := transport.NewListener(murl.Host, murl.Scheme, tlsinfo)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

// TODO: support insecure tls

return &cfg, nil
plog.Info("grpc-proxy: listening for metrics on ", murl.String())
return ml
}