From 4363d123654fd552b0dfb7ab46b39c2b330a5c9d Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 14:58:35 -0700 Subject: [PATCH 01/17] graceful Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 6b94c405..4071b59f 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -3,7 +3,10 @@ package server import ( "context" "net" + "os" + "os/signal" "strconv" + "syscall" "time" "github.com/envoyproxy/xds-relay/internal/app/mapper" @@ -72,10 +75,22 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, api.RegisterRouteDiscoveryServiceServer(server, gcpServer) api.RegisterListenerDiscoveryServiceServer(server, gcpServer) - if mode == "serve" { - logger.With("address", listener.Addr()).Info(ctx, "Initializing server") - if err := server.Serve(listener); err != nil { - logger.With("err", err).Fatal(ctx, "failed to initialize server") - } + if mode != "serve" { + return } + + registerShutdownHandler(server) + logger.With("address", listener.Addr()).Info(ctx, "Initializing server") + if err := server.Serve(listener); err != nil { + logger.With("err", err).Fatal(ctx, "failed to initialize server") + } +} + +func registerShutdownHandler(server *grpc.Server) { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + server.GracefulStop() + }() } From ccb3041069a1af4a8c0829225abb47d6bef1b89b Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 15:04:20 -0700 Subject: [PATCH 02/17] mv yamlproto Signed-off-by: Jyoti Mahapatra --- internal/pkg/util/{ => yamlproto}/testdata/and_result.yaml | 0 internal/pkg/util/{ => yamlproto}/testdata/empty.yaml | 0 .../pkg/util/{ => yamlproto}/testdata/error_in_second_rule.yaml | 0 .../pkg/util/{ => yamlproto}/testdata/inexistent_field.yaml | 0 internal/pkg/util/{ => yamlproto}/testdata/invalid.yaml | 0 .../testdata/keyer_configuration_empty_request_type_match.yaml | 0 ...er_configuration_empty_request_type_math_in_second_rule.yaml | 0 .../keyer_configuration_match_predicate_invalid_bool.yaml | 0 .../testdata/keyer_configuration_missing_match_predicate.yaml | 0 .../testdata/keyer_configuration_missing_result_predicate.yaml | 0 .../keyer_configuration_not_match_empty_request_type_match.yaml | 0 .../keyer_configuration_not_match_request_type_match.yaml | 0 ...keyer_configuration_or_match_invalid_request_type_match.yaml | 0 .../keyer_configuration_or_match_request_type_match.yaml | 0 .../keyer_configuration_request_node_match_invalid_enum.yaml | 0 .../keyer_configuration_request_node_match_string_fragment.yaml | 0 .../keyer_configuration_request_type_match_string_fragment.yaml | 0 .../util/{ => yamlproto}/testdata/request_node_fragment.yaml | 0 .../util/{ => yamlproto}/testdata/resource_names_fragment.yaml | 0 internal/pkg/util/{ => yamlproto}/testdata/string_fragment.yaml | 0 .../pkg/util/{ => yamlproto}/testdata/typo_in_fragments.yaml | 0 internal/pkg/util/{ => yamlproto}/yamlproto.go | 0 internal/pkg/util/{ => yamlproto}/yamlproto_suite_test.go | 0 internal/pkg/util/{ => yamlproto}/yamlproto_test.go | 0 main.go | 2 +- tools/configuration-validator/main.go | 2 +- 26 files changed, 2 insertions(+), 2 deletions(-) rename internal/pkg/util/{ => yamlproto}/testdata/and_result.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/empty.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/error_in_second_rule.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/inexistent_field.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/invalid.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_empty_request_type_match.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_empty_request_type_math_in_second_rule.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_match_predicate_invalid_bool.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_missing_match_predicate.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_missing_result_predicate.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_not_match_empty_request_type_match.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_not_match_request_type_match.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_or_match_invalid_request_type_match.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_or_match_request_type_match.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_request_node_match_invalid_enum.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_request_node_match_string_fragment.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/keyer_configuration_request_type_match_string_fragment.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/request_node_fragment.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/resource_names_fragment.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/string_fragment.yaml (100%) rename internal/pkg/util/{ => yamlproto}/testdata/typo_in_fragments.yaml (100%) rename internal/pkg/util/{ => yamlproto}/yamlproto.go (100%) rename internal/pkg/util/{ => yamlproto}/yamlproto_suite_test.go (100%) rename internal/pkg/util/{ => yamlproto}/yamlproto_test.go (100%) diff --git a/internal/pkg/util/testdata/and_result.yaml b/internal/pkg/util/yamlproto/testdata/and_result.yaml similarity index 100% rename from internal/pkg/util/testdata/and_result.yaml rename to internal/pkg/util/yamlproto/testdata/and_result.yaml diff --git a/internal/pkg/util/testdata/empty.yaml b/internal/pkg/util/yamlproto/testdata/empty.yaml similarity index 100% rename from internal/pkg/util/testdata/empty.yaml rename to internal/pkg/util/yamlproto/testdata/empty.yaml diff --git a/internal/pkg/util/testdata/error_in_second_rule.yaml b/internal/pkg/util/yamlproto/testdata/error_in_second_rule.yaml similarity index 100% rename from internal/pkg/util/testdata/error_in_second_rule.yaml rename to internal/pkg/util/yamlproto/testdata/error_in_second_rule.yaml diff --git a/internal/pkg/util/testdata/inexistent_field.yaml b/internal/pkg/util/yamlproto/testdata/inexistent_field.yaml similarity index 100% rename from internal/pkg/util/testdata/inexistent_field.yaml rename to internal/pkg/util/yamlproto/testdata/inexistent_field.yaml diff --git a/internal/pkg/util/testdata/invalid.yaml b/internal/pkg/util/yamlproto/testdata/invalid.yaml similarity index 100% rename from internal/pkg/util/testdata/invalid.yaml rename to internal/pkg/util/yamlproto/testdata/invalid.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_empty_request_type_match.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_empty_request_type_match.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_empty_request_type_match.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_empty_request_type_match.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_empty_request_type_math_in_second_rule.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_empty_request_type_math_in_second_rule.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_empty_request_type_math_in_second_rule.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_empty_request_type_math_in_second_rule.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_match_predicate_invalid_bool.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_match_predicate_invalid_bool.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_match_predicate_invalid_bool.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_match_predicate_invalid_bool.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_missing_match_predicate.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_missing_match_predicate.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_missing_match_predicate.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_missing_match_predicate.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_missing_result_predicate.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_missing_result_predicate.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_missing_result_predicate.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_missing_result_predicate.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_not_match_empty_request_type_match.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_not_match_empty_request_type_match.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_not_match_empty_request_type_match.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_not_match_empty_request_type_match.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_not_match_request_type_match.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_not_match_request_type_match.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_not_match_request_type_match.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_not_match_request_type_match.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_or_match_invalid_request_type_match.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_or_match_invalid_request_type_match.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_or_match_invalid_request_type_match.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_or_match_invalid_request_type_match.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_or_match_request_type_match.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_or_match_request_type_match.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_or_match_request_type_match.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_or_match_request_type_match.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_request_node_match_invalid_enum.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_request_node_match_invalid_enum.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_request_node_match_invalid_enum.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_request_node_match_invalid_enum.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_request_node_match_string_fragment.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_request_node_match_string_fragment.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_request_node_match_string_fragment.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_request_node_match_string_fragment.yaml diff --git a/internal/pkg/util/testdata/keyer_configuration_request_type_match_string_fragment.yaml b/internal/pkg/util/yamlproto/testdata/keyer_configuration_request_type_match_string_fragment.yaml similarity index 100% rename from internal/pkg/util/testdata/keyer_configuration_request_type_match_string_fragment.yaml rename to internal/pkg/util/yamlproto/testdata/keyer_configuration_request_type_match_string_fragment.yaml diff --git a/internal/pkg/util/testdata/request_node_fragment.yaml b/internal/pkg/util/yamlproto/testdata/request_node_fragment.yaml similarity index 100% rename from internal/pkg/util/testdata/request_node_fragment.yaml rename to internal/pkg/util/yamlproto/testdata/request_node_fragment.yaml diff --git a/internal/pkg/util/testdata/resource_names_fragment.yaml b/internal/pkg/util/yamlproto/testdata/resource_names_fragment.yaml similarity index 100% rename from internal/pkg/util/testdata/resource_names_fragment.yaml rename to internal/pkg/util/yamlproto/testdata/resource_names_fragment.yaml diff --git a/internal/pkg/util/testdata/string_fragment.yaml b/internal/pkg/util/yamlproto/testdata/string_fragment.yaml similarity index 100% rename from internal/pkg/util/testdata/string_fragment.yaml rename to internal/pkg/util/yamlproto/testdata/string_fragment.yaml diff --git a/internal/pkg/util/testdata/typo_in_fragments.yaml b/internal/pkg/util/yamlproto/testdata/typo_in_fragments.yaml similarity index 100% rename from internal/pkg/util/testdata/typo_in_fragments.yaml rename to internal/pkg/util/yamlproto/testdata/typo_in_fragments.yaml diff --git a/internal/pkg/util/yamlproto.go b/internal/pkg/util/yamlproto/yamlproto.go similarity index 100% rename from internal/pkg/util/yamlproto.go rename to internal/pkg/util/yamlproto/yamlproto.go diff --git a/internal/pkg/util/yamlproto_suite_test.go b/internal/pkg/util/yamlproto/yamlproto_suite_test.go similarity index 100% rename from internal/pkg/util/yamlproto_suite_test.go rename to internal/pkg/util/yamlproto/yamlproto_suite_test.go diff --git a/internal/pkg/util/yamlproto_test.go b/internal/pkg/util/yamlproto/yamlproto_test.go similarity index 100% rename from internal/pkg/util/yamlproto_test.go rename to internal/pkg/util/yamlproto/yamlproto_test.go diff --git a/main.go b/main.go index 4b8facf3..e317929a 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "log" "github.com/envoyproxy/xds-relay/internal/app/server" - yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util" + yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto" aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1" bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1" "github.com/spf13/cobra" diff --git a/tools/configuration-validator/main.go b/tools/configuration-validator/main.go index 8acc09e4..4dd4025a 100644 --- a/tools/configuration-validator/main.go +++ b/tools/configuration-validator/main.go @@ -5,7 +5,7 @@ import ( "log" "os" - yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util" + yamlproto "github.com/envoyproxy/xds-relay/internal/pkg/util/yamlproto" aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1" "github.com/spf13/cobra" From 3f67684c8c610ec7d1523df92d807986354e1aa2 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 15:06:20 -0700 Subject: [PATCH 03/17] remove int test paths Signed-off-by: Jyoti Mahapatra --- .github/workflows/integration-tests.yml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index d01550aa..5efdc101 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -3,17 +3,9 @@ on: push: branches: - master - paths: - - 'api/**' - - 'tools/configuration-validator/**' - - .github/workflows/integration-tests.yml pull_request: branches: - master - paths: - - 'api/**' - - 'tools/configuration-validator/**' - - .github/workflows/integration-tests.yml env: PACKAGEPATH: ${{ github.workspace }}/go/src/github.com/${{ github.repository }} jobs: From 22b7899ca320db56b098c6729dae17efc086e04a Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 15:35:09 -0700 Subject: [PATCH 04/17] add utility Signed-off-by: Jyoti Mahapatra --- internal/app/upstream/client.go | 24 ++---------- internal/pkg/util/execute.go | 24 ++++++++++++ internal/pkg/util/execute_test.go | 65 +++++++++++++++++++++++++++++++ 3 files changed, 93 insertions(+), 20 deletions(-) create mode 100644 internal/pkg/util/execute.go create mode 100644 internal/pkg/util/execute_test.go diff --git a/internal/app/upstream/client.go b/internal/app/upstream/client.go index 2a9a1d6c..3ef6acf6 100644 --- a/internal/app/upstream/client.go +++ b/internal/app/upstream/client.go @@ -7,6 +7,7 @@ import ( v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/envoyproxy/xds-relay/internal/pkg/log" + "github.com/envoyproxy/xds-relay/internal/pkg/util" "google.golang.org/grpc" ) @@ -163,7 +164,9 @@ func send( } request.ResponseNonce = sig.nonce request.VersionInfo = sig.version - err := doWithTimeout(ctx, func() error { + // Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717 + // Call SendMsg in a timeout because it can block in some cases. + err := util.DoWithTimeout(ctx, func() error { return stream.SendMsg(request) }, callOptions.Timeout) if err != nil { @@ -229,25 +232,6 @@ func shutDown(ctx context.Context, conn *grpc.ClientConn) { conn.Close() } -// DoWithTimeout runs f and returns its error. If the deadline d elapses first, -// it returns a DeadlineExceeded error instead. -// Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717 -func doWithTimeout(ctx context.Context, f func() error, d time.Duration) error { - timeoutCtx, cancel := context.WithTimeout(ctx, d) - defer cancel() - errChan := make(chan error, 1) - go func() { - errChan <- f() - close(errChan) - }() - select { - case <-timeoutCtx.Done(): - return timeoutCtx.Err() - case err := <-errChan: - return err - } -} - func (e *UnsupportedResourceError) Error() string { return fmt.Sprintf("Unsupported resource typeUrl: %s", e.TypeURL) } diff --git a/internal/pkg/util/execute.go b/internal/pkg/util/execute.go new file mode 100644 index 00000000..da80a70d --- /dev/null +++ b/internal/pkg/util/execute.go @@ -0,0 +1,24 @@ +package util + +import ( + "context" + "time" +) + +// DoWithTimeout runs f and returns its error. +// If the timeout elapses first, returns a ctx timeout error instead. +func DoWithTimeout(ctx context.Context, f func() error, t time.Duration) error { + timeoutCtx, cancel := context.WithTimeout(ctx, t) + defer cancel() + errChan := make(chan error, 1) + go func() { + errChan <- f() + close(errChan) + }() + select { + case <-timeoutCtx.Done(): + return timeoutCtx.Err() + case err := <-errChan: + return err + } +} diff --git a/internal/pkg/util/execute_test.go b/internal/pkg/util/execute_test.go new file mode 100644 index 00000000..2aacaf14 --- /dev/null +++ b/internal/pkg/util/execute_test.go @@ -0,0 +1,65 @@ +package util_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/envoyproxy/xds-relay/internal/pkg/util" + "github.com/stretchr/testify/assert" +) + +func TestExecuteWithinTimeout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + methodExecuted := false + err := util.DoWithTimeout(ctx, func() error { + methodExecuted = true + return nil + }, time.Second) + + assert.Nil(t, err) + assert.Equal(t, methodExecuted, true) +} + +func TestExecuteWithinTimeoutReturnsError(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + returnErr := fmt.Errorf("error") + err := util.DoWithTimeout(ctx, func() error { + return returnErr + }, time.Second) + + assert.NotNil(t, err) + assert.Equal(t, err, returnErr) +} + +func TestExecuteWithinExceedsTimeout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err := util.DoWithTimeout(ctx, func() error { + <-time.After(time.Millisecond) + return nil + }, time.Nanosecond) + + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "context deadline exceeded") +} + +func TestExecuteCancel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + methodCalled := make(chan bool, 1) + go func() { + <-methodCalled + cancel() + }() + err := util.DoWithTimeout(ctx, func() error { + methodCalled <- true + <-time.After(time.Minute) + return nil + }, time.Minute) + + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "context canceled") +} From a20a92f90ff90f7b8842a8e4cdf07ffbb7072b5e Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 15:40:21 -0700 Subject: [PATCH 05/17] add timeout Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 4071b59f..fd6d26ad 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -13,6 +13,7 @@ import ( "github.com/envoyproxy/xds-relay/internal/app/orchestrator" "github.com/envoyproxy/xds-relay/internal/app/upstream" "github.com/envoyproxy/xds-relay/internal/pkg/log" + "github.com/envoyproxy/xds-relay/internal/pkg/util" aggregationv1 "github.com/envoyproxy/xds-relay/pkg/api/aggregation/v1" bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1" @@ -91,6 +92,9 @@ func registerShutdownHandler(server *grpc.Server) { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigs - server.GracefulStop() + util.DoWithTimeout(context.Background(), func() error { + server.GracefulStop() + return nil + }, time.Second*30) }() } From fc65aec430a00746b52ffedfcda2c50b59521f50 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 15:48:44 -0700 Subject: [PATCH 06/17] lint Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index fd6d26ad..7178f2cd 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -92,7 +92,7 @@ func registerShutdownHandler(server *grpc.Server) { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigs - util.DoWithTimeout(context.Background(), func() error { + _ = util.DoWithTimeout(context.Background(), func() error { server.GracefulStop() return nil }, time.Second*30) From e4fa237f8f6e96c8d5abbbc948fa092693e08af3 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Wed, 22 Apr 2020 23:02:31 -0700 Subject: [PATCH 07/17] int test Signed-off-by: Jyoti Mahapatra --- integration/server_test.go | 36 +++++++++++++++++++++++++ internal/app/server/server.go | 48 ++++++++++++++++++--------------- internal/app/upstream/client.go | 1 + 3 files changed, 63 insertions(+), 22 deletions(-) create mode 100644 integration/server_test.go diff --git a/integration/server_test.go b/integration/server_test.go new file mode 100644 index 00000000..032f0d38 --- /dev/null +++ b/integration/server_test.go @@ -0,0 +1,36 @@ +// +build integration + +package integration + +import ( + "os" + "os/exec" + "path" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +const ( + binaryName = "xds-relay" +) + +func TestXdsClientGetsIncrementalResponsesFromUpstreamServer(t *testing.T) { + dir, err := os.Getwd() + assert.Nil(t, err) + cmd := exec.Command(path.Join(dir, "bin", binaryName), + "-c", "./integration/testdata/bootstrap_configuration_complete_tech_spec.yaml", + "-a", "./integration/testdata/keyer_configuration_complete_tech_spec.yaml", + "-l", "debug", + "-m", "serve") + err = cmd.Start() + assert.Nil(t, err) + <-time.After(5 * time.Second) + assert.Equal(t, -1, cmd.ProcessState.ExitCode()) + e := cmd.Process.Signal(syscall.SIGINT) + assert.Nil(t, e) + err = cmd.Wait() + assert.Nil(t, e) +} diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 7178f2cd..eb548f1c 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -40,6 +40,21 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, ctx, cancel := context.WithCancel(context.Background()) defer cancel() + // Initialize request aggregation mapper component. + requestMapper := mapper.NewMapper(aggregationRulesConfig) + + if mode != "serve" { + return + } + + server := grpc.NewServer() + serverPort := strconv.FormatUint(uint64(bootstrapConfig.Server.Address.PortValue), 10) + serverAddress := net.JoinHostPort(bootstrapConfig.Server.Address.Address, serverPort) + listener, err := net.Listen("tcp", serverAddress) // #nosec + if err != nil { + logger.With("err", err).Fatal(ctx, "failed to bind server to listener") + } + // Initialize upstream client. upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10) upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort) @@ -49,52 +64,41 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, ctx, upstreamAddress, upstream.CallOptions{Timeout: time.Minute}, - logger.Named("xdsclient"), + logger, ) if err != nil { logger.With("error", err).Panic(ctx, "failed to initialize upstream client") } - - // Initialize request aggregation mapper component. - requestMapper := mapper.NewMapper(aggregationRulesConfig) - // Initialize orchestrator. orchestrator := orchestrator.New(ctx, logger, requestMapper, upstreamClient, bootstrapConfig.Cache) - // Start server. gcpServer := gcp.NewServer(ctx, orchestrator, nil) - server := grpc.NewServer() - serverPort := strconv.FormatUint(uint64(bootstrapConfig.Server.Address.PortValue), 10) - serverAddress := net.JoinHostPort(bootstrapConfig.Server.Address.Address, serverPort) - listener, err := net.Listen("tcp", serverAddress) // #nosec - if err != nil { - logger.With("err", err).Fatal(ctx, "failed to bind server to listener") - } - api.RegisterEndpointDiscoveryServiceServer(server, gcpServer) api.RegisterClusterDiscoveryServiceServer(server, gcpServer) api.RegisterRouteDiscoveryServiceServer(server, gcpServer) api.RegisterListenerDiscoveryServiceServer(server, gcpServer) - if mode != "serve" { - return - } - - registerShutdownHandler(server) + registerShutdownHandler(server, logger) logger.With("address", listener.Addr()).Info(ctx, "Initializing server") if err := server.Serve(listener); err != nil { logger.With("err", err).Fatal(ctx, "failed to initialize server") } } -func registerShutdownHandler(server *grpc.Server) { +func registerShutdownHandler(server *grpc.Server, logger log.Logger) { sigs := make(chan os.Signal, 1) + ctx := context.Background() signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { - <-sigs - _ = util.DoWithTimeout(context.Background(), func() error { + sig := <-sigs + logger.Info(ctx, "received interrupt signal:", sig.String()) + err := util.DoWithTimeout(ctx, func() error { + logger.Debug(ctx, "initiating grpc graceful stop") server.GracefulStop() return nil }, time.Second*30) + if err != nil { + logger.Error(ctx, "shutdown error: %s", err) + } }() } diff --git a/internal/app/upstream/client.go b/internal/app/upstream/client.go index 3ef6acf6..6967d1af 100644 --- a/internal/app/upstream/client.go +++ b/internal/app/upstream/client.go @@ -79,6 +79,7 @@ type version struct { // The method does not block until the underlying connection is up. // Returns immediately and connecting the server happens in background func NewClient(ctx context.Context, url string, callOptions CallOptions, logger log.Logger) (Client, error) { + logger.Debug(ctx, "Initiating upstream connection.") // TODO: configure grpc options.https://github.com/envoyproxy/xds-relay/issues/55 conn, err := grpc.Dial(url, grpc.WithInsecure()) if err != nil { From e77005bf1830a6b9a0848338278b621f20f7e6de Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 09:59:44 -0700 Subject: [PATCH 08/17] rename test Signed-off-by: Jyoti Mahapatra --- integration/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/server_test.go b/integration/server_test.go index 032f0d38..fcc97a01 100644 --- a/integration/server_test.go +++ b/integration/server_test.go @@ -17,7 +17,7 @@ const ( binaryName = "xds-relay" ) -func TestXdsClientGetsIncrementalResponsesFromUpstreamServer(t *testing.T) { +func TestServerShutdown(t *testing.T) { dir, err := os.Getwd() assert.Nil(t, err) cmd := exec.Command(path.Join(dir, "bin", binaryName), From 8f95e10e76233bffabcc66767a80e0d96e12c125 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 10:57:52 -0700 Subject: [PATCH 09/17] info logs Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 19 ++++++++++--------- internal/app/upstream/client.go | 2 +- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index eb548f1c..7aeb3ed6 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -47,14 +47,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, return } - server := grpc.NewServer() - serverPort := strconv.FormatUint(uint64(bootstrapConfig.Server.Address.PortValue), 10) - serverAddress := net.JoinHostPort(bootstrapConfig.Server.Address.Address, serverPort) - listener, err := net.Listen("tcp", serverAddress) // #nosec - if err != nil { - logger.With("err", err).Fatal(ctx, "failed to bind server to listener") - } - // Initialize upstream client. upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10) upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort) @@ -71,8 +63,17 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, } // Initialize orchestrator. orchestrator := orchestrator.New(ctx, logger, requestMapper, upstreamClient, bootstrapConfig.Cache) + // Start server. gcpServer := gcp.NewServer(ctx, orchestrator, nil) + server := grpc.NewServer() + serverPort := strconv.FormatUint(uint64(bootstrapConfig.Server.Address.PortValue), 10) + serverAddress := net.JoinHostPort(bootstrapConfig.Server.Address.Address, serverPort) + listener, err := net.Listen("tcp", serverAddress) // #nosec + if err != nil { + logger.With("err", err).Fatal(ctx, "failed to bind server to listener") + } + api.RegisterEndpointDiscoveryServiceServer(server, gcpServer) api.RegisterClusterDiscoveryServiceServer(server, gcpServer) api.RegisterRouteDiscoveryServiceServer(server, gcpServer) @@ -93,7 +94,7 @@ func registerShutdownHandler(server *grpc.Server, logger log.Logger) { sig := <-sigs logger.Info(ctx, "received interrupt signal:", sig.String()) err := util.DoWithTimeout(ctx, func() error { - logger.Debug(ctx, "initiating grpc graceful stop") + logger.Info(ctx, "initiating grpc graceful stop") server.GracefulStop() return nil }, time.Second*30) diff --git a/internal/app/upstream/client.go b/internal/app/upstream/client.go index 6967d1af..eb85ac0f 100644 --- a/internal/app/upstream/client.go +++ b/internal/app/upstream/client.go @@ -79,7 +79,7 @@ type version struct { // The method does not block until the underlying connection is up. // Returns immediately and connecting the server happens in background func NewClient(ctx context.Context, url string, callOptions CallOptions, logger log.Logger) (Client, error) { - logger.Debug(ctx, "Initiating upstream connection.") + logger.Info(ctx, "Initiating upstream connection.") // TODO: configure grpc options.https://github.com/envoyproxy/xds-relay/issues/55 conn, err := grpc.Dial(url, grpc.WithInsecure()) if err != nil { From 71c583d39ec95b3509cdad2008558c4a3df96715 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 15:27:10 -0700 Subject: [PATCH 10/17] revert mode placemeent Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 7aeb3ed6..51c3e8bc 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -43,10 +43,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, // Initialize request aggregation mapper component. requestMapper := mapper.NewMapper(aggregationRulesConfig) - if mode != "serve" { - return - } - // Initialize upstream client. upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10) upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort) @@ -79,6 +75,10 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, api.RegisterRouteDiscoveryServiceServer(server, gcpServer) api.RegisterListenerDiscoveryServiceServer(server, gcpServer) + if mode != "serve" { + return + } + registerShutdownHandler(server, logger) logger.With("address", listener.Addr()).Info(ctx, "Initializing server") if err := server.Serve(listener); err != nil { From 230bd6eef54f043552c37502d5e70d873d84227b Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 15:29:07 -0700 Subject: [PATCH 11/17] revert mode placemeent Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 51c3e8bc..46f856b6 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -57,6 +57,10 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, if err != nil { logger.With("error", err).Panic(ctx, "failed to initialize upstream client") } + + // Initialize request aggregation mapper component. + requestMapper := mapper.NewMapper(aggregationRulesConfig) + // Initialize orchestrator. orchestrator := orchestrator.New(ctx, logger, requestMapper, upstreamClient, bootstrapConfig.Cache) From d488e3d38e35ae39e84799998e498efecec3cd2b Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 15:29:51 -0700 Subject: [PATCH 12/17] revert mode placemeent Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 46f856b6..b7c6d5b6 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -40,9 +40,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // Initialize request aggregation mapper component. - requestMapper := mapper.NewMapper(aggregationRulesConfig) - // Initialize upstream client. upstreamPort := strconv.FormatUint(uint64(bootstrapConfig.OriginServer.Address.PortValue), 10) upstreamAddress := net.JoinHostPort(bootstrapConfig.OriginServer.Address.Address, upstreamPort) From a521809cae2369fb9b5003a60cde19e2d493553d Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 15:44:18 -0700 Subject: [PATCH 13/17] logger.Sync Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index b7c6d5b6..638fb74c 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -97,6 +97,7 @@ func registerShutdownHandler(server *grpc.Server, logger log.Logger) { err := util.DoWithTimeout(ctx, func() error { logger.Info(ctx, "initiating grpc graceful stop") server.GracefulStop() + logger.Sync() return nil }, time.Second*30) if err != nil { From cfa80132478137772d9c52ab7340eb4b405d39c3 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 15:46:46 -0700 Subject: [PATCH 14/17] fix lint Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 638fb74c..88d65660 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -97,7 +97,7 @@ func registerShutdownHandler(server *grpc.Server, logger log.Logger) { err := util.DoWithTimeout(ctx, func() error { logger.Info(ctx, "initiating grpc graceful stop") server.GracefulStop() - logger.Sync() + _ = logger.Sync() return nil }, time.Second*30) if err != nil { From a59ea8e47d61f73445788034ffb631ab8b9a8127 Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 16:04:03 -0700 Subject: [PATCH 15/17] cancel inside shuutdown Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 88d65660..63337d5c 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -36,7 +36,6 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, logger = log.New(bootstrapConfig.Logging.Level.String()) } - // TODO cancel should be invoked by shutdown handlers. ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -80,16 +79,15 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, return } - registerShutdownHandler(server, logger) + registerShutdownHandler(ctx, cancel, server, logger) logger.With("address", listener.Addr()).Info(ctx, "Initializing server") if err := server.Serve(listener); err != nil { logger.With("err", err).Fatal(ctx, "failed to initialize server") } } -func registerShutdownHandler(server *grpc.Server, logger log.Logger) { +func registerShutdownHandler(ctx context.Context, cancel context.CancelFunc, server *grpc.Server, logger log.Logger) { sigs := make(chan os.Signal, 1) - ctx := context.Background() signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs @@ -98,6 +96,7 @@ func registerShutdownHandler(server *grpc.Server, logger log.Logger) { logger.Info(ctx, "initiating grpc graceful stop") server.GracefulStop() _ = logger.Sync() + cancel() return nil }, time.Second*30) if err != nil { From 215b5848e39c3049aa0149a8aecc619308b7355b Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 17:14:50 -0700 Subject: [PATCH 16/17] test blocking shutdown Signed-off-by: Jyoti Mahapatra --- internal/app/server/server.go | 15 ++++--- internal/app/server/server_test.go | 69 ++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 5 deletions(-) create mode 100644 internal/app/server/server_test.go diff --git a/internal/app/server/server.go b/internal/app/server/server.go index 63337d5c..5c2b132f 100644 --- a/internal/app/server/server.go +++ b/internal/app/server/server.go @@ -79,14 +79,19 @@ func Run(bootstrapConfig *bootstrapv1.Bootstrap, return } - registerShutdownHandler(ctx, cancel, server, logger) + registerShutdownHandler(ctx, cancel, server.GracefulStop, logger, time.Second*30) logger.With("address", listener.Addr()).Info(ctx, "Initializing server") if err := server.Serve(listener); err != nil { logger.With("err", err).Fatal(ctx, "failed to initialize server") } } -func registerShutdownHandler(ctx context.Context, cancel context.CancelFunc, server *grpc.Server, logger log.Logger) { +func registerShutdownHandler( + ctx context.Context, + cancel context.CancelFunc, + gracefulStop func(), + logger log.Logger, + waitTime time.Duration) { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { @@ -94,13 +99,13 @@ func registerShutdownHandler(ctx context.Context, cancel context.CancelFunc, ser logger.Info(ctx, "received interrupt signal:", sig.String()) err := util.DoWithTimeout(ctx, func() error { logger.Info(ctx, "initiating grpc graceful stop") - server.GracefulStop() + gracefulStop() _ = logger.Sync() cancel() return nil - }, time.Second*30) + }, waitTime) if err != nil { - logger.Error(ctx, "shutdown error: %s", err) + logger.Error(ctx, "shutdown error: ", err.Error()) } }() } diff --git a/internal/app/server/server_test.go b/internal/app/server/server_test.go new file mode 100644 index 00000000..1ad82d29 --- /dev/null +++ b/internal/app/server/server_test.go @@ -0,0 +1,69 @@ +package server + +import ( + "context" + "fmt" + "syscall" + "testing" + "time" + + "github.com/envoyproxy/xds-relay/internal/pkg/log" + "github.com/stretchr/testify/assert" +) + +func TestShutdown(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + blockedCh := make(chan bool, 1) + l := &logger{} + registerShutdownHandler(ctx, cancel, func() { + blockedCh <- true + }, l, time.Second*5) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + <-blockedCh +} + +func TestShutdownTimeout(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + l := &logger{blockedCh: make(chan bool, 1)} + registerShutdownHandler(ctx, cancel, func() { + <-time.After(time.Minute) + }, l, time.Millisecond) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + <-l.blockedCh + assert.Equal(t, "shutdown error: context deadline exceeded", l.lastErr) +} + +type logger struct { + blockedCh chan bool + lastErr string +} + +func (l *logger) Named(name string) log.Logger { + return l +} + +func (l *logger) With(args ...interface{}) log.Logger { + return l +} + +func (l *logger) Sync() error { return nil } + +func (l *logger) Debug(ctx context.Context, args ...interface{}) { +} + +func (l *logger) Info(ctx context.Context, args ...interface{}) { +} + +func (l *logger) Warn(ctx context.Context, args ...interface{}) { +} + +func (l *logger) Error(ctx context.Context, args ...interface{}) { + l.lastErr = fmt.Sprint(args...) + l.blockedCh <- true +} + +func (l *logger) Fatal(ctx context.Context, args ...interface{}) { +} + +func (l *logger) Panic(ctx context.Context, args ...interface{}) { +} From 02716745a90c1e3278b6c8bcb6f73114bb15bd2d Mon Sep 17 00:00:00 2001 From: Jyoti Mahapatra Date: Thu, 23 Apr 2020 17:27:12 -0700 Subject: [PATCH 17/17] lint Signed-off-by: Jyoti Mahapatra --- internal/app/server/server_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/app/server/server_test.go b/internal/app/server/server_test.go index 1ad82d29..98f6b759 100644 --- a/internal/app/server/server_test.go +++ b/internal/app/server/server_test.go @@ -18,7 +18,7 @@ func TestShutdown(t *testing.T) { registerShutdownHandler(ctx, cancel, func() { blockedCh <- true }, l, time.Second*5) - syscall.Kill(syscall.Getpid(), syscall.SIGINT) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT) <-blockedCh } @@ -28,7 +28,7 @@ func TestShutdownTimeout(t *testing.T) { registerShutdownHandler(ctx, cancel, func() { <-time.After(time.Minute) }, l, time.Millisecond) - syscall.Kill(syscall.Getpid(), syscall.SIGINT) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT) <-l.blockedCh assert.Equal(t, "shutdown error: context deadline exceeded", l.lastErr) }