Skip to content

Commit

Permalink
Deeply replace the 'MetricKey' with 'NamespacedName'. (#4894)
Browse files Browse the repository at this point in the history
The metric keys are currently a concatenated version of namespace/name. However, working with strings all throughout the codebase is kinda brittle as in the end, anything could be in them.

A much more typesafe method is to use 'NamespacedName' from k8s apimachinery. As it is only a struct with two strings, it is comparable via == and thus can even be used as keys for maps.
  • Loading branch information
markusthoemmes authored and knative-prow-robot committed Jul 23, 2019
1 parent c441452 commit 6c4d5ac
Show file tree
Hide file tree
Showing 19 changed files with 113 additions and 99 deletions.
3 changes: 2 additions & 1 deletion cmd/autoscaler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
"knative.dev/serving/pkg/apis/serving"
Expand Down Expand Up @@ -149,6 +150,6 @@ func getTestUniScalerFactory() func(decider *autoscaler.Decider) (autoscaler.Uni

type testMetricClient struct{}

func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
func (t *testMetricClient) StableAndPanicConcurrency(key types.NamespacedName) (float64, float64, error) {
return 1.0, 1.0, nil
}
14 changes: 6 additions & 8 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pkg/errors"
"go.opencensus.io/stats"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
Expand Down Expand Up @@ -86,11 +87,10 @@ const (
)

var (
servingRevisionKey string
userTargetAddress string
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
logger *zap.SugaredLogger
breaker *queue.Breaker
userTargetAddress string
reqChan = make(chan queue.ReqEvent, requestCountingQueueLength)
logger *zap.SugaredLogger
breaker *queue.Breaker

httpProxy *httputil.ReverseProxy

Expand Down Expand Up @@ -148,8 +148,6 @@ func initConfig(env config) {
logger.Fatal("INTERNAL_VOLUME_PATH must be specified when ENABLE_VAR_LOG_COLLECTION is true")
}

// TODO(mattmoor): Move this key to be in terms of the KPA.
servingRevisionKey = autoscaler.NewMetricKey(env.ServingNamespace, env.ServingRevision)
_psr, err := queue.NewPrometheusStatsReporter(env.ServingNamespace, env.ServingConfiguration, env.ServingRevision, env.ServingPod)
if err != nil {
logger.Fatalw("Failed to create stats reporter", zap.Error(err))
Expand Down Expand Up @@ -299,7 +297,7 @@ func main() {

initConfig(env)
logger = logger.With(
zap.String(logkey.Key, servingRevisionKey),
zap.String(logkey.Key, types.NamespacedName{Namespace: env.ServingNamespace, Name: env.ServingRevision}.String()),
zap.String(logkey.Pod, env.ServingPod))

target, err := url.Parse("http://" + userTargetAddress)
Expand Down
9 changes: 5 additions & 4 deletions pkg/activator/handler/concurrency_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package handler
import (
"time"

"k8s.io/apimachinery/pkg/types"
"knative.dev/pkg/system"
"knative.dev/serving/pkg/autoscaler"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ func NewConcurrencyReporterWithClock(podName string, reqChan chan ReqEvent, repo
}
}

func (cr *ConcurrencyReporter) report(key string, concurrency, requestCount int32) {
func (cr *ConcurrencyReporter) report(key types.NamespacedName, concurrency, requestCount int32) {
stat := autoscaler.Stat{
PodName: cr.podName,
AverageConcurrentRequests: float64(concurrency),
Expand All @@ -73,10 +74,10 @@ func (cr *ConcurrencyReporter) report(key string, concurrency, requestCount int3
// Run runs until stopCh is closed and processes events on all incoming channels
func (cr *ConcurrencyReporter) Run(stopCh <-chan struct{}) {
// Contains the number of in-flight requests per-key
outstandingRequestsPerKey := make(map[string]int32)
outstandingRequestsPerKey := make(map[types.NamespacedName]int32)
// Contains the number of incoming requests in the current
// reporting period, per key.
incomingRequestsPerKey := make(map[string]int32)
incomingRequestsPerKey := make(map[types.NamespacedName]int32)

for {
select {
Expand All @@ -102,7 +103,7 @@ func (cr *ConcurrencyReporter) Run(stopCh <-chan struct{}) {
}
}

incomingRequestsPerKey = make(map[string]int32)
incomingRequestsPerKey = make(map[types.NamespacedName]int32)
case <-stopCh:
return
}
Expand Down
59 changes: 33 additions & 26 deletions pkg/activator/handler/concurrency_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/google/go-cmp/cmp/cmpopts"
"k8s.io/apimachinery/pkg/types"

"github.com/google/go-cmp/cmp"
"knative.dev/pkg/system"
Expand All @@ -33,6 +34,12 @@ const (
requestOpEnd = "RequestOpEnd"
)

var (
pod1 = types.NamespacedName{Namespace: "test", Name: "pod1"}
pod2 = types.NamespacedName{Namespace: "test", Name: "pod2"}
pod3 = types.NamespacedName{Namespace: "test", Name: "pod3"}
)

type fakeClock struct {
Time time.Time
}
Expand All @@ -43,7 +50,7 @@ func (c fakeClock) Now() time.Time {

type reqOp struct {
op string
key string
key types.NamespacedName
time time.Time
}

Expand All @@ -56,19 +63,19 @@ func TestStats(t *testing.T) {
name: "Scale-from-zero sends stat",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod2",
key: pod2,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand All @@ -78,21 +85,21 @@ func TestStats(t *testing.T) {
name: "Scale to two",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpTick,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 2,
RequestCount: 2,
Expand All @@ -102,24 +109,24 @@ func TestStats(t *testing.T) {
name: "Scale-from-zero after tick sends stat",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpEnd,
key: "pod1",
key: pod1,
}, {
op: requestOpTick,
}, {
op: requestOpStart,
key: "pod1",
key: pod1,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand All @@ -129,59 +136,59 @@ func TestStats(t *testing.T) {
name: "Multiple pods tick",
ops: []reqOp{{
op: requestOpStart,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod2",
key: pod2,
}, {
op: requestOpTick,
}, {
op: requestOpEnd,
key: "pod1",
key: pod1,
}, {
op: requestOpStart,
key: "pod3",
key: pod3,
}, {
op: requestOpTick,
}},
expectedStats: []*autoscaler.StatMessage{{
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod1",
Key: pod1,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod3",
Key: pod3,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
PodName: "activator",
}}, {
Key: "pod2",
Key: pod2,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 0,
PodName: "activator",
}}, {
Key: "pod3",
Key: pod3,
Stat: autoscaler.Stat{
AverageConcurrentRequests: 1,
RequestCount: 1,
Expand Down Expand Up @@ -219,7 +226,7 @@ func TestStats(t *testing.T) {

// Check the stats we got match what we wanted
sorter := cmpopts.SortSlices(func(a, b *autoscaler.StatMessage) bool {
return a.Key < b.Key
return a.Key.Name < b.Key.Name
})
if diff := cmp.Diff(tc.expectedStats, stats, sorter); diff != "" {
t.Errorf("Unexpected stats (-want +got): %v", diff)
Expand Down
11 changes: 7 additions & 4 deletions pkg/activator/handler/requestevent_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ import (
"net/http"

"knative.dev/serving/pkg/activator"
"knative.dev/serving/pkg/autoscaler"

"k8s.io/apimachinery/pkg/types"
)

// ReqEvent represents an incoming/finished request with a given key
type ReqEvent struct {
Key string
Key types.NamespacedName
EventType ReqEventType
}

Expand Down Expand Up @@ -57,9 +58,11 @@ func (h *RequestEventHandler) ServeHTTP(w http.ResponseWriter, r *http.Request)
namespace := r.Header.Get(activator.RevisionHeaderNamespace)
name := r.Header.Get(activator.RevisionHeaderName)

revisionKey := autoscaler.NewMetricKey(namespace, name)
revisionKey := types.NamespacedName{Namespace: namespace, Name: name}

h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqIn}
defer func() { h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqOut} }()
defer func() {
h.ReqChan <- ReqEvent{Key: revisionKey, EventType: ReqOut}
}()
h.nextHandler.ServeHTTP(w, r)
}
4 changes: 2 additions & 2 deletions pkg/activator/handler/requestevent_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@ package handler

import (
"bytes"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/types"
"knative.dev/serving/pkg/activator"
)

Expand All @@ -43,7 +43,7 @@ func TestRequestEventHandler(t *testing.T) {

in := <-handler.ReqChan
wantIn := ReqEvent{
Key: fmt.Sprintf("%s/%s", namespace, revision),
Key: types.NamespacedName{Namespace: namespace, Name: revision},
EventType: ReqIn,
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/autoscaler/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"knative.dev/serving/pkg/resources"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
)

// Autoscaler stores current state of an instance of an autoscaler.
Expand Down Expand Up @@ -124,7 +125,7 @@ func (a *Autoscaler) Scale(ctx context.Context, now time.Time) (desiredPodCount
// Use 1 if there are zero current pods.
readyPodsCount := math.Max(1, float64(originalReadyPodsCount))

metricKey := NewMetricKey(a.namespace, a.revision)
metricKey := types.NamespacedName{Namespace: a.namespace, Name: a.revision}
observedStableConcurrency, observedPanicConcurrency, err := a.metricClient.StableAndPanicConcurrency(metricKey)
if err != nil {
if err == ErrNoData {
Expand Down
3 changes: 2 additions & 1 deletion pkg/autoscaler/autoscaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
kubeinformers "k8s.io/client-go/informers"
fakeK8s "k8s.io/client-go/kubernetes/fake"
. "knative.dev/pkg/logging/testing"
Expand Down Expand Up @@ -324,7 +325,7 @@ type testMetricClient struct {
err error
}

func (t *testMetricClient) StableAndPanicConcurrency(key string) (float64, float64, error) {
func (t *testMetricClient) StableAndPanicConcurrency(key types.NamespacedName) (float64, float64, error) {
return t.stableConcurrency, t.panicConcurrency, t.err
}

Expand Down
Loading

0 comments on commit 6c4d5ac

Please sign in to comment.