Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deeply replace the 'MetricKey' with 'NamespacedName'. #4894

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/autoscaler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
"knative.dev/serving/pkg/apis/serving"
Expand Down Expand Up @@ -149,6 +150,6 @@ func getTestUniScalerFactory() func(decider *autoscaler.Decider) (autoscaler.Uni

type testMetricClient struct{}

func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
func (t *testMetricClient) StableAndPanicConcurrency(key types.NamespacedName) (float64, float64, error) {
return 1.0, 1.0, nil
}
14 changes: 6 additions & 8 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"go.opencensus.io/stats"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
Expand Down Expand Up @@ -86,11 +87,10 @@ const (
)

var (
servingRevisionKey string
userTargetAddress string
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
logger *zap.SugaredLogger
breaker *queue.Breaker
userTargetAddress string
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
logger *zap.SugaredLogger
breaker *queue.Breaker

httpProxy *httputil.ReverseProxy

Expand Down Expand Up @@ -148,8 +148,6 @@ func initConfig(env config) {
logger.Fatal("INTERNAL_VOLUME_PATH must be specified when ENABLE_VAR_LOG_COLLECTION is true")
}

// TODO(mattmoor): Move this key to be in terms of the KPA.
servingRevisionKey = autoscaler.NewMetricKey(env.ServingNamespace, env.ServingRevision)
_psr, err := queue.NewPrometheusStatsReporter(env.ServingNamespace, env.ServingConfiguration, env.ServingRevision, env.ServingPod)
if err != nil {
logger.Fatalw("Failed to create stats reporter", zap.Error(err))
Expand Down Expand Up @@ -299,7 +297,7 @@ func main() {

initConfig(env)
logger = logger.With(
zap.String(logkey.Key, servingRevisionKey),
zap.String(logkey.Key, types.NamespacedName{Namespace: env.ServingNamespace, Name: env.ServingRevision}.String()),
zap.String(logkey.Pod, env.ServingPod))

target, err := url.Parse("http://" + userTargetAddress)
Expand Down
9 changes: 5 additions & 4 deletions pkg/activator/handler/concurrency_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handler
import (
"time"

"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/system"
"knative.dev/serving/pkg/autoscaler"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ func NewConcurrencyReporterWithClock(podName string, reqChan chan ReqEvent, repo
}
}

func (cr *ConcurrencyReporter) report(key string, concurrency, requestCount int32) {
func (cr *ConcurrencyReporter) report(key types.NamespacedName, concurrency, requestCount int32) {
stat := autoscaler.Stat{
PodName: cr.podName,
AverageConcurrentRequests: float64(concurrency),
Expand All @@ -73,10 +74,10 @@ func (cr *ConcurrencyReporter) report(key string, concurrency, requestCount int3
// Run runs until stopCh is closed and processes events on all incoming channels
func (cr *ConcurrencyReporter) Run(stopCh <-chan struct{}) {
// Contains the number of in-flight requests per-key
outstandingRequestsPerKey := make(map[string]int32)
outstandingRequestsPerKey := make(map[types.NamespacedName]int32)
// Contains the number of incoming requests in the current
// reporting period, per key.
incomingRequestsPerKey := make(map[string]int32)
incomingRequestsPerKey := make(map[types.NamespacedName]int32)

for {
select {
Expand All @@ -102,7 +103,7 @@ func (cr *ConcurrencyReporter) Run(stopCh <-chan struct{}) {
}
}

incomingRequestsPerKey = make(map[string]int32)
incomingRequestsPerKey = make(map[types.NamespacedName]int32)
case <-stopCh:
return
}
Expand Down
59 changes: 33 additions & 26 deletions pkg/activator/handler/concurrency_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/types"

"github.com/google/go-cmp/cmp"
"knative.dev/pkg/system"
Expand All @@ -33,6 +34,12 @@ const (
requestOpEnd = "RequestOpEnd"
)

var (
pod1 = types.NamespacedName{Namespace: "test", Name: "pod1"}
pod2 = types.NamespacedName{Namespace: "test", Name: "pod2"}
pod3 = types.NamespacedName{Namespace: "test", Name: "pod3"}
)

type fakeClock struct {
Time time.Time
}
Expand All @@ -43,7 +50,7 @@ func (c fakeClock) Now() time.Time {

type reqOp struct {
op string
key string
key types.NamespacedName
time time.Time
}

Expand All @@ -56,19 +63,19 @@ func TestStats(t *testing.T) {
name: "Scale-from-zero sends stat",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod2",
key: pod2,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand All @@ -78,21 +85,21 @@ func TestStats(t *testing.T) {
name: "Scale to two",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpTick,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 2,
RequestCount: 2,
Expand All @@ -102,24 +109,24 @@ func TestStats(t *testing.T) {
name: "Scale-from-zero after tick sends stat",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpEnd,
key: "pod1",
key: pod1,
}, {
op: requestOpTick,
}, {
op: requestOpStart,
key: "pod1",
key: pod1,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand All @@ -129,59 +136,59 @@ func TestStats(t *testing.T) {
name: "Multiple pods tick",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod2",
key: pod2,
}, {
op: requestOpTick,
}, {
op: requestOpEnd,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod3",
key: pod3,
}, {
op: requestOpTick,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod3",
Key: pod3,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 0,
PodName: "activator",
}}, {
Key: "pod3",
Key: pod3,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand Down Expand Up @@ -219,7 +226,7 @@ func TestStats(t *testing.T) {

// Check the stats we got match what we wanted
sorter := cmpopts.SortSlices(func(a, b *autoscaler.StatMessage) bool {
return a.Key < b.Key
return a.Key.Name < b.Key.Name
})
if diff := cmp.Diff(tc.expectedStats, stats, sorter); diff != "" {
t.Errorf("Unexpected stats (-want +got): %v", diff)
Expand Down
11 changes: 7 additions & 4 deletions pkg/activator/handler/requestevent_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
"net/http"

"knative.dev/serving/pkg/activator"
"knative.dev/serving/pkg/autoscaler"

"k8s.io/apimachinery/pkg/types"
)

// ReqEvent represents an incoming/finished request with a given key
type ReqEvent struct {
Key string
Key types.NamespacedName
EventType ReqEventType
}

Expand Down Expand Up @@ -57,9 +58,11 @@ func (h *RequestEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
namespace := r.Header.Get(activator.RevisionHeaderNamespace)
name := r.Header.Get(activator.RevisionHeaderName)

revisionKey := autoscaler.NewMetricKey(namespace, name)
revisionKey := types.NamespacedName{Namespace: namespace, Name: name}

h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqIn}
defer func() { h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqOut} }()
defer func() {
h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqOut}
}()
h.nextHandler.ServeHTTP(w, r)
}
4 changes: 2 additions & 2 deletions pkg/activator/handler/requestevent_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ package handler

import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/types"
"knative.dev/serving/pkg/activator"
)

Expand All @@ -43,7 +43,7 @@ func TestRequestEventHandler(t *testing.T) {

in := <-handler.ReqChan
wantIn := ReqEvent{
Key: fmt.Sprintf("%s/%s", namespace, revision),
Key: types.NamespacedName{Namespace: namespace, Name: revision},
EventType: ReqIn,
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"knative.dev/serving/pkg/resources"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
)

// Autoscaler stores current state of an instance of an autoscaler.
Expand Down Expand Up @@ -124,7 +125,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount
// Use 1 if there are zero current pods.
readyPodsCount := math.Max(1, float64(originalReadyPodsCount))

metricKey := NewMetricKey(a.namespace, a.revision)
metricKey := types.NamespacedName{Namespace: a.namespace, Name: a.revision}
observedStableConcurrency, observedPanicConcurrency, err := a.metricClient.StableAndPanicConcurrency(metricKey)
if err != nil {
if err == ErrNoData {
Expand Down
3 changes: 2 additions & 1 deletion pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
. "knative.dev/pkg/logging/testing"
Expand Down Expand Up @@ -324,7 +325,7 @@ type testMetricClient struct {
err error
}

func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
func (t *testMetricClient) StableAndPanicConcurrency(key types.NamespacedName) (float64, float64, error) {
return t.stableConcurrency, t.panicConcurrency, t.err
}

Expand Down
Loading