Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement client metrics interceptor for continuous benchmark job #2477

Merged
16 changes: 11 additions & 5 deletions cmd/tools/benchmark/job/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ server_config:
header_table_size: 0
interceptors:
- "RecoverInterceptor"
- "TraceInterceptor"
- "MetricInterceptor"
enable_reflection: true
socket_option:
reuse_port: true
Expand Down Expand Up @@ -136,8 +138,8 @@ server_config:
ip_recover_destination_addr: false
startup_strategy:
- liveness
- readiness
- grpc
- readiness
full_shutdown_duration: 30s
tls:
ca: /path/to/ca
Expand All @@ -147,7 +149,7 @@ server_config:
observability:
enabled: false
otlp:
collector_endpoint: ""
collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317"
attribute:
namespace: _MY_POD_NAMESPACE_
pod_name: _MY_POD_NAME_
Expand All @@ -157,6 +159,8 @@ observability:
trace_export_timeout: "1m"
trace_max_export_batch_size: 1024
trace_max_queue_size: 256
metrics_export_interval: "1s"
metrics_export_timeout: "1m"
metrics:
enable_cgo: true
enable_goroutine: true
Expand All @@ -172,7 +176,7 @@ observability:
- go_arch
- algorithm_info
trace:
enabled: false
enabled: true
job:
replica: 1
repetition: 1
Expand All @@ -181,7 +185,7 @@ job:
rps: 200
concurrency_limit: 200
client_config:
health_check_duration: "1s"
health_check_duration: ""
connection_pool:
enable_dns_resolver: true
enable_rebalance: true
Expand Down Expand Up @@ -221,7 +225,9 @@ job:
enable_backoff: false
insecure: true
timeout: ""
interceptors: []
interceptors:
- MetricInterceptor
- TraceInterceptor
net:
dns:
cache_enabled: true
Expand Down
51 changes: 49 additions & 2 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,21 @@ const (
RestartPolicyAlways RestartPolicy = "Always"
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"
)

const (
volumeName = "vald-benchmark-job-config"
svcAccount = "vald-benchmark-operator"
)

var mode = int32(420)
vankichi marked this conversation as resolved.
Show resolved Hide resolved

type BenchmarkJobTpl interface {
CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error)
}

type benchmarkJobTpl struct {
containerName string
containerImageName string
configMapName string
imagePullPolicy ImagePullPolicy
jobTpl k8s.Job
}
Expand Down Expand Up @@ -138,6 +140,51 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err
},
},
},
{
Name: "MY_NODE_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
{
Name: "MY_POD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "MY_POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/etc/server",
},
},
},
}
// mount benchmark operator config map.
// It is used for bind only observability config for each benchmark job
b.jobTpl.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: b.configMapName,
},
DefaultMode: &mode,
},
},
},
}
Expand Down
15 changes: 12 additions & 3 deletions internal/k8s/vald/benchmark/job/job_template_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,19 @@
// WithImagePullPolicy sets the docker image pull policy for benchmark job.
func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption {
return func(b *benchmarkJobTpl) error {
if len(p) == 0 {
return nil
if len(p) > 0 {
b.imagePullPolicy = p
}
return nil
}
}

// WithOperatorConfigMap sets the configMapName for mounting Job Pod
vankichi marked this conversation as resolved.
Show resolved Hide resolved
func WithOperatorConfigMap(cm string) BenchmarkJobTplOption {
return func(b *benchmarkJobTpl) error {
if len(cm) > 0 {
b.configMapName = cm

Check warning on line 66 in internal/k8s/vald/benchmark/job/job_template_option.go

View check run for this annotation

Codecov / codecov/patch

internal/k8s/vald/benchmark/job/job_template_option.go#L66

Added line #L66 was not covered by tests
}
b.imagePullPolicy = p
return nil
}
}
Expand Down
89 changes: 89 additions & 0 deletions internal/net/grpc/interceptor/client/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
vankichi marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric

import (
"context"
"time"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
"github.com/vdaas/vald/internal/observability/metrics"
"google.golang.org/grpc"
)

const (
latencyMetricsName = "client_latency"
completedRPCsMetricsName = "client_completed_rpcs"

gRPCMethodKeyName = "grpc_client_method"
gRPCStatus = "grpc_client_status"
)

func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor, error) {
meter := metrics.GetMeter()

Check warning on line 37 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L36-L37

Added lines #L36 - L37 were not covered by tests

latencyHistgram, err := meter.Float64Histogram(
latencyMetricsName,
metrics.WithDescription("Client latency in milliseconds, by method"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create latency metric")

Check warning on line 45 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L39-L45

Added lines #L39 - L45 were not covered by tests
}

completedRPCCnt, err := meter.Int64Counter(
completedRPCsMetricsName,
metrics.WithDescription("Count of RPCs by method and status"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create completedRPCs metric")

Check warning on line 54 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L48-L54

Added lines #L48 - L54 were not covered by tests
}

record := func(ctx context.Context, method string, err error, latency float64) {
attrs := attributesFromError(method, err)
latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...))
completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...))

Check warning on line 60 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L57-L60

Added lines #L57 - L60 were not covered by tests
}
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
now := time.Now()
err := invoker(ctx, method, req, reply, cc, opts...)
elapsedTime := time.Since(now)
record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond))
return err
}, func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
now := time.Now()
_, err := streamer(ctx, desc, cc, method, opts...)
elapsedTime := time.Since(now)
record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond))
return nil, nil
}, nil

Check warning on line 74 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L62-L74

Added lines #L62 - L74 were not covered by tests
}

func attributesFromError(method string, err error) []attribute.KeyValue {
code := codes.OK // default error is success when error is nil
if err != nil {
st, _ := status.FromError(err)
if st != nil {
code = st.Code()

Check warning on line 82 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L77-L82

Added lines #L77 - L82 were not covered by tests
}
}
return []attribute.KeyValue{
attribute.String(gRPCMethodKeyName, method),
attribute.String(gRPCStatus, code.String()),

Check warning on line 87 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L85-L87

Added lines #L85 - L87 were not covered by tests
}
}
12 changes: 12 additions & 0 deletions internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/circuitbreaker"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/metric"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/trace"
"github.com/vdaas/vald/internal/strings"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -353,6 +355,16 @@
grpc.WithUnaryInterceptor(trace.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(trace.StreamClientInterceptor()),
)
case "metricinterceptor", "metric":
uci, sci, err := metric.ClientMetricInterceptors()
if err != nil {
lerr := errors.NewErrCriticalOption("gRPCInterceptors", "metric", errors.Wrap(err, "failed to create interceptor"))
log.Warn(lerr.Error())

Check warning on line 362 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L358-L362

Added lines #L358 - L362 were not covered by tests
}
g.dopts = append(g.dopts,
grpc.WithUnaryInterceptor(uci),
grpc.WithStreamInterceptor(sci),
)

Check warning on line 367 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L364-L367

Added lines #L364 - L367 were not covered by tests
default:
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/tools/benchmark/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
NAME = os.Getenv("CRD_NAME")
JOBNAME_ANNOTATION = "before-job-name"
JOBNAMESPACE_ANNOTATION = "before-job-namespace"
SERVICE_NAME = "vald-benchmark-job"
)

// NewConfig represents the set config from the given setting file path.
Expand All @@ -74,6 +75,7 @@

if cfg.Observability != nil {
cfg.Observability = cfg.Observability.Bind()
cfg.Observability.OTLP.Attribute.ServiceName = SERVICE_NAME

Check warning on line 78 in pkg/tools/benchmark/job/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/config/config.go#L78

Added line #L78 was not covered by tests
} else {
cfg.Observability = new(config.Observability)
}
Expand Down
36 changes: 24 additions & 12 deletions pkg/tools/benchmark/job/usecase/benchmarkd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,34 @@
}
}

// bind metrics interceptor
var clientInterceptors []string
var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err

Check warning on line 76 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L68-L76

Added lines #L68 - L76 were not covered by tests
}
var str []string
str = append(str, "metric")
if cfg.Observability.Trace.Enabled {
str = append(str, "trace")

Check warning on line 81 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}
clientInterceptors = str

Check warning on line 83 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L83

Added line #L83 was not covered by tests
vankichi marked this conversation as resolved.
Show resolved Hide resolved
}

copts, err := cfg.Job.ClientConfig.Opts()
if err != nil {
return nil, err
}
if cfg.Job.ClientConfig.DialOption == nil {
copts = append(copts, grpc.WithInsecure(true))
copts = append(copts,
grpc.WithInsecure(true),
grpc.WithClientInterceptors(clientInterceptors...),
)

Check warning on line 94 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L91-L94

Added lines #L91 - L94 were not covered by tests
}
gcli := grpc.New(copts...)
vcli, err := vald.New(
Expand Down Expand Up @@ -131,17 +153,6 @@
}),
}

var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err
}
}

srv, err := starter.New(
starter.WithConfig(cfg.Server),
starter.WithREST(func(sc *iconf.Server) []server.Option {
Expand Down Expand Up @@ -197,6 +208,7 @@
if r.observability != nil {
oech = r.observability.Start(ctx)
}

dech, err = r.job.Start(ctx)
if err != nil {
ech <- err
Expand Down
3 changes: 2 additions & 1 deletion pkg/tools/benchmark/operator/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type operator struct {
jobNamespace string
jobImage string
jobImagePullPolicy string
configMapName string
scenarios *atomic.Pointer[map[string]*scenario]
benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob]
jobs *atomic.Pointer[map[string]string]
Expand Down Expand Up @@ -455,7 +456,6 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench
}
// set status
bj.Status = v1.BenchmarkJobNotReady
// TODO: set metrics
// create benchmark job resource
c := o.ctrl.GetManager().GetClient()
if err := c.Create(ctx, bj); err != nil {
Expand All @@ -476,6 +476,7 @@ func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error
benchjob.WithContainerName(bjr.GetName()),
benchjob.WithContainerImage(o.jobImage),
benchjob.WithImagePullPolicy(benchjob.ImagePullPolicy(o.jobImagePullPolicy)),
benchjob.WithOperatorConfigMap(o.configMapName),
)
if err != nil {
return err
Expand Down
Loading
Loading