Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use server-side throttling, if enabled #584

Merged
merged 1 commit into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 53 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
"k8s.io/component-base/cli"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/cmd/apply"
"sigs.k8s.io/cli-utils/cmd/destroy"
"sigs.k8s.io/cli-utils/cmd/diff"
"sigs.k8s.io/cli-utils/cmd/initcmd"
"sigs.k8s.io/cli-utils/cmd/preview"
"sigs.k8s.io/cli-utils/cmd/status"
"sigs.k8s.io/cli-utils/pkg/flowcontrol"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/manifestreader"

Expand Down Expand Up @@ -46,29 +52,32 @@ func main() {
flags.AddGoFlagSet(flag.CommandLine)
f := util.NewFactory(matchVersionKubeConfigFlags)

// Update ConfigFlags before subcommands run that talk to the server.
preRunE := newConfigFilerPreRunE(f, kubeConfigFlags)

ioStreams := genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}

names := []string{"init", "apply", "preview", "diff", "destroy", "status"}
initCmd := initcmd.NewCmdInit(f, ioStreams)
updateHelp(names, initCmd)
loader := manifestreader.NewManifestLoader(f)
invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}
applyCmd := apply.Command(f, invFactory, loader, ioStreams)
updateHelp(names, applyCmd)
previewCmd := preview.Command(f, invFactory, loader, ioStreams)
updateHelp(names, previewCmd)
diffCmd := diff.NewCommand(f, ioStreams)
updateHelp(names, diffCmd)
destroyCmd := destroy.Command(f, invFactory, loader, ioStreams)
updateHelp(names, destroyCmd)
statusCmd := status.Command(f, invFactory, loader)
updateHelp(names, statusCmd)

cmd.AddCommand(initCmd, applyCmd, diffCmd, destroyCmd, previewCmd, statusCmd)
names := []string{"init", "apply", "destroy", "diff", "preview", "status"}
subCmds := []*cobra.Command{
initcmd.NewCmdInit(f, ioStreams),
apply.Command(f, invFactory, loader, ioStreams),
destroy.Command(f, invFactory, loader, ioStreams),
diff.NewCommand(f, ioStreams),
preview.Command(f, invFactory, loader, ioStreams),
status.Command(f, invFactory, loader),
}
for _, subCmd := range subCmds {
subCmd.PreRunE = preRunE
updateHelp(names, subCmd)
cmd.AddCommand(subCmd)
}

code := cli.Run(cmd)
os.Exit(code)
Expand All @@ -83,3 +92,33 @@ func updateHelp(names []string, c *cobra.Command) {
c.Example = strings.ReplaceAll(c.Example, "kubectl "+name, "kapply "+name)
}
}

// newConfigFilerPreRunE returns a cobra command PreRunE function that
// performs a lookup to determine if server-side throttling is enabled. If so,
// client-side throttling is disabled in the ConfigFlags.
func newConfigFilerPreRunE(f util.Factory, configFlags *genericclioptions.ConfigFlags) func(*cobra.Command, []string) error {
return func(_ *cobra.Command, args []string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

restConfig, err := f.ToRESTConfig()
if err != nil {
return err
}
enabled, err := flowcontrol.IsEnabled(ctx, restConfig)
if err != nil {
return fmt.Errorf("checking server-side throttling enablement: %w", err)
}
if enabled {
// Disable client-side throttling.
klog.V(3).Infof("Client-side throttling disabled")
// WrapConfigFn will affect future Factory.ToRESTConfig() calls.
configFlags.WrapConfigFn = func(cfg *rest.Config) *rest.Config {
cfg.QPS = -1
cfg.Burst = -1
return cfg
}
}
return nil
}
}
107 changes: 107 additions & 0 deletions pkg/flowcontrol/flowcontrol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package flowcontrol

import (
"context"
"fmt"
"net/http"
"net/url"

flowcontrolapi "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const pingPath = "/livez/ping"

// IsEnabled returns true if the server has the PriorityAndFairness flow control
// filter enabled. This check performs a GET request to the /version endpoint
// and looks for the presence of the `X-Kubernetes-PF-FlowSchema-UID` header.
func IsEnabled(ctx context.Context, config *rest.Config) (bool, error) {
// Build a RoundTripper from the provided REST client config.
// RoundTriper handles TLS, auth, auth proxy, user agent, impersonation, and
// debug logs. It also provides acess to the response headers, unlike the
// REST client. And we can't just use an HTTP client, because the version
// endpoint may or may not require auth, depending on RBAC config.
tcfg, err := config.TransportConfig()
if err != nil {
return false, fmt.Errorf("building transport config: %w", err)
}
roudtripper, err := transport.New(tcfg)
if err != nil {
return false, fmt.Errorf("building round tripper: %w", err)
}

// Build the base apiserver URL from the provided REST client config.
url, err := serverURL(config)
if err != nil {
return false, fmt.Errorf("building server URL: %w", err)
}

// Use the ping endpoint, because it's small and fast.
// It's alpha in v1.23+, but a 404 will still have the flowcontrol headers.
// Replacing the path is safe, because DefaultServerURL will have errored
// if it wasn't empty from the config.
url.Path = pingPath

// Build HEAD request with an empty body.
req, err := http.NewRequestWithContext(ctx, "HEAD", url.String(), nil)
if err != nil {
return false, fmt.Errorf("building request: %w", err)
}

if config.UserAgent != "" {
req.Header.Set("User-Agent", config.UserAgent)
}

// We don't care what the response body is.
// req.Header.Set("Accept", "text/plain")

// Perform the request.
resp, err := roudtripper.RoundTrip(req)
if err != nil {
return false, fmt.Errorf("making %s request: %w", pingPath, err)
}
// Probably nil for HEAD, but check anyway.
if resp.Body != nil {
// Always close the response body, to free up resources.
err := resp.Body.Close()
if err != nil {
return false, fmt.Errorf("closing response body: %v", err)
}
}

// If the response has one of the flowcontrol headers,
// that means the flowcontrol filter is enabled.
// There are two headers, but they're always both set by FlowControl.
// So we only need to check one.
// key = flowcontrolapi.ResponseHeaderMatchedPriorityLevelConfigurationUID
key := flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID
if value := resp.Header.Get(key); value != "" {
// We don't care what the value is (always a UID).
// We just care that the header is present.
return true, nil
}
return false, nil
}

// serverUrl returns the base URL for the cluster based on the supplied config.
// Host and Version are required. GroupVersion is ignored.
// Based on `defaultServerUrlFor` from k8s.io/client-go@v0.23.2/rest/url_utils.go
func serverURL(config *rest.Config) (*url.URL, error) {
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0
hasCert := len(config.CertFile) != 0 || len(config.CertData) != 0
defaultTLS := hasCA || hasCert || config.Insecure
host := config.Host
if host == "" {
host = "localhost"
}

hostURL, _, err := rest.DefaultServerURL(host, config.APIPath, schema.GroupVersion{}, defaultTLS)
return hostURL, err
}
91 changes: 91 additions & 0 deletions pkg/flowcontrol/flowcontrol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package flowcontrol

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"context"

"github.com/stretchr/testify/assert"
flowcontrolapi "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

func TestIsEnabled(t *testing.T) {
testCases := []struct {
name string
handler func(*http.Request) *http.Response
expectedEnabled bool
expectedError error
}{
{
name: "header found",
handler: func(req *http.Request) *http.Response {
defer req.Body.Close()
headers := http.Header{}
headers.Add(flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID, "unused-uuid")

return &http.Response{
StatusCode: 200,
Header: headers,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
},
expectedEnabled: true,
},
{
name: "header not found",
handler: func(req *http.Request) *http.Response {
defer req.Body.Close()
return &http.Response{
StatusCode: 200,
Header: http.Header{},
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
},
expectedEnabled: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/livez/ping", req.URL.Path)
resp := tc.handler(req)
defer resp.Body.Close()
for k, vs := range resp.Header {
w.Header().Del(k)
for _, v := range vs {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
_, err := io.Copy(w, resp.Body)
assert.NoError(t, err)
})

server := httptest.NewServer(handler)
defer server.Close()

cfg := &rest.Config{
Host: server.URL,
}

enabled, err := IsEnabled(ctx, cfg)
testutil.AssertEqual(t, tc.expectedError, err)
assert.Equal(t, tc.expectedEnabled, enabled)
})
}
}
12 changes: 8 additions & 4 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ var _ = BeforeSuite(func() {
cfg, err := ctrl.GetConfig()
Expect(err).NotTo(HaveOccurred())

// Disable client-side throttling.
// Recent versions of kind support server-side throttling.
cfg.QPS = -1
cfg.Burst = -1
cfg.UserAgent = e2eutil.UserAgent("test/e2e")

if e2eutil.IsFlowControlEnabled(cfg) {
// Disable client-side throttling.
klog.V(3).Infof("Client-side throttling disabled")
cfg.QPS = -1
cfg.Burst = -1
}

inventoryConfigs[ConfigMapTypeInvConfig] = invconfig.NewConfigMapTypeInvConfig(cfg)
inventoryConfigs[CustomTypeInvConfig] = invconfig.NewCustomTypeInvConfig(cfg)
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/e2eutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/flowcontrol"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object/dependson"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
Expand Down Expand Up @@ -410,3 +412,13 @@ func UnstructuredNamespace(name string) *unstructured.Unstructured {
u.SetName(name)
return u
}

func IsFlowControlEnabled(config *rest.Config) bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

enabled, err := flowcontrol.IsEnabled(ctx, config)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

return enabled
}
Loading