Skip to content

Commit

Permalink
grpc graceful shutdown (envoyproxy#64)
Browse files Browse the repository at this point in the history
Signed-off-by: Jyoti Mahapatra <jmahapatra@lyft.com>
  • Loading branch information
jyotimahapatra authored and jessicayuen committed Apr 24, 2020
1 parent 24d87a6 commit aefa63b
Show file tree
Hide file tree
Showing 33 changed files with 238 additions and 37 deletions.
8 changes: 0 additions & 8 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,9 @@ on:
push:
branches:
- master
paths:
- 'api/**'
- 'tools/configuration-validator/**'
- .github/workflows/integration-tests.yml
pull_request:
branches:
- master
paths:
- 'api/**'
- 'tools/configuration-validator/**'
- .github/workflows/integration-tests.yml
env:
PACKAGEPATH: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
jobs:
Expand Down
36 changes: 36 additions & 0 deletions integration/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build integration

package integration

import (
"os"
"os/exec"
"path"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
binaryName = "xds-relay"
)

func TestServerShutdown(t *testing.T) {
dir, err := os.Getwd()
assert.Nil(t, err)
cmd := exec.Command(path.Join(dir, "bin", binaryName),
"-c", "./integration/testdata/bootstrap_configuration_complete_tech_spec.yaml",
"-a", "./integration/testdata/keyer_configuration_complete_tech_spec.yaml",
"-l", "debug",
"-m", "serve")
err = cmd.Start()
assert.Nil(t, err)
<-time.After(5 * time.Second)
assert.Equal(t, -1, cmd.ProcessState.ExitCode())
e := cmd.Process.Signal(syscall.SIGINT)
assert.Nil(t, e)
err = cmd.Wait()
assert.Nil(t, e)
}
44 changes: 37 additions & 7 deletions internal/app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package server
import (
"context"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/envoyproxy/xds-relay/internal/app/mapper"
"github.com/envoyproxy/xds-relay/internal/app/orchestrator"
"github.com/envoyproxy/xds-relay/internal/app/upstream"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/envoyproxy/xds-relay/internal/pkg/util"

aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
Expand All @@ -32,7 +36,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
logger = log.New(bootstrapConfig.Logging.Level.String())
}

// TODO cancel should be invoked by shutdown handlers.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -45,7 +48,7 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
ctx,
upstreamAddress,
upstream.CallOptions{Timeout: time.Minute},
logger.Named("xdsclient"),
logger,
)
if err != nil {
logger.With("error", err).Panic(ctx, "failed to initialize upstream client")
Expand All @@ -72,10 +75,37 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
api.RegisterRouteDiscoveryServiceServer(server, gcpServer)
api.RegisterListenerDiscoveryServiceServer(server, gcpServer)

if mode == "serve" {
logger.With("address", listener.Addr()).Info(ctx, "Initializing server")
if err := server.Serve(listener); err != nil {
logger.With("err", err).Fatal(ctx, "failed to initialize server")
}
if mode != "serve" {
return
}

registerShutdownHandler(ctx, cancel, server.GracefulStop, logger, time.Second*30)
logger.With("address", listener.Addr()).Info(ctx, "Initializing server")
if err := server.Serve(listener); err != nil {
logger.With("err", err).Fatal(ctx, "failed to initialize server")
}
}

func registerShutdownHandler(
ctx context.Context,
cancel context.CancelFunc,
gracefulStop func(),
logger log.Logger,
waitTime time.Duration) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logger.Info(ctx, "received interrupt signal:", sig.String())
err := util.DoWithTimeout(ctx, func() error {
logger.Info(ctx, "initiating grpc graceful stop")
gracefulStop()
_ = logger.Sync()
cancel()
return nil
}, waitTime)
if err != nil {
logger.Error(ctx, "shutdown error: ", err.Error())
}
}()
}
69 changes: 69 additions & 0 deletions internal/app/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package server

import (
"context"
"fmt"
"syscall"
"testing"
"time"

"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/stretchr/testify/assert"
)

func TestShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
blockedCh := make(chan bool, 1)
l := &logger{}
registerShutdownHandler(ctx, cancel, func() {
blockedCh <- true
}, l, time.Second*5)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
<-blockedCh
}

func TestShutdownTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
l := &logger{blockedCh: make(chan bool, 1)}
registerShutdownHandler(ctx, cancel, func() {
<-time.After(time.Minute)
}, l, time.Millisecond)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
<-l.blockedCh
assert.Equal(t, "shutdown error: context deadline exceeded", l.lastErr)
}

type logger struct {
blockedCh chan bool
lastErr string
}

func (l *logger) Named(name string) log.Logger {
return l
}

func (l *logger) With(args ...interface{}) log.Logger {
return l
}

func (l *logger) Sync() error { return nil }

func (l *logger) Debug(ctx context.Context, args ...interface{}) {
}

func (l *logger) Info(ctx context.Context, args ...interface{}) {
}

func (l *logger) Warn(ctx context.Context, args ...interface{}) {
}

func (l *logger) Error(ctx context.Context, args ...interface{}) {
l.lastErr = fmt.Sprint(args...)
l.blockedCh <- true
}

func (l *logger) Fatal(ctx context.Context, args ...interface{}) {
}

func (l *logger) Panic(ctx context.Context, args ...interface{}) {
}
25 changes: 5 additions & 20 deletions internal/app/upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/envoyproxy/xds-relay/internal/pkg/util"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -78,6 +79,7 @@ type version struct {
// The method does not block until the underlying connection is up.
// Returns immediately and connecting the server happens in background
func NewClient(ctx context.Context, url string, callOptions CallOptions, logger log.Logger) (Client, error) {
logger.Info(ctx, "Initiating upstream connection.")
// TODO: configure grpc options.https://github.com/envoyproxy/xds-relay/issues/55
conn, err := grpc.Dial(url, grpc.WithInsecure())
if err != nil {
Expand Down Expand Up @@ -163,7 +165,9 @@ func send(
}
request.ResponseNonce = sig.nonce
request.VersionInfo = sig.version
err := doWithTimeout(ctx, func() error {
// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717
// Call SendMsg in a timeout because it can block in some cases.
err := util.DoWithTimeout(ctx, func() error {
return stream.SendMsg(request)
}, callOptions.Timeout)
if err != nil {
Expand Down Expand Up @@ -229,25 +233,6 @@ func shutDown(ctx context.Context, conn *grpc.ClientConn) {
conn.Close()
}

// DoWithTimeout runs f and returns its error. If the deadline d elapses first,
// it returns a DeadlineExceeded error instead.
// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717
func doWithTimeout(ctx context.Context, f func() error, d time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx, d)
defer cancel()
errChan := make(chan error, 1)
go func() {
errChan <- f()
close(errChan)
}()
select {
case <-timeoutCtx.Done():
return timeoutCtx.Err()
case err := <-errChan:
return err
}
}

func (e *UnsupportedResourceError) Error() string {
return fmt.Sprintf("Unsupported resource typeUrl: %s", e.TypeURL)
}
24 changes: 24 additions & 0 deletions internal/pkg/util/execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package util

import (
"context"
"time"
)

// DoWithTimeout runs f and returns its error.
// If the timeout elapses first, returns a ctx timeout error instead.
func DoWithTimeout(ctx context.Context, f func() error, t time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx, t)
defer cancel()
errChan := make(chan error, 1)
go func() {
errChan <- f()
close(errChan)
}()
select {
case <-timeoutCtx.Done():
return timeoutCtx.Err()
case err := <-errChan:
return err
}
}
65 changes: 65 additions & 0 deletions internal/pkg/util/execute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package util_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/envoyproxy/xds-relay/internal/pkg/util"
"github.com/stretchr/testify/assert"
)

func TestExecuteWithinTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
methodExecuted := false
err := util.DoWithTimeout(ctx, func() error {
methodExecuted = true
return nil
}, time.Second)

assert.Nil(t, err)
assert.Equal(t, methodExecuted, true)
}

func TestExecuteWithinTimeoutReturnsError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
returnErr := fmt.Errorf("error")
err := util.DoWithTimeout(ctx, func() error {
return returnErr
}, time.Second)

assert.NotNil(t, err)
assert.Equal(t, err, returnErr)
}

func TestExecuteWithinExceedsTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := util.DoWithTimeout(ctx, func() error {
<-time.After(time.Millisecond)
return nil
}, time.Nanosecond)

assert.NotNil(t, err)
assert.Equal(t, err.Error(), "context deadline exceeded")
}

func TestExecuteCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
methodCalled := make(chan bool, 1)
go func() {
<-methodCalled
cancel()
}()
err := util.DoWithTimeout(ctx, func() error {
methodCalled <- true
<-time.After(time.Minute)
return nil
}, time.Minute)

assert.NotNil(t, err)
assert.Equal(t, err.Error(), "context canceled")
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"

"github.com/envoyproxy/xds-relay/internal/app/server"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
"github.com/spf13/cobra"
Expand Down
2 changes: 1 addition & 1 deletion tools/configuration-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"os"

yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"

"github.com/spf13/cobra"
Expand Down

0 comments on commit aefa63b

Please sign in to comment.