Skip to content

Commit

Permalink
Merge pull request #599 from tkashem/pick-96984-4.8
Browse files Browse the repository at this point in the history
UPSTREAM: 96984: APF e2e: wait for steady state before proceeding
  • Loading branch information
openshift-merge-robot authored Mar 2, 2021
2 parents ac0db7d + 3d798bb commit 2ce2be0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 33 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions test/e2e/apimachinery/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
Expand Down
122 changes: 90 additions & 32 deletions test/e2e/apimachinery/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package apimachinery
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -32,39 +33,60 @@ import (

flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/apihelpers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/test/e2e/framework"
)

const (
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
requestConcurrencyLimitMetricLabelName = "priority_level"
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
priorityLevelLabelName = "priority_level"
)

var (
errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
)

var _ = SIGDescribe("API priority and fairness", func() {
f := framework.NewDefaultFramework("apf")

ginkgo.It("should ensure that requests can be classified by testing flow-schemas/priority-levels", func() {
ginkgo.It("should ensure that requests can be classified by adding FlowSchema and PriorityLevelConfiguration", func() {
testingFlowSchemaName := "e2e-testing-flowschema"
testingPriorityLevelName := "e2e-testing-prioritylevel"
matchingUsername := "noxu"
nonMatchingUsername := "foo"

ginkgo.By("creating a testing prioritylevel")
ginkgo.By("creating a testing PriorityLevelConfiguration object")
createdPriorityLevel, cleanup := createPriorityLevel(f, testingPriorityLevelName, 1)
defer cleanup()

ginkgo.By("creating a testing flowschema")
ginkgo.By("creating a testing FlowSchema object")
createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
defer cleanup()

ginkgo.By("checking response headers contain flow-schema/priority-level uid")
if !testResponseHeaderMatches(f, matchingUsername, string(createdPriorityLevel.UID), string(createdFlowSchema.UID)) {
framework.Failf("matching user doesnt received UID for the testing priority-level and flow-schema")
ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName)

var response *http.Response
ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
response = makeRequest(f, matchingUsername)
if plUIDWant, plUIDGot := string(createdPriorityLevel.UID), getPriorityLevelUID(response); plUIDWant != plUIDGot {
framework.Failf("expected PriorityLevelConfiguration UID in the response header: %s, but got: %s, response header: %#v", plUIDWant, plUIDGot, response.Header)
}
if fsUIDWant, fsUIDGot := string(createdFlowSchema.UID), getFlowSchemaUID(response); fsUIDWant != fsUIDGot {
framework.Failf("expected FlowSchema UID in the response header: %s, but got: %s, response header: %#v", fsUIDWant, fsUIDGot, response.Header)
}

ginkgo.By("response headers should contain non-empty UID of FlowSchema and PriorityLevelConfiguration for a non-matching user")
response = makeRequest(f, nonMatchingUsername)
if plUIDGot := getPriorityLevelUID(response); plUIDGot == "" {
framework.Failf("expected a non-empty PriorityLevelConfiguration UID in the response header, but got: %s, response header: %#v", plUIDGot, response.Header)
}
if testResponseHeaderMatches(f, nonMatchingUsername, string(createdPriorityLevel.UID), string(createdPriorityLevel.UID)) {
framework.Failf("non-matching user unexpectedly received UID for the testing priority-level and flow-schema")
if fsUIDGot := getFlowSchemaUID(response); fsUIDGot == "" {
framework.Failf("expected a non-empty FlowSchema UID in the response header but got: %s, response header: %#v", fsUIDGot, response.Header)
}
})

Expand Down Expand Up @@ -115,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() {
framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
_, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
defer cleanup()

ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName)
}

ginkgo.By("getting request concurrency from metrics")
for i := range clients {
realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
framework.ExpectNoError(err)
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
if clients[i].concurrency < 1 {
clients[i].concurrency = 1
Expand Down Expand Up @@ -174,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
defer cleanup()

ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
waitForSteadyState(f, flowSchemaName, priorityLevelName)

type client struct {
username string
qps float64
Expand All @@ -188,7 +217,8 @@ var _ = SIGDescribe("API priority and fairness", func() {
}

framework.Logf("getting real concurrency")
realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
framework.ExpectNoError(err)
for i := range clients {
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
if clients[i].concurrency < 1 {
Expand Down Expand Up @@ -248,33 +278,35 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur
}
}

//lint:ignore U1000 function is actually referenced
func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
framework.ExpectNoError(err)
func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
if err != nil {
return 0, err
}
sampleDecoder := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
Opts: &expfmt.DecodeOptions{},
}
for {
var v model.Vector
err := sampleDecoder.Decode(&v)
if err == io.EOF {
break
if err != nil {
if err == io.EOF {
break
}
return 0, err
}
framework.ExpectNoError(err)
for _, metric := range v {
if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
continue
}
if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
continue
}
return int32(metric.Value)
return int32(metric.Value), nil
}
}
framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
return 0
return 0, errPriorityLevelNotFound
}

// createFlowSchema creates a flow schema referring to a particular priority
Expand Down Expand Up @@ -324,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
}
}

// waitForSteadyState repeatedly polls the API server to check if the newly
// created flow schema and priority level have been seen by the APF controller
// by checking: (1) the dangling priority level reference condition in the flow
// schema status, and (2) metrics. The function times out after 30 seconds.
func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) {
framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
if err != nil {
return false, err
}
condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
if condition == nil || condition.Status != flowcontrol.ConditionFalse {
// The absence of the dangling status object implies that the APF
// controller isn't done with syncing the flow schema object. And, of
// course, the condition being anything but false means that steady state
// hasn't been achieved.
return false, nil
}
_, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
if err != nil {
if err == errPriorityLevelNotFound {
return false, nil
}
return false, err
}
return true, nil
}))
}

// makeRequests creates a request to the API server and returns the response.
func makeRequest(f *framework.Framework, username string) *http.Response {
config := f.ClientConfig()
Expand All @@ -341,15 +402,12 @@ func makeRequest(f *framework.Framework, username string) *http.Response {
return response
}

func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID, fsUID string) bool {
response := makeRequest(f, impersonatingUser)
if response.Header.Get(flowcontrol.ResponseHeaderMatchedFlowSchemaUID) != fsUID {
return false
}
if response.Header.Get(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID) != plUID {
return false
}
return true
func getPriorityLevelUID(response *http.Response) string {
return response.Header.Get(flowcontrol.ResponseHeaderMatchedPriorityLevelConfigurationUID)
}

func getFlowSchemaUID(response *http.Response) string {
return response.Header.Get(flowcontrol.ResponseHeaderMatchedFlowSchemaUID)
}

// uniformQPSLoadSingle loads the API server with requests at a uniform <qps>
Expand Down

0 comments on commit 2ce2be0

Please sign in to comment.