diff --git a/cmd/main.go b/cmd/main.go index c35dc068..88752602 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -4,13 +4,18 @@ package main import ( + "context" "flag" + "fmt" "os" "strings" + "time" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/rest" "k8s.io/component-base/cli" + "k8s.io/klog/v2" "k8s.io/kubectl/pkg/cmd/util" "sigs.k8s.io/cli-utils/cmd/apply" "sigs.k8s.io/cli-utils/cmd/destroy" @@ -18,6 +23,7 @@ import ( "sigs.k8s.io/cli-utils/cmd/initcmd" "sigs.k8s.io/cli-utils/cmd/preview" "sigs.k8s.io/cli-utils/cmd/status" + "sigs.k8s.io/cli-utils/pkg/flowcontrol" "sigs.k8s.io/cli-utils/pkg/inventory" "sigs.k8s.io/cli-utils/pkg/manifestreader" @@ -46,29 +52,32 @@ func main() { flags.AddGoFlagSet(flag.CommandLine) f := util.NewFactory(matchVersionKubeConfigFlags) + // Update ConfigFlags before subcommands run that talk to the server. + preRunE := newConfigFilerPreRunE(f, kubeConfigFlags) + ioStreams := genericclioptions.IOStreams{ In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr, } - names := []string{"init", "apply", "preview", "diff", "destroy", "status"} - initCmd := initcmd.NewCmdInit(f, ioStreams) - updateHelp(names, initCmd) loader := manifestreader.NewManifestLoader(f) invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone} - applyCmd := apply.Command(f, invFactory, loader, ioStreams) - updateHelp(names, applyCmd) - previewCmd := preview.Command(f, invFactory, loader, ioStreams) - updateHelp(names, previewCmd) - diffCmd := diff.NewCommand(f, ioStreams) - updateHelp(names, diffCmd) - destroyCmd := destroy.Command(f, invFactory, loader, ioStreams) - updateHelp(names, destroyCmd) - statusCmd := status.Command(f, invFactory, loader) - updateHelp(names, statusCmd) - cmd.AddCommand(initCmd, applyCmd, diffCmd, destroyCmd, previewCmd, statusCmd) + names := []string{"init", "apply", "destroy", "diff", "preview", "status"} + subCmds := []*cobra.Command{ + initcmd.NewCmdInit(f, ioStreams), + apply.Command(f, invFactory, loader, ioStreams), + destroy.Command(f, invFactory, loader, ioStreams), + diff.NewCommand(f, ioStreams), + preview.Command(f, invFactory, loader, ioStreams), + status.Command(f, invFactory, loader), + } + for _, subCmd := range subCmds { + subCmd.PreRunE = preRunE + updateHelp(names, subCmd) + cmd.AddCommand(subCmd) + } code := cli.Run(cmd) os.Exit(code) @@ -83,3 +92,33 @@ func updateHelp(names []string, c *cobra.Command) { c.Example = strings.ReplaceAll(c.Example, "kubectl "+name, "kapply "+name) } } + +// newConfigFilerPreRunE returns a cobra command PreRunE function that +// performs a lookup to determine if server-side throttling is enabled. If so, +// client-side throttling is disabled in the ConfigFlags. +func newConfigFilerPreRunE(f util.Factory, configFlags *genericclioptions.ConfigFlags) func(*cobra.Command, []string) error { + return func(_ *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + restConfig, err := f.ToRESTConfig() + if err != nil { + return err + } + enabled, err := flowcontrol.IsEnabled(ctx, restConfig) + if err != nil { + return fmt.Errorf("checking server-side throttling enablement: %w", err) + } + if enabled { + // Disable client-side throttling. + klog.V(3).Infof("Client-side throttling disabled") + // WrapConfigFn will affect future Factory.ToRESTConfig() calls. + configFlags.WrapConfigFn = func(cfg *rest.Config) *rest.Config { + cfg.QPS = -1 + cfg.Burst = -1 + return cfg + } + } + return nil + } +} diff --git a/pkg/flowcontrol/flowcontrol.go b/pkg/flowcontrol/flowcontrol.go new file mode 100644 index 00000000..799df11c --- /dev/null +++ b/pkg/flowcontrol/flowcontrol.go @@ -0,0 +1,107 @@ +// Copyright 2022 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package flowcontrol + +import ( + "context" + "fmt" + "net/http" + "net/url" + + flowcontrolapi "k8s.io/api/flowcontrol/v1beta2" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/rest" + "k8s.io/client-go/transport" +) + +const pingPath = "/livez/ping" + +// IsEnabled returns true if the server has the PriorityAndFairness flow control +// filter enabled. This check performs a GET request to the /version endpoint +// and looks for the presence of the `X-Kubernetes-PF-FlowSchema-UID` header. +func IsEnabled(ctx context.Context, config *rest.Config) (bool, error) { + // Build a RoundTripper from the provided REST client config. + // RoundTriper handles TLS, auth, auth proxy, user agent, impersonation, and + // debug logs. It also provides acess to the response headers, unlike the + // REST client. And we can't just use an HTTP client, because the version + // endpoint may or may not require auth, depending on RBAC config. + tcfg, err := config.TransportConfig() + if err != nil { + return false, fmt.Errorf("building transport config: %w", err) + } + roudtripper, err := transport.New(tcfg) + if err != nil { + return false, fmt.Errorf("building round tripper: %w", err) + } + + // Build the base apiserver URL from the provided REST client config. + url, err := serverURL(config) + if err != nil { + return false, fmt.Errorf("building server URL: %w", err) + } + + // Use the ping endpoint, because it's small and fast. + // It's alpha in v1.23+, but a 404 will still have the flowcontrol headers. + // Replacing the path is safe, because DefaultServerURL will have errored + // if it wasn't empty from the config. + url.Path = pingPath + + // Build HEAD request with an empty body. + req, err := http.NewRequestWithContext(ctx, "HEAD", url.String(), nil) + if err != nil { + return false, fmt.Errorf("building request: %w", err) + } + + if config.UserAgent != "" { + req.Header.Set("User-Agent", config.UserAgent) + } + + // We don't care what the response body is. + // req.Header.Set("Accept", "text/plain") + + // Perform the request. + resp, err := roudtripper.RoundTrip(req) + if err != nil { + return false, fmt.Errorf("making %s request: %w", pingPath, err) + } + // Probably nil for HEAD, but check anyway. + if resp.Body != nil { + // Always close the response body, to free up resources. + err := resp.Body.Close() + if err != nil { + return false, fmt.Errorf("closing response body: %v", err) + } + } + + // If the response has one of the flowcontrol headers, + // that means the flowcontrol filter is enabled. + // There are two headers, but they're always both set by FlowControl. + // So we only need to check one. + // key = flowcontrolapi.ResponseHeaderMatchedPriorityLevelConfigurationUID + key := flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID + if value := resp.Header.Get(key); value != "" { + // We don't care what the value is (always a UID). + // We just care that the header is present. + return true, nil + } + return false, nil +} + +// serverUrl returns the base URL for the cluster based on the supplied config. +// Host and Version are required. GroupVersion is ignored. +// Based on `defaultServerUrlFor` from k8s.io/client-go@v0.23.2/rest/url_utils.go +func serverURL(config *rest.Config) (*url.URL, error) { + // TODO: move the default to secure when the apiserver supports TLS by default + // config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA." + hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0 + hasCert := len(config.CertFile) != 0 || len(config.CertData) != 0 + defaultTLS := hasCA || hasCert || config.Insecure + host := config.Host + if host == "" { + host = "localhost" + } + + hostURL, _, err := rest.DefaultServerURL(host, config.APIPath, schema.GroupVersion{}, defaultTLS) + return hostURL, err +} diff --git a/pkg/flowcontrol/flowcontrol_test.go b/pkg/flowcontrol/flowcontrol_test.go new file mode 100644 index 00000000..f559e858 --- /dev/null +++ b/pkg/flowcontrol/flowcontrol_test.go @@ -0,0 +1,91 @@ +// Copyright 2022 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package flowcontrol + +import ( + "bytes" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + "time" + + "context" + + "github.com/stretchr/testify/assert" + flowcontrolapi "k8s.io/api/flowcontrol/v1beta2" + "k8s.io/client-go/rest" + "sigs.k8s.io/cli-utils/pkg/testutil" +) + +func TestIsEnabled(t *testing.T) { + testCases := []struct { + name string + handler func(*http.Request) *http.Response + expectedEnabled bool + expectedError error + }{ + { + name: "header found", + handler: func(req *http.Request) *http.Response { + defer req.Body.Close() + headers := http.Header{} + headers.Add(flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID, "unused-uuid") + + return &http.Response{ + StatusCode: 200, + Header: headers, + Body: ioutil.NopCloser(bytes.NewReader(nil)), + } + }, + expectedEnabled: true, + }, + { + name: "header not found", + handler: func(req *http.Request) *http.Response { + defer req.Body.Close() + return &http.Response{ + StatusCode: 200, + Header: http.Header{}, + Body: ioutil.NopCloser(bytes.NewReader(nil)), + } + }, + expectedEnabled: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + assert.Equal(t, "/livez/ping", req.URL.Path) + resp := tc.handler(req) + defer resp.Body.Close() + for k, vs := range resp.Header { + w.Header().Del(k) + for _, v := range vs { + w.Header().Add(k, v) + } + } + w.WriteHeader(resp.StatusCode) + _, err := io.Copy(w, resp.Body) + assert.NoError(t, err) + }) + + server := httptest.NewServer(handler) + defer server.Close() + + cfg := &rest.Config{ + Host: server.URL, + } + + enabled, err := IsEnabled(ctx, cfg) + testutil.AssertEqual(t, tc.expectedError, err) + assert.Equal(t, tc.expectedEnabled, enabled) + }) + } +} diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9f938528..2cf23d73 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -55,10 +55,14 @@ var _ = BeforeSuite(func() { cfg, err := ctrl.GetConfig() Expect(err).NotTo(HaveOccurred()) - // Disable client-side throttling. - // Recent versions of kind support server-side throttling. - cfg.QPS = -1 - cfg.Burst = -1 + cfg.UserAgent = e2eutil.UserAgent("test/e2e") + + if e2eutil.IsFlowControlEnabled(cfg) { + // Disable client-side throttling. + klog.V(3).Infof("Client-side throttling disabled") + cfg.QPS = -1 + cfg.Burst = -1 + } inventoryConfigs[ConfigMapTypeInvConfig] = invconfig.NewConfigMapTypeInvConfig(cfg) inventoryConfigs[CustomTypeInvConfig] = invconfig.NewCustomTypeInvConfig(cfg) diff --git a/test/e2e/e2eutil/common.go b/test/e2e/e2eutil/common.go index a6518a79..ab90d271 100644 --- a/test/e2e/e2eutil/common.go +++ b/test/e2e/e2eutil/common.go @@ -19,8 +19,10 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/yaml" + "k8s.io/client-go/rest" "sigs.k8s.io/cli-utils/pkg/apply/event" "sigs.k8s.io/cli-utils/pkg/common" + "sigs.k8s.io/cli-utils/pkg/flowcontrol" "sigs.k8s.io/cli-utils/pkg/kstatus/status" "sigs.k8s.io/cli-utils/pkg/object/dependson" "sigs.k8s.io/cli-utils/pkg/object/mutation" @@ -410,3 +412,13 @@ func UnstructuredNamespace(name string) *unstructured.Unstructured { u.SetName(name) return u } + +func IsFlowControlEnabled(config *rest.Config) bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + enabled, err := flowcontrol.IsEnabled(ctx, config) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + return enabled +} diff --git a/test/e2e/e2eutil/rest.go b/test/e2e/e2eutil/rest.go new file mode 100644 index 00000000..32735076 --- /dev/null +++ b/test/e2e/e2eutil/rest.go @@ -0,0 +1,110 @@ +// Copyright 2022 The Kubernetes Authors. +// SPDX-License-Identifier: Apache-2.0 + +package e2eutil + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" + "time" + + "github.com/onsi/gomega" +) + +const unknown = "unknown" + +// UserAgent returns the a User-Agent for use with HTTP clients. +// The string corresponds to the current version of the binary being executed, +// using metadata from git and go. +func UserAgent(suffix string) string { + return fmt.Sprintf("%s/%s (%s/%s) cli-utils/%s/%s", + adjustCommand(os.Args[0]), + adjustVersion(gitVersion()), + runtime.GOOS, + runtime.GOARCH, + adjustCommit(gitCommit()), + suffix) +} + +// gitVersion returns the output from `git describe` +func gitVersion() string { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + cmd := exec.CommandContext(ctx, "git", "describe") + + // Ginkgo sets the working directory to the current test dir + cwd, err := os.Getwd() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + cmd.Dir = cwd + + var outBuf bytes.Buffer + var errBuf bytes.Buffer + cmd.Stdout = &outBuf + cmd.Stderr = &errBuf + + err = cmd.Run() + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "STDERR: %s", errBuf.String()) + + return strings.TrimSpace(outBuf.String()) +} + +// gitCommit returns the output from `git rev-parse HEAD` +func gitCommit() string { + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + cmd := exec.CommandContext(ctx, "git", "rev-parse", "HEAD") + + // Ginkgo sets the working directory to the current test dir + cwd, err := os.Getwd() + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + cmd.Dir = cwd + + var outBuf bytes.Buffer + var errBuf bytes.Buffer + cmd.Stdout = &outBuf + cmd.Stderr = &errBuf + + err = cmd.Run() + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "STDERR: %s", errBuf.String()) + + return strings.TrimSpace(outBuf.String()) +} + +// adjustCommand returns the last component of the +// OS-specific command path for use in User-Agent. +func adjustCommand(p string) string { + // Unlikely, but better than returning "". + if len(p) == 0 { + return unknown + } + return filepath.Base(p) +} + +// adjustVersion strips "alpha", "beta", etc. from version in form +// major.minor.patch-[alpha|beta|etc]. +func adjustVersion(v string) string { + if len(v) == 0 { + return unknown + } + seg := strings.SplitN(v, "-", 2) + return seg[0] +} + +// adjustCommit returns sufficient significant figures of the commit's git hash. +func adjustCommit(c string) string { + if len(c) == 0 { + return unknown + } + if len(c) > 7 { + return c[:7] + } + return c +} diff --git a/test/stress/stress_test.go b/test/stress/stress_test.go index 9cebd78c..06889590 100644 --- a/test/stress/stress_test.go +++ b/test/stress/stress_test.go @@ -43,10 +43,14 @@ var _ = BeforeSuite(func() { cfg, err := ctrl.GetConfig() Expect(err).NotTo(HaveOccurred()) - // Disable client-side throttling. - // Recent versions of kind support server-side throttling. - cfg.QPS = -1 - cfg.Burst = -1 + cfg.UserAgent = e2eutil.UserAgent("test/stress") + + if e2eutil.IsFlowControlEnabled(cfg) { + // Disable client-side throttling. + klog.V(3).Infof("Client-side throttling disabled") + cfg.QPS = -1 + cfg.Burst = -1 + } invConfig = invconfig.NewCustomTypeInvConfig(cfg)