Skip to content

Commit

Permalink
📈 Add client metric interceptor
Browse files Browse the repository at this point in the history
Signed-off-by: vankichi <kyukawa315@gmail.com>
  • Loading branch information
vankichi committed Apr 11, 2024
1 parent 327e053 commit fc32f64
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 21 deletions.
16 changes: 11 additions & 5 deletions cmd/tools/benchmark/job/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ server_config:
header_table_size: 0
interceptors:
- "RecoverInterceptor"
- "TraceInterceptor"
- "MetricInterceptor"
enable_reflection: true
socket_option:
reuse_port: true
Expand Down Expand Up @@ -136,8 +138,8 @@ server_config:
ip_recover_destination_addr: false
startup_strategy:
- liveness
- readiness
- grpc
- readiness
full_shutdown_duration: 30s
tls:
ca: /path/to/ca
Expand All @@ -147,7 +149,7 @@ server_config:
observability:
enabled: false
otlp:
collector_endpoint: ""
collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317"
attribute:
namespace: _MY_POD_NAMESPACE_
pod_name: _MY_POD_NAME_
Expand All @@ -157,6 +159,8 @@ observability:
trace_export_timeout: "1m"
trace_max_export_batch_size: 1024
trace_max_queue_size: 256
metrics_export_interval: "1s"
metrics_export_timeout: "1m"
metrics:
enable_cgo: true
enable_goroutine: true
Expand All @@ -172,7 +176,7 @@ observability:
- go_arch
- algorithm_info
trace:
enabled: false
enabled: true
job:
replica: 1
repetition: 1
Expand All @@ -181,7 +185,7 @@ job:
rps: 200
concurrency_limit: 200
client_config:
health_check_duration: "1s"
health_check_duration: ""
connection_pool:
enable_dns_resolver: true
enable_rebalance: true
Expand Down Expand Up @@ -221,7 +225,9 @@ job:
enable_backoff: false
insecure: true
timeout: ""
interceptors: []
interceptors:
- MetricInterceptor
- TraceInterceptor
net:
dns:
cache_enabled: true
Expand Down
9 changes: 4 additions & 5 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ const (
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"

volumeName = "vald-benchmark-job-config"
configMapName = "vald-benchmark-operator-config"
svcAccount = "vald-benchmark-operator"
volumeName = "vald-benchmark-job-config"
svcAccount = "vald-benchmark-operator"
)

var mode = int32(420)
Expand All @@ -50,6 +49,7 @@ type BenchmarkJobTpl interface {
// benchmarkJobTpl carries the settings used when building a benchmark job
// template. Its fields are populated through BenchmarkJobTplOption functions.
type benchmarkJobTpl struct {
// containerName is presumably the name of the job's container, set via
// WithContainerName — confirm against the option implementation.
containerName string
// containerImageName is presumably the container image, set via
// WithContainerImage — confirm against the option implementation.
containerImageName string
// configMapName is the name of the ConfigMap referenced by the job's
// ConfigMap volume source; it is set by WithOperatorConfigMap.
configMapName string
// imagePullPolicy is the image pull policy, set by WithImagePullPolicy.
imagePullPolicy ImagePullPolicy
// jobTpl is the k8s.Job value held by the template.
// NOTE(review): looks like the base job object that CreateJobTpl fills in — confirm.
jobTpl k8s.Job
}
Expand Down Expand Up @@ -181,8 +181,7 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
// FIXME: get benchmark operator configmap name
Name: configMapName,
Name: b.configMapName,
},
DefaultMode: &mode,
},
Expand Down
15 changes: 12 additions & 3 deletions internal/k8s/vald/benchmark/job/job_template_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,19 @@ func WithContainerImage(name string) BenchmarkJobTplOption {
// WithImagePullPolicy sets the docker image pull policy for benchmark job.
func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption {
return func(b *benchmarkJobTpl) error {
if len(p) == 0 {
return nil
if len(p) > 0 {
b.imagePullPolicy = p
}
return nil
}
}

// WithOperatorConfigMap sets the name of the ConfigMap that is mounted into the benchmark job Pod.
func WithOperatorConfigMap(cm string) BenchmarkJobTplOption {
return func(b *benchmarkJobTpl) error {
if len(cm) > 0 {
b.configMapName = cm

Check warning on line 66 in internal/k8s/vald/benchmark/job/job_template_option.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/vald/benchmark/job/job_template_option.go#L66

Added line #L66 was not covered by tests
}
b.imagePullPolicy = p
return nil
}
}
Expand Down
18 changes: 14 additions & 4 deletions internal/net/grpc/interceptor/client/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ package metric

import (
"context"
"time"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
Expand Down Expand Up @@ -59,9 +59,19 @@ func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientI
latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...))
completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...))

Check warning on line 60 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L57-L60

Added lines #L57 - L60 were not covered by tests
}
// FIXME: implement sending metric data
log.Debug(record)
return nil, nil, nil
// Return the unary and stream client interceptors. Both time the wrapped
// call and pass the latency, in milliseconds, along with the call's error
// to record.
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	now := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...)
	elapsedTime := time.Since(now)
	record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond))
	return err
}, func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	now := time.Now()
	// Only the time taken by streamer to create the stream is measured here,
	// not the lifetime of the stream itself.
	stream, err := streamer(ctx, desc, cc, method, opts...)
	elapsedTime := time.Since(now)
	record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond))
	// Hand the created stream and the streamer's error back to the caller.
	// Returning (nil, nil) here would hide stream-creation failures and give
	// the caller a nil ClientStream to operate on.
	return stream, err
}, nil

Check warning on line 74 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L62-L74

Added lines #L62 - L74 were not covered by tests
}

func attributesFromError(method string, err error) []attribute.KeyValue {
Expand Down
3 changes: 2 additions & 1 deletion pkg/tools/benchmark/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type operator struct {
jobNamespace string
jobImage string
jobImagePullPolicy string
configMapName string
scenarios *atomic.Pointer[map[string]*scenario]
benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob]
jobs *atomic.Pointer[map[string]string]
Expand Down Expand Up @@ -455,7 +456,6 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench
}
// set status
bj.Status = v1.BenchmarkJobNotReady
// TODO: set metrics
// create benchmark job resource
c := o.ctrl.GetManager().GetClient()
if err := c.Create(ctx, bj); err != nil {
Expand All @@ -476,6 +476,7 @@ func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error
benchjob.WithContainerName(bjr.GetName()),
benchjob.WithContainerImage(o.jobImage),
benchjob.WithImagePullPolicy(benchjob.ImagePullPolicy(o.jobImagePullPolicy)),
benchjob.WithOperatorConfigMap(o.configMapName),
)
if err != nil {
return err
Expand Down
14 changes: 11 additions & 3 deletions pkg/tools/benchmark/operator/service/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var defaultOpts = []Option{
WithJobImagePullPolicy("Always"),
WithReconcileCheckDuration("10s"),
WithJobNamespace("default"),
WithConfigMapName("vald-benchmark-operator-config"),
}

// WithErrGroup sets the error group to scenario.
Expand Down Expand Up @@ -60,9 +61,7 @@ func WithReconcileCheckDuration(ts string) Option {
// WithJobNamespace sets the namespace for running benchmark job.
func WithJobNamespace(ns string) Option {
return func(o *operator) error {
if ns == "" {
o.jobNamespace = "default"
} else {
if ns != "" {

Check warning on line 64 in pkg/tools/benchmark/operator/service/option.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/operator/service/option.go#L64

Added line #L64 was not covered by tests
o.jobNamespace = ns
}
return nil
Expand All @@ -88,3 +87,12 @@ func WithJobImagePullPolicy(p string) Option {
return nil
}
}

func WithConfigMapName(cm string) Option {
return func(o *operator) error {
if cm != "" {
o.configMapName = cm

Check warning on line 94 in pkg/tools/benchmark/operator/service/option.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/operator/service/option.go#L93-L94

Added lines #L93 - L94 were not covered by tests
}
return nil

Check warning on line 96 in pkg/tools/benchmark/operator/service/option.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/operator/service/option.go#L96

Added line #L96 was not covered by tests
}
}

0 comments on commit fc32f64

Please sign in to comment.