diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go index dbde9ff4a5..cd6174b50c 100644 --- a/internal/k8s/vald/benchmark/job/job_template.go +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -35,14 +35,16 @@ const ( RestartPolicyAlways RestartPolicy = "Always" RestartPolicyOnFailure RestartPolicy = "OnFailure" RestartPolicyNever RestartPolicy = "Never" -) -const ( volumeName = "vald-benchmark-job-config" configMapName = "vald-benchmark-operator-config" svcAccount = "vald-benchmark-operator" ) +var ( + mode = int32(420) +) + type BenchmarkJobTpl interface { CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, error) } @@ -175,7 +177,6 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err } // mount benchmark operator config map. // It is used for bind only observability config for each benchmark job - mode := int32(420) b.jobTpl.Spec.Template.Spec.Volumes = []corev1.Volume{ { Name: volumeName, diff --git a/internal/net/grpc/interceptor/client/metric/metric.go b/internal/net/grpc/interceptor/client/metric/metric.go new file mode 100644 index 0000000000..c3c5ae64d3 --- /dev/null +++ b/internal/net/grpc/interceptor/client/metric/metric.go @@ -0,0 +1,79 @@ +// Copyright (C) 2019-2024 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package metric + +import ( + "context" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/observability/attribute" + "github.com/vdaas/vald/internal/observability/metrics" + "google.golang.org/grpc" +) + +const ( + latencyMetricsName = "client_latency" + completedRPCsMetricsName = "client_completed_rpcs" + + gRPCMethodKeyName = "grpc_client_method" + gRPCStatus = "grpc_client_status" +) + +func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientInterceptor, error) { + meter := metrics.GetMeter() + + latencyHistgram, err := meter.Float64Histogram( + latencyMetricsName, + metrics.WithDescription("Client latency in milliseconds, by method"), + metrics.WithUnit(metrics.Milliseconds), + ) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create latency metric") + } + + completedRPCCnt, err := meter.Int64Counter( + completedRPCsMetricsName, + metrics.WithDescription("Count of RPCs by method and status"), + metrics.WithUnit(metrics.Milliseconds), + ) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to create completedRPCs metric") + } + + record := func(ctx context.Context, method string, err error, latency float64) { + attrs := attributesFromError(method, err) + latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...)) + completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...)) + } + // FIXME: implement sending metric data + log.Debug(record) + return nil, nil, nil +} + +func attributesFromError(method string, err error) []attribute.KeyValue { + code := codes.OK // default error is success when error is nil + if err != nil { + st, _ := status.FromError(err) + if st != nil { + code = st.Code() + } + } + return []attribute.KeyValue{ + attribute.String(gRPCMethodKeyName, method), + attribute.String(gRPCStatus, code.String()), + } +} diff --git a/internal/net/grpc/option.go b/internal/net/grpc/option.go index 3d896f48d8..4e5f3533d0 100644 --- a/internal/net/grpc/option.go +++ b/internal/net/grpc/option.go @@ -24,8 +24,10 @@ import ( "github.com/vdaas/vald/internal/backoff" "github.com/vdaas/vald/internal/circuitbreaker" + "github.com/vdaas/vald/internal/errors" "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net" + "github.com/vdaas/vald/internal/net/grpc/interceptor/client/metric" "github.com/vdaas/vald/internal/net/grpc/interceptor/client/trace" "github.com/vdaas/vald/internal/strings" "github.com/vdaas/vald/internal/sync/errgroup" @@ -353,6 +355,16 @@ func WithClientInterceptors(names ...string) Option { grpc.WithUnaryInterceptor(trace.UnaryClientInterceptor()), grpc.WithStreamInterceptor(trace.StreamClientInterceptor()), ) + case "metricinterceptor", "metric": + uci, sci, err := metric.ClientMetricInterceptors() + if err != nil { + lerr := errors.NewErrCriticalOption("gRPCInterceptors", "metric", errors.Wrap(err, "failed to create interceptor")) + log.Warn(lerr.Error()) + } + g.dopts = append(g.dopts, + grpc.WithUnaryInterceptor(uci), + grpc.WithStreamInterceptor(sci), + ) default: } } diff --git a/pkg/tools/benchmark/job/usecase/benchmarkd.go b/pkg/tools/benchmark/job/usecase/benchmarkd.go index e20714f627..e9a1bda638 100644 --- a/pkg/tools/benchmark/job/usecase/benchmarkd.go +++ b/pkg/tools/benchmark/job/usecase/benchmarkd.go @@ -64,12 +64,34 @@ func New(cfg *config.Config) (r runner.Runner, err error) { } } + // bind metrics interceptor + var clientInterceptors []string + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job), + ) + if err != nil { + return nil, err + } + var str []string + str = append(str, "metric") + if cfg.Observability.Trace.Enabled { + str = append(str, "trace") + } + clientInterceptors = str + } + copts, err := cfg.Job.ClientConfig.Opts() if err != nil { return nil, err } if cfg.Job.ClientConfig.DialOption == nil { - copts = append(copts, grpc.WithInsecure(true)) + copts = append(copts, + grpc.WithInsecure(true), + grpc.WithClientInterceptors(clientInterceptors...), + ) } gcli := grpc.New(copts...) vcli, err := vald.New( @@ -131,17 +153,6 @@ func New(cfg *config.Config) (r runner.Runner, err error) { }), } - var obs observability.Observability - if cfg.Observability.Enabled { - obs, err = observability.NewWithConfig( - cfg.Observability, - infometrics.New("vald_benchmark_job_info", "Benchmark Job info", *cfg.Job), - ) - if err != nil { - return nil, err - } - } - srv, err := starter.New( starter.WithConfig(cfg.Server), starter.WithREST(func(sc *iconf.Server) []server.Option {