diff --git a/cmd/tools/benchmark/job/sample.yaml b/cmd/tools/benchmark/job/sample.yaml index c08b4ad603..ed71ba6f47 100644 --- a/cmd/tools/benchmark/job/sample.yaml +++ b/cmd/tools/benchmark/job/sample.yaml @@ -48,6 +48,8 @@ server_config: header_table_size: 0 interceptors: - "RecoverInterceptor" + - "TraceInterceptor" + - "MetricInterceptor" enable_reflection: true socket_option: reuse_port: true @@ -136,8 +138,8 @@ server_config: ip_recover_destination_addr: false startup_strategy: - liveness - - readiness - grpc + - readiness full_shutdown_duration: 30s tls: ca: /path/to/ca @@ -147,7 +149,7 @@ server_config: observability: enabled: false otlp: - collector_endpoint: "" + collector_endpoint: "opentelemetry-collector-collector.default.svc.cluster.local:4317" attribute: namespace: _MY_POD_NAMESPACE_ pod_name: _MY_POD_NAME_ @@ -157,6 +159,8 @@ observability: trace_export_timeout: "1m" trace_max_export_batch_size: 1024 trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" metrics: enable_cgo: true enable_goroutine: true @@ -172,7 +176,7 @@ observability: - go_arch - algorithm_info trace: - enabled: false + enabled: true job: replica: 1 repetition: 1 @@ -181,7 +185,7 @@ job: rps: 200 concurrency_limit: 200 client_config: - health_check_duration: "1s" + health_check_duration: "" connection_pool: enable_dns_resolver: true enable_rebalance: true @@ -221,7 +225,9 @@ job: enable_backoff: false insecure: true timeout: "" - interceptors: [] + interceptors: + - MetricInterceptor + - TraceInterceptor net: dns: cache_enabled: true diff --git a/internal/k8s/vald/benchmark/job/job_template.go b/internal/k8s/vald/benchmark/job/job_template.go index 4105dcca9b..fab264919c 100644 --- a/internal/k8s/vald/benchmark/job/job_template.go +++ b/internal/k8s/vald/benchmark/job/job_template.go @@ -36,9 +36,8 @@ const ( RestartPolicyOnFailure RestartPolicy = "OnFailure" RestartPolicyNever RestartPolicy = "Never" - volumeName = "vald-benchmark-job-config" - configMapName = "vald-benchmark-operator-config" - svcAccount = "vald-benchmark-operator" + volumeName = "vald-benchmark-job-config" + svcAccount = "vald-benchmark-operator" ) var mode = int32(420) @@ -50,6 +49,7 @@ type BenchmarkJobTpl interface { type benchmarkJobTpl struct { containerName string containerImageName string + configMapName string imagePullPolicy ImagePullPolicy jobTpl k8s.Job } @@ -181,8 +181,7 @@ func (b *benchmarkJobTpl) CreateJobTpl(opts ...BenchmarkJobOption) (k8s.Job, err VolumeSource: corev1.VolumeSource{ ConfigMap: &corev1.ConfigMapVolumeSource{ LocalObjectReference: corev1.LocalObjectReference{ - // FIXME: get benchmark operator configmap name - Name: configMapName, + Name: b.configMapName, }, DefaultMode: &mode, }, diff --git a/internal/k8s/vald/benchmark/job/job_template_option.go b/internal/k8s/vald/benchmark/job/job_template_option.go index 46574ea0c4..3b05fa7ebb 100644 --- a/internal/k8s/vald/benchmark/job/job_template_option.go +++ b/internal/k8s/vald/benchmark/job/job_template_option.go @@ -52,10 +52,19 @@ func WithContainerImage(name string) BenchmarkJobTplOption { // WithImagePullPolicy sets the docker image pull policy for benchmark job. func WithImagePullPolicy(p ImagePullPolicy) BenchmarkJobTplOption { return func(b *benchmarkJobTpl) error { - if len(p) == 0 { - return nil + if len(p) > 0 { + b.imagePullPolicy = p + } + return nil + } +} + +// WithOperatorConfigMap sets the configMapName for mounting Job Pod +func WithOperatorConfigMap(cm string) BenchmarkJobTplOption { + return func(b *benchmarkJobTpl) error { + if len(cm) > 0 { + b.configMapName = cm } - b.imagePullPolicy = p return nil } } diff --git a/internal/net/grpc/interceptor/client/metric/metric.go b/internal/net/grpc/interceptor/client/metric/metric.go index c3c5ae64d3..7fe929a875 100644 --- a/internal/net/grpc/interceptor/client/metric/metric.go +++ b/internal/net/grpc/interceptor/client/metric/metric.go @@ -15,9 +15,9 @@ package metric import ( "context" + "time" "github.com/vdaas/vald/internal/errors" - "github.com/vdaas/vald/internal/log" "github.com/vdaas/vald/internal/net/grpc/codes" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/observability/attribute" @@ -59,9 +59,19 @@ func ClientMetricInterceptors() (grpc.UnaryClientInterceptor, grpc.StreamClientI latencyHistgram.Record(ctx, latency, metrics.WithAttributes(attrs...)) completedRPCCnt.Add(ctx, 1, metrics.WithAttributes(attrs...)) } - // FIXME: implement sending metric data - log.Debug(record) - return nil, nil, nil + return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + now := time.Now() + err := invoker(ctx, method, req, reply, cc, opts...) + elapsedTime := time.Since(now) + record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond)) + return err + }, func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + now := time.Now() + _, err := streamer(ctx, desc, cc, method, opts...) + elapsedTime := time.Since(now) + record(ctx, method, err, float64(elapsedTime)/float64(time.Millisecond)) + return nil, nil + }, nil } func attributesFromError(method string, err error) []attribute.KeyValue { diff --git a/pkg/tools/benchmark/operator/service/operator.go b/pkg/tools/benchmark/operator/service/operator.go index f03c48b866..cadfa672cc 100644 --- a/pkg/tools/benchmark/operator/service/operator.go +++ b/pkg/tools/benchmark/operator/service/operator.go @@ -59,6 +59,7 @@ type operator struct { jobNamespace string jobImage string jobImagePullPolicy string + configMapName string scenarios *atomic.Pointer[map[string]*scenario] benchjobs *atomic.Pointer[map[string]*v1.ValdBenchmarkJob] jobs *atomic.Pointer[map[string]string] @@ -455,7 +456,6 @@ func (o *operator) createBenchmarkJob(ctx context.Context, scenario v1.ValdBench } // set status bj.Status = v1.BenchmarkJobNotReady - // TODO: set metrics // create benchmark job resource c := o.ctrl.GetManager().GetClient() if err := c.Create(ctx, bj); err != nil { @@ -476,6 +476,7 @@ func (o *operator) createJob(ctx context.Context, bjr v1.ValdBenchmarkJob) error benchjob.WithContainerName(bjr.GetName()), benchjob.WithContainerImage(o.jobImage), benchjob.WithImagePullPolicy(benchjob.ImagePullPolicy(o.jobImagePullPolicy)), + benchjob.WithOperatorConfigMap(o.configMapName), ) if err != nil { return err diff --git a/pkg/tools/benchmark/operator/service/option.go b/pkg/tools/benchmark/operator/service/option.go index 11cb1a7a79..9cf06552ca 100644 --- a/pkg/tools/benchmark/operator/service/option.go +++ b/pkg/tools/benchmark/operator/service/option.go @@ -32,6 +32,7 @@ var defaultOpts = []Option{ WithJobImagePullPolicy("Always"), WithReconcileCheckDuration("10s"), WithJobNamespace("default"), + WithConfigMapName("vald-benchmark-operator-config"), } // WithErrGroup sets the error group to scenario. @@ -60,9 +61,7 @@ func WithReconcileCheckDuration(ts string) Option { // WithJobNamespace sets the namespace for running benchmark job. func WithJobNamespace(ns string) Option { return func(o *operator) error { - if ns == "" { - o.jobNamespace = "default" - } else { + if ns != "" { o.jobNamespace = ns } return nil @@ -88,3 +87,12 @@ func WithJobImagePullPolicy(p string) Option { return nil } } + +func WithConfigMapName(cm string) Option { + return func(o *operator) error { + if cm != "" { + o.configMapName = cm + } + return nil + } +}