Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user to separate http and grpc server #15446

Merged
merged 5 commits into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions server/embed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,12 +211,12 @@ type Config struct {
// streams that each client can open at a time.
MaxConcurrentStreams uint32 `json:"max-concurrent-streams"`

ListenPeerUrls, ListenClientUrls []url.URL
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
ListenPeerUrls, ListenClientUrls, ListenClientHttpUrls []url.URL
serathius marked this conversation as resolved.
Show resolved Hide resolved
AdvertisePeerUrls, AdvertiseClientUrls []url.URL
ClientTLSInfo transport.TLSInfo
ClientAutoTLS bool
PeerTLSInfo transport.TLSInfo
PeerAutoTLS bool
// SelfSignedCertValidity specifies the validity period of the client and peer certificates
// that are automatically generated by etcd when you specify ClientAutoTLS and PeerAutoTLS,
// the unit is year, and the default is 1
Expand Down Expand Up @@ -439,10 +439,11 @@ type configYAML struct {

// configJSON has file options that are translated into Config options
type configJSON struct {
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`
ListenPeerUrls string `json:"listen-peer-urls"`
ListenClientUrls string `json:"listen-client-urls"`
ListenClientHttpUrls string `json:"listen-client-http-urls"`
AdvertisePeerUrls string `json:"initial-advertise-peer-urls"`
AdvertiseClientUrls string `json:"advertise-client-urls"`

CORSJSON string `json:"cors"`
HostWhitelistJSON string `json:"host-whitelist"`
Expand Down Expand Up @@ -589,6 +590,15 @@ func (cfg *configYAML) configFromFile(path string) error {
cfg.Config.ListenClientUrls = u
}

if cfg.configJSON.ListenClientHttpUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.ListenClientHttpUrls, ","))
if err != nil {
fmt.Fprintf(os.Stderr, "unexpected error setting up listen-client-http-urls: %v\n", err)
os.Exit(1)
}
cfg.Config.ListenClientHttpUrls = u
}

if cfg.configJSON.AdvertisePeerUrls != "" {
u, err := types.NewURLs(strings.Split(cfg.configJSON.AdvertisePeerUrls, ","))
if err != nil {
Expand Down Expand Up @@ -688,6 +698,12 @@ func (cfg *Config) Validate() error {
if err := checkBindURLs(cfg.ListenClientUrls); err != nil {
return err
}
if err := checkBindURLs(cfg.ListenClientHttpUrls); err != nil {
return err
}
if len(cfg.ListenClientHttpUrls) == 0 {
cfg.logger.Warn("Running http and grpc server on single port. This is not recommended for production.")
serathius marked this conversation as resolved.
Show resolved Hide resolved
}
if err := checkBindURLs(cfg.ListenMetricsUrls); err != nil {
return err
}
Expand Down Expand Up @@ -957,9 +973,12 @@ func (cfg *Config) ClientSelfCert() (err error) {
cfg.logger.Warn("ignoring client auto TLS since certs given")
return nil
}
chosts := make([]string, len(cfg.ListenClientUrls))
for i, u := range cfg.ListenClientUrls {
chosts[i] = u.Host
chosts := make([]string, 0, len(cfg.ListenClientUrls)+len(cfg.ListenClientHttpUrls))
for _, u := range cfg.ListenClientUrls {
chosts = append(chosts, u.Host)
}
for _, u := range cfg.ListenClientHttpUrls {
chosts = append(chosts, u.Host)
}
cfg.ClientTLSInfo, err = transport.SelfCert(cfg.logger, filepath.Join(cfg.Dir, "fixtures", "client"), chosts, cfg.SelfSignedCertValidity)
if err != nil {
Expand Down Expand Up @@ -1094,6 +1113,14 @@ func (cfg *Config) getListenClientUrls() (ss []string) {
return ss
}

func (cfg *Config) getListenClientHttpUrls() (ss []string) {
serathius marked this conversation as resolved.
Show resolved Hide resolved
ss = make([]string, len(cfg.ListenClientHttpUrls))
for i := range cfg.ListenClientHttpUrls {
ss[i] = cfg.ListenClientHttpUrls[i].String()
}
return ss
}

func (cfg *Config) getMetricsURLs() (ss []string) {
ss = make([]string, len(cfg.ListenMetricsUrls))
for i := range cfg.ListenMetricsUrls {
Expand Down
134 changes: 110 additions & 24 deletions server/embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"
defaultLog "log"
"math"
"net"
"net/http"
"net/url"
Expand All @@ -32,6 +33,7 @@ import (
"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/transport"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/client/v3/credentials"
"go.etcd.io/etcd/pkg/v3/debugutil"
runtimeutil "go.etcd.io/etcd/pkg/v3/runtime"
"go.etcd.io/etcd/server/v3/config"
Expand All @@ -45,6 +47,7 @@ import (
"github.com/soheilhy/cmux"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
)

Expand Down Expand Up @@ -456,11 +459,16 @@ func (e *Etcd) Close() {

func stopServers(ctx context.Context, ss *servers) {
// first, close the http.Server
ss.http.Shutdown(ctx)
// do not grpc.Server.GracefulStop with TLS enabled etcd server
if ss.http != nil {
ss.http.Shutdown(ctx)
}
if ss.grpc == nil {
return
}
// do not grpc.Server.GracefulStop when grpc runs under http server
// See https://github.com/grpc/grpc-go/issues/1384#issuecomment-317124531
// and https://github.com/etcd-io/etcd/issues/8916
if ss.secure {
if ss.secure && ss.http != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why adding the ss.http != nil?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above comment states do not grpc.Server.GracefulStop when grpc runs under http server. With this PR, grpc server no longer runs under http server when in httpOnly mode. So if there is no http server, grpc server runs standalone mode.

ss.grpc.Stop()
return
}
Expand Down Expand Up @@ -618,8 +626,7 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
}

sctxs = make(map[string]*serveCtx)
for _, u := range cfg.ListenClientUrls {
sctx := newServeCtx(cfg.logger)
for _, u := range append(cfg.ListenClientUrls, cfg.ListenClientHttpUrls...) {
if u.Scheme == "http" || u.Scheme == "unix" {
if !cfg.ClientTLSInfo.Empty() {
cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("client-url", u.String()))
Expand All @@ -631,32 +638,48 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if (u.Scheme == "https" || u.Scheme == "unixs") && cfg.ClientTLSInfo.Empty() {
return nil, fmt.Errorf("TLS key/cert (--cert-file, --key-file) must be provided for client url %s with HTTPS scheme", u.String())
}
}

network := "tcp"
addr := u.Host
if u.Scheme == "unix" || u.Scheme == "unixs" {
network = "unix"
addr = u.Host + u.Path
for _, u := range cfg.ListenClientUrls {
addr, secure, network := resolveUrl(u)
sctx := sctxs[addr]
if sctx == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we expect conflicts here ? Do we expect user to mention the same host multiple times ?

If that was the previous semantic, I would log a warning if we discover such case and override the config.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we expect that user can list same host twice with different scheme. One for http, second for https

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strange but it is the status quo...

sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
}
for _, u := range cfg.ListenClientHttpUrls {
addr, secure, network := resolveUrl(u)

sctx.secure = u.Scheme == "https" || u.Scheme == "unixs"
sctx.insecure = !sctx.secure
if oldctx := sctxs[addr]; oldctx != nil {
oldctx.secure = oldctx.secure || sctx.secure
oldctx.insecure = oldctx.insecure || sctx.insecure
continue
sctx := sctxs[addr]
if sctx == nil {
sctx = newServeCtx(cfg.logger)
sctxs[addr] = sctx
} else if !sctx.httpOnly {
return nil, fmt.Errorf("cannot bind both --client-listen-urls and --client-listen-http-urls on the same url %s", u.String())
}
sctx.secure = sctx.secure || secure
sctx.insecure = sctx.insecure || !secure
sctx.scheme = u.Scheme
sctx.addr = addr
sctx.network = network
sctx.httpOnly = true
}

if sctx.l, err = transport.NewListenerWithOpts(addr, u.Scheme,
for _, sctx := range sctxs {
if sctx.l, err = transport.NewListenerWithOpts(sctx.addr, sctx.scheme,
transport.WithSocketOpts(&cfg.SocketOpts),
transport.WithSkipTLSInfoCheck(true),
); err != nil {
return nil, err
}
// net.Listener will rewrite ipv4 0.0.0.0 to ipv6 [::], breaking
// hosts that disable ipv6. So, use the address given by the user.
Comment on lines 681 to 682
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the comment also be moved up to the appropriate location?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what this comment refers to. I didn't move it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was introduced by #8221 to fix the grpc gateway creating grpc connections using [::] caused by sctx.listener.addr() when ipv6 is disabled but user configures 0.0.0.0 as listen client urls.

I think we can leave it as it is right now and improve it later.

sctx.addr = addr

if fdLimit, fderr := runtimeutil.FDLimit(); fderr == nil {
if fdLimit <= reservedInternalFDNum {
Expand All @@ -669,17 +692,17 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
sctx.l = transport.LimitListener(sctx.l, int(fdLimit-reservedInternalFDNum))
}

defer func(u url.URL) {
if err == nil {
defer func(addr string) {
if err == nil || sctx.l == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should pass sctx in the defer function instead of address.

https://go.dev/play/p/ZzmQADcBkJl

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice find!, will send a separate PR with a fix.

return
}
sctx.l.Close()
cfg.logger.Warn(
"closing peer listener",
zap.String("address", u.Host),
zap.String("address", addr),
zap.Error(err),
)
}(u)
}(sctx.addr)
for k := range cfg.UserHandlers {
sctx.userHandlers[k] = cfg.UserHandlers[k]
}
Expand All @@ -690,11 +713,21 @@ func configureClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err erro
if cfg.LogLevel == "debug" {
sctx.registerTrace()
}
sctxs[addr] = sctx
}
return sctxs, nil
}

func resolveUrl(u url.URL) (addr string, secure bool, network string) {
addr = u.Host
network = "tcp"
if u.Scheme == "unix" || u.Scheme == "unixs" {
addr = u.Host + u.Path
network = "unix"
}
secure = u.Scheme == "https" || u.Scheme == "unixs"
return addr, secure, network
}

func (e *Etcd) serveClients() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
e.cfg.logger.Info(
Expand Down Expand Up @@ -726,15 +759,68 @@ func (e *Etcd) serveClients() (err error) {
}))
}

splitHttp := false
for _, sctx := range e.sctxs {
if sctx.httpOnly {
splitHttp = true
}
}

// start client servers in each goroutine
for _, sctx := range e.sctxs {
go func(s *serveCtx) {
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, gopts...))
e.errHandler(s.serve(e.Server, &e.cfg.ClientTLSInfo, mux, e.errHandler, e.grpcGatewayDial(splitHttp), splitHttp, gopts...))
}(sctx)
}
return nil
}

func (e *Etcd) grpcGatewayDial(splitHttp bool) (grpcDial func(ctx context.Context) (*grpc.ClientConn, error)) {
if !e.cfg.EnableGRPCGateway {
return nil
}
sctx := e.pickGrpcGatewayServeContext(splitHttp)
addr := sctx.addr
if network := sctx.network; network == "unix" {
// explicitly define unix network for gRPC socket support
addr = fmt.Sprintf("%s:%s", network, addr)
}
opts := []grpc.DialOption{grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32))}
if sctx.secure {
tlscfg, tlsErr := e.cfg.ClientTLSInfo.ServerConfig()
if tlsErr != nil {
return func(ctx context.Context) (*grpc.ClientConn, error) {
return nil, tlsErr
}
}
dtls := tlscfg.Clone()
// trust local server
dtls.InsecureSkipVerify = true
bundle := credentials.NewBundle(credentials.Config{TLSConfig: dtls})
opts = append(opts, grpc.WithTransportCredentials(bundle.TransportCredentials()))
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}

return func(ctx context.Context) (*grpc.ClientConn, error) {
conn, err := grpc.DialContext(ctx, addr, opts...)
if err != nil {
sctx.lg.Error("grpc gateway failed to dial", zap.String("addr", addr), zap.Error(err))
return nil, err
}
return conn, err
}
}

func (e *Etcd) pickGrpcGatewayServeContext(splitHttp bool) *serveCtx {
for _, sctx := range e.sctxs {
if !splitHttp || !sctx.httpOnly {
return sctx
}
}
panic("Expect at least one context able to serve grpc")
}

func (e *Etcd) serveMetrics() (err error) {
if e.cfg.Metrics == "extensive" {
grpc_prometheus.EnableHandlingTimeHistogram()
Expand Down
Loading