Skip to content

Commit

Permalink
feat: Use server-side throttling, if enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
karlkfi committed Apr 27, 2022
1 parent 2ae0f05 commit 534438c
Show file tree
Hide file tree
Showing 7 changed files with 389 additions and 22 deletions.
67 changes: 53 additions & 14 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,26 @@
package main

import (
"context"
"flag"
"fmt"
"os"
"strings"
"time"

"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/rest"
"k8s.io/component-base/cli"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/cmd/apply"
"sigs.k8s.io/cli-utils/cmd/destroy"
"sigs.k8s.io/cli-utils/cmd/diff"
"sigs.k8s.io/cli-utils/cmd/initcmd"
"sigs.k8s.io/cli-utils/cmd/preview"
"sigs.k8s.io/cli-utils/cmd/status"
"sigs.k8s.io/cli-utils/pkg/flowcontrol"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/manifestreader"

Expand Down Expand Up @@ -46,29 +52,32 @@ func main() {
flags.AddGoFlagSet(flag.CommandLine)
f := util.NewFactory(matchVersionKubeConfigFlags)

// Update ConfigFlags before subcommands run that talk to the server.
preRunE := newConfigFilerPreRunE(f, kubeConfigFlags)

ioStreams := genericclioptions.IOStreams{
In: os.Stdin,
Out: os.Stdout,
ErrOut: os.Stderr,
}

names := []string{"init", "apply", "preview", "diff", "destroy", "status"}
initCmd := initcmd.NewCmdInit(f, ioStreams)
updateHelp(names, initCmd)
loader := manifestreader.NewManifestLoader(f)
invFactory := inventory.ClusterClientFactory{StatusPolicy: inventory.StatusPolicyNone}
applyCmd := apply.Command(f, invFactory, loader, ioStreams)
updateHelp(names, applyCmd)
previewCmd := preview.Command(f, invFactory, loader, ioStreams)
updateHelp(names, previewCmd)
diffCmd := diff.NewCommand(f, ioStreams)
updateHelp(names, diffCmd)
destroyCmd := destroy.Command(f, invFactory, loader, ioStreams)
updateHelp(names, destroyCmd)
statusCmd := status.Command(f, invFactory, loader)
updateHelp(names, statusCmd)

cmd.AddCommand(initCmd, applyCmd, diffCmd, destroyCmd, previewCmd, statusCmd)
names := []string{"init", "apply", "destroy", "diff", "preview", "status"}
subCmds := []*cobra.Command{
initcmd.NewCmdInit(f, ioStreams),
apply.Command(f, invFactory, loader, ioStreams),
destroy.Command(f, invFactory, loader, ioStreams),
diff.NewCommand(f, ioStreams),
preview.Command(f, invFactory, loader, ioStreams),
status.Command(f, invFactory, loader),
}
for _, subCmd := range subCmds {
subCmd.PreRunE = preRunE
updateHelp(names, subCmd)
cmd.AddCommand(subCmd)
}

code := cli.Run(cmd)
os.Exit(code)
Expand All @@ -83,3 +92,33 @@ func updateHelp(names []string, c *cobra.Command) {
c.Example = strings.ReplaceAll(c.Example, "kubectl "+name, "kapply "+name)
}
}

// newConfigFilerPreRunE returns a cobra command PreRunE function that
// performs a lookup to determine if server-side throttling is enabled. If so,
// client-side throttling is disabled in the ConfigFlags.
func newConfigFilerPreRunE(f util.Factory, configFlags *genericclioptions.ConfigFlags) func(*cobra.Command, []string) error {
return func(_ *cobra.Command, args []string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

restConfig, err := f.ToRESTConfig()
if err != nil {
return err
}
enabled, err := flowcontrol.IsEnabled(ctx, restConfig)
if err != nil {
return fmt.Errorf("checking server-side throttling enablement: %w", err)
}
if enabled {
// Disable client-side throttling.
klog.V(3).Infof("Client-side throttling disabled")
// WrapConfigFn will affect future Factory.ToRESTConfig() calls.
configFlags.WrapConfigFn = func(cfg *rest.Config) *rest.Config {
cfg.QPS = -1
cfg.Burst = -1
return cfg
}
}
return nil
}
}
107 changes: 107 additions & 0 deletions pkg/flowcontrol/flowcontrol.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package flowcontrol

import (
"context"
"fmt"
"net/http"
"net/url"

flowcontrolapi "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport"
)

const pingPath = "/livez/ping"

// IsEnabled returns true if the server has the PriorityAndFairness flow control
// filter enabled. This check performs a GET request to the /version endpoint
// and looks for the presence of the `X-Kubernetes-PF-FlowSchema-UID` header.
func IsEnabled(ctx context.Context, config *rest.Config) (bool, error) {
// Build a RoundTripper from the provided REST client config.
// RoundTriper handles TLS, auth, auth proxy, user agent, impersonation, and
// debug logs. It also provides acess to the response headers, unlike the
// REST client. And we can't just use an HTTP client, because the version
// endpoint may or may not require auth, depending on RBAC config.
tcfg, err := config.TransportConfig()
if err != nil {
return false, fmt.Errorf("building transport config: %w", err)
}
roudtripper, err := transport.New(tcfg)
if err != nil {
return false, fmt.Errorf("building round tripper: %w", err)
}

// Build the base apiserver URL from the provided REST client config.
url, err := serverURL(config)
if err != nil {
return false, fmt.Errorf("building server URL: %w", err)
}

// Use the ping endpoint, because it's small and fast.
// It's alpha in v1.23+, but a 404 will still have the flowcontrol headers.
// Replacing the path is safe, because DefaultServerURL will have errored
// if it wasn't empty from the config.
url.Path = pingPath

// Build HEAD request with an empty body.
req, err := http.NewRequestWithContext(ctx, "HEAD", url.String(), nil)
if err != nil {
return false, fmt.Errorf("building request: %w", err)
}

if config.UserAgent != "" {
req.Header.Set("User-Agent", config.UserAgent)
}

// We don't care what the response body is.
// req.Header.Set("Accept", "text/plain")

// Perform the request.
resp, err := roudtripper.RoundTrip(req)
if err != nil {
return false, fmt.Errorf("making %s request: %w", pingPath, err)
}
// Probably nil for HEAD, but check anyway.
if resp.Body != nil {
// Always close the response body, to free up resources.
err := resp.Body.Close()
if err != nil {
return false, fmt.Errorf("closing response body: %v", err)
}
}

// If the response has one of the flowcontrol headers,
// that means the flowcontrol filter is enabled.
// There are two headers, but they're always both set by FlowControl.
// So we only need to check one.
// key = flowcontrolapi.ResponseHeaderMatchedPriorityLevelConfigurationUID
key := flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID
if value := resp.Header.Get(key); value != "" {
// We don't care what the value is (always a UID).
// We just care that the header is present.
return true, nil
}
return false, nil
}

// serverUrl returns the base URL for the cluster based on the supplied config.
// Host and Version are required. GroupVersion is ignored.
// Based on `defaultServerUrlFor` from k8s.io/client-go@v0.23.2/rest/url_utils.go
func serverURL(config *rest.Config) (*url.URL, error) {
// TODO: move the default to secure when the apiserver supports TLS by default
// config.Insecure is taken to mean "I want HTTPS but don't bother checking the certs against a CA."
hasCA := len(config.CAFile) != 0 || len(config.CAData) != 0
hasCert := len(config.CertFile) != 0 || len(config.CertData) != 0
defaultTLS := hasCA || hasCert || config.Insecure
host := config.Host
if host == "" {
host = "localhost"
}

hostURL, _, err := rest.DefaultServerURL(host, config.APIPath, schema.GroupVersion{}, defaultTLS)
return hostURL, err
}
91 changes: 91 additions & 0 deletions pkg/flowcontrol/flowcontrol_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package flowcontrol

import (
"bytes"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"time"

"context"

"github.com/stretchr/testify/assert"
flowcontrolapi "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/testutil"
)

func TestIsEnabled(t *testing.T) {
testCases := []struct {
name string
handler func(*http.Request) *http.Response
expectedEnabled bool
expectedError error
}{
{
name: "header found",
handler: func(req *http.Request) *http.Response {
defer req.Body.Close()
headers := http.Header{}
headers.Add(flowcontrolapi.ResponseHeaderMatchedFlowSchemaUID, "unused-uuid")

return &http.Response{
StatusCode: 200,
Header: headers,
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
},
expectedEnabled: true,
},
{
name: "header not found",
handler: func(req *http.Request) *http.Response {
defer req.Body.Close()
return &http.Response{
StatusCode: 200,
Header: http.Header{},
Body: ioutil.NopCloser(bytes.NewReader(nil)),
}
},
expectedEnabled: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
assert.Equal(t, "/livez/ping", req.URL.Path)
resp := tc.handler(req)
defer resp.Body.Close()
for k, vs := range resp.Header {
w.Header().Del(k)
for _, v := range vs {
w.Header().Add(k, v)
}
}
w.WriteHeader(resp.StatusCode)
_, err := io.Copy(w, resp.Body)
assert.NoError(t, err)
})

server := httptest.NewServer(handler)
defer server.Close()

cfg := &rest.Config{
Host: server.URL,
}

enabled, err := IsEnabled(ctx, cfg)
testutil.AssertEqual(t, tc.expectedError, err)
assert.Equal(t, tc.expectedEnabled, enabled)
})
}
}
12 changes: 8 additions & 4 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,14 @@ var _ = BeforeSuite(func() {
cfg, err := ctrl.GetConfig()
Expect(err).NotTo(HaveOccurred())

// Disable client-side throttling.
// Recent versions of kind support server-side throttling.
cfg.QPS = -1
cfg.Burst = -1
cfg.UserAgent = e2eutil.UserAgent("test/e2e")

if e2eutil.IsFlowControlEnabled(cfg) {
// Disable client-side throttling.
klog.V(3).Infof("Client-side throttling disabled")
cfg.QPS = -1
cfg.Burst = -1
}

inventoryConfigs[ConfigMapTypeInvConfig] = invconfig.NewConfigMapTypeInvConfig(cfg)
inventoryConfigs[CustomTypeInvConfig] = invconfig.NewCustomTypeInvConfig(cfg)
Expand Down
12 changes: 12 additions & 0 deletions test/e2e/e2eutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/rest"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/flowcontrol"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object/dependson"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
Expand Down Expand Up @@ -410,3 +412,13 @@ func UnstructuredNamespace(name string) *unstructured.Unstructured {
u.SetName(name)
return u
}

func IsFlowControlEnabled(config *rest.Config) bool {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

enabled, err := flowcontrol.IsEnabled(ctx, config)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

return enabled
}
Loading

0 comments on commit 534438c

Please sign in to comment.