diff --git a/cmd/varlogadm/cli.go b/cmd/varlogadm/cli.go index b10ddae42..7922ec60a 100644 --- a/cmd/varlogadm/cli.go +++ b/cmd/varlogadm/cli.go @@ -93,7 +93,7 @@ func start(c *cli.Context) error { _ = logger.Sync() }() - meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String()) + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String(), clusterID) if err != nil { return err } diff --git a/cmd/varlogmr/metadata_repository.go b/cmd/varlogmr/metadata_repository.go index 3eac6f00e..c050841f3 100644 --- a/cmd/varlogmr/metadata_repository.go +++ b/cmd/varlogmr/metadata_repository.go @@ -57,7 +57,7 @@ func start(c *cli.Context) error { raftAddr := c.String(flagRaftAddr.Name) nodeID := types.NewNodeIDFromURL(raftAddr) - meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String()) + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String(), cid) if err != nil { return err } diff --git a/cmd/varlogsn/varlogsn.go b/cmd/varlogsn/varlogsn.go index 7c59c9559..d4d339bbe 100644 --- a/cmd/varlogsn/varlogsn.go +++ b/cmd/varlogsn/varlogsn.go @@ -94,7 +94,7 @@ func start(c *cli.Context) error { logger = logger.Named("sn").With(zap.Uint32("cid", uint32(clusterID)), zap.Int32("snid", int32(storageNodeID))) - meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String()) + meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String(), clusterID) if err != nil { return err } diff --git a/internal/flags/telemetry.go b/internal/flags/telemetry.go index 3b96d371b..afad57e41 100644 --- a/internal/flags/telemetry.go +++ b/internal/flags/telemetry.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/otel/sdk/resource" semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/telemetry" ) @@ -88,16 +89,18 @@ var ( } ) -func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string) (opts []telemetry.MeterProviderOption, err error) { +func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string, cid types.ClusterID) (opts []telemetry.MeterProviderOption, err error) { const serviceNamespace = "varlog" res, err := resource.New(ctx, resource.WithFromEnv(), resource.WithHost(), + resource.WithTelemetrySDK(), resource.WithAttributes( semconv.ServiceName(serviceName), semconv.ServiceNamespace(serviceNamespace), semconv.ServiceInstanceID(serviceInstanceID), + telemetry.ClusterID(cid), )) if err != nil { return nil, err diff --git a/internal/storagenode/storagenode.go b/internal/storagenode/storagenode.go index 8f474e438..cb55122a9 100644 --- a/internal/storagenode/storagenode.go +++ b/internal/storagenode/storagenode.go @@ -82,7 +82,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) { return nil, err } - metrics, err := telemetry.RegisterMetrics(otel.Meter("varlogsn"), cfg.snid) + metrics, err := telemetry.RegisterMetrics(otel.Meter("varlogsn")) if err != nil { return nil, err } @@ -334,7 +334,7 @@ func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID return nil, snerrors.ErrTooManyReplicas } - lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, lsid) + lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, tpid, lsid) if err != nil { return nil, err } diff --git a/internal/storagenode/telemetry/metrics.go b/internal/storagenode/telemetry/metrics.go index ca085da5b..1e8162805 100644 --- a/internal/storagenode/telemetry/metrics.go +++ b/internal/storagenode/telemetry/metrics.go @@ -10,9 +10,12 @@ import ( "go.opentelemetry.io/otel/metric" "github.com/kakao/varlog/pkg/types" + "github.com/kakao/varlog/pkg/util/telemetry" ) type LogStreamMetrics struct { + attrs attribute.Set + AppendLogs atomic.Int64 AppendBytes atomic.Int64 AppendDuration atomic.Int64 @@ -47,12 +50,11 @@ type LogStreamMetrics struct { } type Metrics struct { - snid types.StorageNodeID metricsMap sync.Map } -func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics, err error) { - m = &Metrics{snid: snid} +func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) { + m = &Metrics{} var ( appendLogs metric.Int64ObservableCounter @@ -291,41 +293,39 @@ func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics, defer mu.Unlock() m.metricsMap.Range(func(key, value any) bool { - lsid := key.(types.LogStreamID) lsm := value.(*LogStreamMetrics) - attrs := attribute.NewSet(attribute.Int("lsid", int(lsid))) - observer.ObserveInt64(appendLogs, lsm.AppendLogs.Load(), metric.WithAttributeSet(attrs)) - observer.ObserveInt64(appendBytes, lsm.AppendBytes.Load()) - observer.ObserveInt64(appendDuration, lsm.AppendDuration.Load()) - observer.ObserveInt64(appendOperations, lsm.AppendOperations.Load()) - observer.ObserveInt64(appendPreparationMicroseconds, lsm.AppendPreparationMicro.Load()) - observer.ObserveInt64(appendBatchCommitGap, lsm.AppendBatchCommitGap.Load()) + observer.ObserveInt64(appendLogs, lsm.AppendLogs.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(appendBytes, lsm.AppendBytes.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(appendDuration, lsm.AppendDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(appendOperations, lsm.AppendOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(appendPreparationMicroseconds, lsm.AppendPreparationMicro.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(appendBatchCommitGap, lsm.AppendBatchCommitGap.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(sequencerOperationDuration, lsm.SequencerOperationDuration.Load()) - observer.ObserveInt64(sequencerFanoutDuration, lsm.SequencerFanoutDuration.Load()) - observer.ObserveInt64(sequencerOperations, lsm.SequencerOperations.Load()) - observer.ObserveInt64(sequencerInflightOperations, lsm.SequencerInflightOperations.Load()) + observer.ObserveInt64(sequencerOperationDuration, lsm.SequencerOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(sequencerFanoutDuration, lsm.SequencerFanoutDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(sequencerOperations, lsm.SequencerOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(sequencerInflightOperations, lsm.SequencerInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(writerOperationDuration, lsm.WriterOperationDuration.Load()) - observer.ObserveInt64(writerOperations, lsm.WriterOperations.Load()) - observer.ObserveInt64(writerInflightOperations, lsm.WriterInflightOperations.Load()) + observer.ObserveInt64(writerOperationDuration, lsm.WriterOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(writerOperations, lsm.WriterOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(writerInflightOperations, lsm.WriterInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(committerOperationDuration, lsm.CommitterOperationDuration.Load()) - observer.ObserveInt64(committerOperations, lsm.CommitterOperations.Load()) - observer.ObserveInt64(committerLogs, lsm.CommitterLogs.Load()) + observer.ObserveInt64(committerOperationDuration, lsm.CommitterOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(committerOperations, lsm.CommitterOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(committerLogs, lsm.CommitterLogs.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(replicateClientOperationDuration, lsm.ReplicateClientOperationDuration.Load()) - observer.ObserveInt64(replicateClientOperations, lsm.ReplicateClientOperations.Load()) - observer.ObserveInt64(replicateClientInflightOperations, lsm.ReplicateClientInflightOperations.Load()) + observer.ObserveInt64(replicateClientOperationDuration, lsm.ReplicateClientOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicateClientOperations, lsm.ReplicateClientOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicateClientInflightOperations, lsm.ReplicateClientInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(replicateServerOperations, lsm.ReplicateServerOperations.Load()) + observer.ObserveInt64(replicateServerOperations, lsm.ReplicateServerOperations.Load(), metric.WithAttributeSet(lsm.attrs)) - observer.ObserveInt64(replicateLogs, lsm.ReplicateLogs.Load()) - observer.ObserveInt64(replicateBytes, lsm.ReplicateBytes.Load()) - observer.ObserveInt64(replicateDuration, lsm.ReplicateDuration.Load()) - observer.ObserveInt64(replicateOperations, lsm.ReplicateOperations.Load()) - observer.ObserveInt64(replicatePreparationMicroseconds, lsm.ReplicatePreparationMicro.Load()) + observer.ObserveInt64(replicateLogs, lsm.ReplicateLogs.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicateBytes, lsm.ReplicateBytes.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicateDuration, lsm.ReplicateDuration.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicateOperations, lsm.ReplicateOperations.Load(), metric.WithAttributeSet(lsm.attrs)) + observer.ObserveInt64(replicatePreparationMicroseconds, lsm.ReplicatePreparationMicro.Load(), metric.WithAttributeSet(lsm.attrs)) return true }) @@ -346,8 +346,14 @@ func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics, return m, nil } -func RegisterLogStreamMetrics(m *Metrics, lsid types.LogStreamID) (*LogStreamMetrics, error) { - lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{}) +func RegisterLogStreamMetrics(m *Metrics, tpid types.TopicID, lsid types.LogStreamID) (*LogStreamMetrics, error) { + attrs := attribute.NewSet( + telemetry.TopicID(tpid), + telemetry.LogStreamID(lsid), + ) + lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{ + attrs: attrs, + }) if loaded { return nil, fmt.Errorf("storagenode: already registered %v", lsid) } diff --git a/internal/storagenode/telemetry/metrics_test.go b/internal/storagenode/telemetry/metrics_test.go index 6a5af2bee..37b402e38 100644 --- a/internal/storagenode/telemetry/metrics_test.go +++ b/internal/storagenode/telemetry/metrics_test.go @@ -6,6 +6,8 @@ import ( "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel" "go.uber.org/goleak" + + "github.com/kakao/varlog/pkg/types" ) func TestMain(m *testing.M) { @@ -13,17 +15,20 @@ func TestMain(m *testing.M) { } func TestRegisterLogStreamMetrics(t *testing.T) { - m, err := RegisterMetrics(otel.Meter("test"), 1) + const tpid = types.TopicID(1) + const lsid = types.LogStreamID(2) + + m, err := RegisterMetrics(otel.Meter("test")) assert.NoError(t, err) - _, err = RegisterLogStreamMetrics(m, 1) + _, err = RegisterLogStreamMetrics(m, tpid, lsid) assert.NoError(t, err) - _, err = RegisterLogStreamMetrics(m, 1) + _, err = RegisterLogStreamMetrics(m, tpid, lsid) assert.Error(t, err) - UnregisterLogStreamMetrics(m, 1) + UnregisterLogStreamMetrics(m, lsid) - _, err = RegisterLogStreamMetrics(m, 1) + _, err = RegisterLogStreamMetrics(m, tpid, lsid) assert.NoError(t, err) } diff --git a/pkg/util/telemetry/semconv.go b/pkg/util/telemetry/semconv.go new file mode 100644 index 000000000..5d3d9d3d0 --- /dev/null +++ b/pkg/util/telemetry/semconv.go @@ -0,0 +1,25 @@ +package telemetry + +import ( + "go.opentelemetry.io/otel/attribute" + + "github.com/kakao/varlog/pkg/types" +) + +const ( + ClusterIDKey = attribute.Key("varlog.cluster.id") + TopicIDKey = attribute.Key("varlog.topic.id") + LogStreamIDKey = attribute.Key("varlog.logstream.id") +) + +func ClusterID(val types.ClusterID) attribute.KeyValue { + return ClusterIDKey.Int(int(val)) +} + +func TopicID(val types.TopicID) attribute.KeyValue { + return TopicIDKey.Int(int(val)) +} + +func LogStreamID(val types.LogStreamID) attribute.KeyValue { + return LogStreamIDKey.Int(int(val)) +}