Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement client metrics interceptor for continuous benchmark job #2477

Merged
54 changes: 52 additions & 2 deletions internal/k8s/vald/benchmark/job/job_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,14 @@ const (
RestartPolicyAlways RestartPolicy = "Always"
RestartPolicyOnFailure RestartPolicy = "OnFailure"
RestartPolicyNever RestartPolicy = "Never"

volumeName = "vald-benchmark-job-config"
configMapName = "vald-benchmark-operator-config"
svcAccount = "vald-benchmark-operator"
)

const (
svcAccount = "vald-benchmark-operator"
var (
vankichi marked this conversation as resolved.
Show resolved Hide resolved
mode = int32(420)
vankichi marked this conversation as resolved.
Show resolved Hide resolved
)

type BenchmarkJobTpl interface {
Expand Down Expand Up @@ -138,6 +142,52 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err
},
},
},
{
Name: "MY_NODE_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
{
Name: "MY_POD_NAMESPACE",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
},
{
Name: "MY_POD_NAME",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.name",
},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: volumeName,
MountPath: "/etc/server",
},
},
},
}
// mount benchmark operator config map.
// It is used for bind only observability config for each benchmark job
b.jobTpl.Spec.Template.Spec.Volumes = []corev1.Volume{
{
Name: volumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
// FIXME: get benchmark operator configmap name
Name: configMapName,
},
DefaultMode: &mode,
},
},
},
}
Expand Down
79 changes: 79 additions & 0 deletions internal/net/grpc/interceptor/client/metric/metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (C) 2019-2024 vdaas.org vald team <vald@vdaas.org>
vankichi marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// You may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metric

import (
"context"

"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net/grpc/codes"
"github.com/vdaas/vald/internal/net/grpc/status"
"github.com/vdaas/vald/internal/observability/attribute"
"github.com/vdaas/vald/internal/observability/metrics"
"google.golang.org/grpc"
)

const (
latencyMetricsName = "client_latency"
completedRPCsMetricsName = "client_completed_rpcs"

gRPCMethodKeyName = "grpc_client_method"
gRPCStatus = "grpc_client_status"
)

func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor, error) {
meter := metrics.GetMeter()

Check warning on line 37 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L36-L37

Added lines #L36 - L37 were not covered by tests

latencyHistgram, err := meter.Float64Histogram(
latencyMetricsName,
metrics.WithDescription("Client latency in milliseconds, by method"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create latency metric")

Check warning on line 45 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L39-L45

Added lines #L39 - L45 were not covered by tests
}

completedRPCCnt, err := meter.Int64Counter(
completedRPCsMetricsName,
metrics.WithDescription("Count of RPCs by method and status"),
metrics.WithUnit(metrics.Milliseconds),
)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create completedRPCs metric")

Check warning on line 54 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L48-L54

Added lines #L48 - L54 were not covered by tests
}

record := func(ctx context.Context, method string, err error, latency float64) {
attrs := attributesFromError(method, err)
latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...))
completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...))

Check warning on line 60 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L57-L60

Added lines #L57 - L60 were not covered by tests
}
// FIXME: implement sending metric data
log.Debug(record)
return nil, nil, nil

Check warning on line 64 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L63-L64

Added lines #L63 - L64 were not covered by tests
}

func attributesFromError(method string, err error) []attribute.KeyValue {
code := codes.OK // default error is success when error is nil
if err != nil {
st, _ := status.FromError(err)
if st != nil {
code = st.Code()

Check warning on line 72 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L67-L72

Added lines #L67 - L72 were not covered by tests
}
}
return []attribute.KeyValue{
attribute.String(gRPCMethodKeyName, method),
attribute.String(gRPCStatus, code.String()),

Check warning on line 77 in internal/net/grpc/interceptor/client/metric/metric.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/interceptor/client/metric/metric.go#L75-L77

Added lines #L75 - L77 were not covered by tests
}
}
12 changes: 12 additions & 0 deletions internal/net/grpc/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@

"github.com/vdaas/vald/internal/backoff"
"github.com/vdaas/vald/internal/circuitbreaker"
"github.com/vdaas/vald/internal/errors"
"github.com/vdaas/vald/internal/log"
"github.com/vdaas/vald/internal/net"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/metric"
"github.com/vdaas/vald/internal/net/grpc/interceptor/client/trace"
"github.com/vdaas/vald/internal/strings"
"github.com/vdaas/vald/internal/sync/errgroup"
Expand Down Expand Up @@ -353,6 +355,16 @@
grpc.WithUnaryInterceptor(trace.UnaryClientInterceptor()),
grpc.WithStreamInterceptor(trace.StreamClientInterceptor()),
)
case "metricinterceptor", "metric":
uci, sci, err := metric.ClientMetricInterceptors()
if err != nil {
lerr := errors.NewErrCriticalOption("gRPCInterceptors", "metric", errors.Wrap(err, "failed to create interceptor"))
log.Warn(lerr.Error())

Check warning on line 362 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L358-L362

Added lines #L358 - L362 were not covered by tests
}
g.dopts = append(g.dopts,
grpc.WithUnaryInterceptor(uci),
grpc.WithStreamInterceptor(sci),
)

Check warning on line 367 in internal/net/grpc/option.go

View check run for this annotation

Codecov / codecov/patch

internal/net/grpc/option.go#L364-L367

Added lines #L364 - L367 were not covered by tests
default:
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/tools/benchmark/job/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
NAME = os.Getenv("CRD_NAME")
JOBNAME_ANNOTATION = "before-job-name"
JOBNAMESPACE_ANNOTATION = "before-job-namespace"
SERVICE_NAME = "vald-benchmark-job"
)

// NewConfig represents the set config from the given setting file path.
Expand All @@ -74,6 +75,7 @@

if cfg.Observability != nil {
cfg.Observability = cfg.Observability.Bind()
cfg.Observability.OTLP.Attribute.ServiceName = SERVICE_NAME

Check warning on line 78 in pkg/tools/benchmark/job/config/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/config/config.go#L78

Added line #L78 was not covered by tests
} else {
cfg.Observability = new(config.Observability)
}
Expand Down
36 changes: 24 additions & 12 deletions pkg/tools/benchmark/job/usecase/benchmarkd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,34 @@
}
}

// bind metrics interceptor
var clientInterceptors []string
var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err

Check warning on line 76 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L68-L76

Added lines #L68 - L76 were not covered by tests
}
var str []string
str = append(str, "metric")
if cfg.Observability.Trace.Enabled {
str = append(str, "trace")

Check warning on line 81 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L78-L81

Added lines #L78 - L81 were not covered by tests
}
clientInterceptors = str

Check warning on line 83 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L83

Added line #L83 was not covered by tests
vankichi marked this conversation as resolved.
Show resolved Hide resolved
}

copts, err := cfg.Job.ClientConfig.Opts()
if err != nil {
return nil, err
}
if cfg.Job.ClientConfig.DialOption == nil {
copts = append(copts, grpc.WithInsecure(true))
copts = append(copts,
grpc.WithInsecure(true),
grpc.WithClientInterceptors(clientInterceptors...),
)

Check warning on line 94 in pkg/tools/benchmark/job/usecase/benchmarkd.go

View check run for this annotation

Codecov / codecov/patch

pkg/tools/benchmark/job/usecase/benchmarkd.go#L91-L94

Added lines #L91 - L94 were not covered by tests
}
gcli := grpc.New(copts...)
vcli, err := vald.New(
Expand Down Expand Up @@ -131,17 +153,6 @@
}),
}

var obs observability.Observability
if cfg.Observability.Enabled {
obs, err = observability.NewWithConfig(
cfg.Observability,
infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job),
)
if err != nil {
return nil, err
}
}

srv, err := starter.New(
starter.WithConfig(cfg.Server),
starter.WithREST(func(sc *iconf.Server) []server.Option {
Expand Down Expand Up @@ -197,6 +208,7 @@
if r.observability != nil {
oech = r.observability.Start(ctx)
}

dech, err = r.job.Start(ctx)
if err != nil {
ech <- err
Expand Down
Loading