Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc graceful shutdown #64

Merged
merged 17 commits into from
Apr 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,9 @@ on:
push:
branches:
- master
paths:
- 'api/**'
- 'tools/configuration-validator/**'
- .github/workflows/integration-tests.yml
pull_request:
branches:
- master
paths:
- 'api/**'
- 'tools/configuration-validator/**'
- .github/workflows/integration-tests.yml
env:
PACKAGEPATH: ${{ github.workspace }}/go/src/github.com/${{ github.repository }}
jobs:
Expand Down
36 changes: 36 additions & 0 deletions integration/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// +build integration

package integration

import (
"os"
"os/exec"
"path"
"syscall"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

const (
binaryName = "xds-relay"
)

func TestServerShutdown(t *testing.T) {
dir, err := os.Getwd()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed to be here in CI jobs? Should there be an environment variable but default to this if it is not set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah..the CI jobs are set up to correctly set the folder https://github.com/envoyproxy/xds-relay/blob/master/.github/workflows/integration-tests.yml#L40
They run locally too without any changes.

assert.Nil(t, err)
cmd := exec.Command(path.Join(dir, "bin", binaryName),
"-c", "./integration/testdata/bootstrap_configuration_complete_tech_spec.yaml",
"-a", "./integration/testdata/keyer_configuration_complete_tech_spec.yaml",
"-l", "debug",
"-m", "serve")
err = cmd.Start()
assert.Nil(t, err)
<-time.After(5 * time.Second)
assert.Equal(t, -1, cmd.ProcessState.ExitCode())
e := cmd.Process.Signal(syscall.SIGINT)
assert.Nil(t, e)
err = cmd.Wait()
assert.Nil(t, e)
}
44 changes: 37 additions & 7 deletions internal/app/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package server
import (
"context"
"net"
"os"
"os/signal"
"strconv"
"syscall"
"time"

"github.com/envoyproxy/xds-relay/internal/app/mapper"
"github.com/envoyproxy/xds-relay/internal/app/orchestrator"
"github.com/envoyproxy/xds-relay/internal/app/upstream"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/envoyproxy/xds-relay/internal/pkg/util"

aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
Expand All @@ -32,7 +36,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
logger = log.New(bootstrapConfig.Logging.Level.String())
}

// TODO cancel should be invoked by shutdown handlers.
ctx, cancel := context.WithCancel(context.Background())
jyotimahapatra marked this conversation as resolved.
Show resolved Hide resolved
defer cancel()

Expand All @@ -45,7 +48,7 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
ctx,
upstreamAddress,
upstream.CallOptions{Timeout: time.Minute},
logger.Named("xdsclient"),
logger,
)
if err != nil {
logger.With("error", err).Panic(ctx, "failed to initialize upstream client")
Expand All @@ -72,10 +75,37 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap,
api.RegisterRouteDiscoveryServiceServer(server, gcpServer)
api.RegisterListenerDiscoveryServiceServer(server, gcpServer)

if mode == "serve" {
logger.With("address", listener.Addr()).Info(ctx, "Initializing server")
if err := server.Serve(listener); err != nil {
logger.With("err", err).Fatal(ctx, "failed to initialize server")
}
if mode != "serve" {
return
}

registerShutdownHandler(ctx, cancel, server.GracefulStop, logger, time.Second*30)
logger.With("address", listener.Addr()).Info(ctx, "Initializing server")
if err := server.Serve(listener); err != nil {
logger.With("err", err).Fatal(ctx, "failed to initialize server")
}
}

func registerShutdownHandler(
ctx context.Context,
cancel context.CancelFunc,
gracefulStop func(),
logger log.Logger,
waitTime time.Duration) {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logger.Info(ctx, "received interrupt signal:", sig.String())
err := util.DoWithTimeout(ctx, func() error {
lita marked this conversation as resolved.
Show resolved Hide resolved
logger.Info(ctx, "initiating grpc graceful stop")
jyotimahapatra marked this conversation as resolved.
Show resolved Hide resolved
gracefulStop()
_ = logger.Sync()
cancel()
return nil
}, waitTime)
if err != nil {
logger.Error(ctx, "shutdown error: ", err.Error())
}
}()
}
69 changes: 69 additions & 0 deletions internal/app/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package server

import (
"context"
"fmt"
"syscall"
"testing"
"time"

"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/stretchr/testify/assert"
)

func TestShutdown(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
blockedCh := make(chan bool, 1)
l := &logger{}
registerShutdownHandler(ctx, cancel, func() {
blockedCh <- true
}, l, time.Second*5)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
<-blockedCh
}

func TestShutdownTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
l := &logger{blockedCh: make(chan bool, 1)}
registerShutdownHandler(ctx, cancel, func() {
<-time.After(time.Minute)
}, l, time.Millisecond)
_ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
<-l.blockedCh
assert.Equal(t, "shutdown error: context deadline exceeded", l.lastErr)
}

type logger struct {
blockedCh chan bool
lastErr string
}

func (l *logger) Named(name string) log.Logger {
return l
}

func (l *logger) With(args ...interface{}) log.Logger {
return l
}

func (l *logger) Sync() error { return nil }

func (l *logger) Debug(ctx context.Context, args ...interface{}) {
}

func (l *logger) Info(ctx context.Context, args ...interface{}) {
}

func (l *logger) Warn(ctx context.Context, args ...interface{}) {
}

func (l *logger) Error(ctx context.Context, args ...interface{}) {
l.lastErr = fmt.Sprint(args...)
l.blockedCh <- true
}

func (l *logger) Fatal(ctx context.Context, args ...interface{}) {
}

func (l *logger) Panic(ctx context.Context, args ...interface{}) {
}
25 changes: 5 additions & 20 deletions internal/app/upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
"github.com/envoyproxy/xds-relay/internal/pkg/util"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -78,6 +79,7 @@ type version struct {
// The method does not block until the underlying connection is up.
// Returns immediately and connecting the server happens in background
func NewClient(ctx context.Context, url string, callOptions CallOptions, logger log.Logger) (Client, error) {
logger.Info(ctx, "Initiating upstream connection.")
// TODO: configure grpc options.https://github.com/envoyproxy/xds-relay/issues/55
conn, err := grpc.Dial(url, grpc.WithInsecure())
if err != nil {
Expand Down Expand Up @@ -163,7 +165,9 @@ func send(
}
request.ResponseNonce = sig.nonce
request.VersionInfo = sig.version
err := doWithTimeout(ctx, func() error {
// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717
// Call SendMsg in a timeout because it can block in some cases.
err := util.DoWithTimeout(ctx, func() error {
return stream.SendMsg(request)
}, callOptions.Timeout)
if err != nil {
Expand Down Expand Up @@ -229,25 +233,6 @@ func shutDown(ctx context.Context, conn *grpc.ClientConn) {
conn.Close()
}

// DoWithTimeout runs f and returns its error. If the deadline d elapses first,
// it returns a DeadlineExceeded error instead.
// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717
func doWithTimeout(ctx context.Context, f func() error, d time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx, d)
defer cancel()
errChan := make(chan error, 1)
go func() {
errChan <- f()
close(errChan)
}()
select {
case <-timeoutCtx.Done():
return timeoutCtx.Err()
case err := <-errChan:
return err
}
}

func (e *UnsupportedResourceError) Error() string {
return fmt.Sprintf("Unsupported resource typeUrl: %s", e.TypeURL)
}
24 changes: 24 additions & 0 deletions internal/pkg/util/execute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package util

import (
"context"
"time"
)

// DoWithTimeout runs f and returns its error.
// If the timeout elapses first, returns a ctx timeout error instead.
func DoWithTimeout(ctx context.Context, f func() error, t time.Duration) error {
timeoutCtx, cancel := context.WithTimeout(ctx, t)
defer cancel()
errChan := make(chan error, 1)
go func() {
errChan <- f()
close(errChan)
}()
select {
case <-timeoutCtx.Done():
return timeoutCtx.Err()
case err := <-errChan:
return err
}
}
65 changes: 65 additions & 0 deletions internal/pkg/util/execute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package util_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/envoyproxy/xds-relay/internal/pkg/util"
"github.com/stretchr/testify/assert"
)

func TestExecuteWithinTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
methodExecuted := false
err := util.DoWithTimeout(ctx, func() error {
methodExecuted = true
return nil
}, time.Second)

assert.Nil(t, err)
assert.Equal(t, methodExecuted, true)
}

func TestExecuteWithinTimeoutReturnsError(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
returnErr := fmt.Errorf("error")
err := util.DoWithTimeout(ctx, func() error {
return returnErr
}, time.Second)

assert.NotNil(t, err)
assert.Equal(t, err, returnErr)
}

func TestExecuteWithinExceedsTimeout(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err := util.DoWithTimeout(ctx, func() error {
<-time.After(time.Millisecond)
return nil
}, time.Nanosecond)

assert.NotNil(t, err)
assert.Equal(t, err.Error(), "context deadline exceeded")
}

func TestExecuteCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
methodCalled := make(chan bool, 1)
go func() {
<-methodCalled
cancel()
}()
err := util.DoWithTimeout(ctx, func() error {
methodCalled <- true
<-time.After(time.Minute)
return nil
}, time.Minute)

assert.NotNil(t, err)
assert.Equal(t, err.Error(), "context canceled")
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"

"github.com/envoyproxy/xds-relay/internal/app/server"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
"github.com/spf13/cobra"
Expand Down
2 changes: 1 addition & 1 deletion tools/configuration-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"log"
"os"

yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util"
yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto"
aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1"

"github.com/spf13/cobra"
Expand Down