Skip to content

Commit

Permalink
refactor(telemetry): add semantic conventions for varlog
Browse files Browse the repository at this point in the history
  • Loading branch information
ijsong committed Jul 28, 2023
1 parent 44eead0 commit dcff02c
Show file tree
Hide file tree
Showing 8 changed files with 82 additions and 43 deletions.
2 changes: 1 addition & 1 deletion cmd/varlogadm/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func start(c *cli.Context) error {
_ = logger.Sync()
}()

meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String())
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "adm", clusterID.String(), clusterID)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/varlogmr/metadata_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func start(c *cli.Context) error {

raftAddr := c.String(flagRaftAddr.Name)
nodeID := types.NewNodeIDFromURL(raftAddr)
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String())
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "mr", nodeID.String(), cid)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/varlogsn/varlogsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func start(c *cli.Context) error {

logger = logger.Named("sn").With(zap.Uint32("cid", uint32(clusterID)), zap.Int32("snid", int32(storageNodeID)))

meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String())
meterProviderOpts, err := flags.ParseTelemetryFlags(context.Background(), c, "sn", storageNodeID.String(), clusterID)
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion internal/flags/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/telemetry"
)

Expand Down Expand Up @@ -88,16 +89,18 @@ var (
}
)

func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string) (opts []telemetry.MeterProviderOption, err error) {
func ParseTelemetryFlags(ctx context.Context, c *cli.Context, serviceName, serviceInstanceID string, cid types.ClusterID) (opts []telemetry.MeterProviderOption, err error) {
const serviceNamespace = "varlog"

res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithHost(),
resource.WithTelemetrySDK(),
resource.WithAttributes(
semconv.ServiceName(serviceName),
semconv.ServiceNamespace(serviceNamespace),
semconv.ServiceInstanceID(serviceInstanceID),
telemetry.ClusterID(cid),
))
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewStorageNode(opts ...Option) (*StorageNode, error) {
return nil, err
}

metrics, err := telemetry.RegisterMetrics(otel.Meter("varlogsn"), cfg.snid)
metrics, err := telemetry.RegisterMetrics(otel.Meter("varlogsn"))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -334,7 +334,7 @@ func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID
return nil, snerrors.ErrTooManyReplicas
}

lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, lsid)
lsm, err := telemetry.RegisterLogStreamMetrics(sn.metrics, tpid, lsid)
if err != nil {
return nil, err
}
Expand Down
70 changes: 38 additions & 32 deletions internal/storagenode/telemetry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ import (
"go.opentelemetry.io/otel/metric"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/telemetry"
)

type LogStreamMetrics struct {
attrs attribute.Set

AppendLogs atomic.Int64
AppendBytes atomic.Int64
AppendDuration atomic.Int64
Expand Down Expand Up @@ -47,12 +50,11 @@ type LogStreamMetrics struct {
}

type Metrics struct {
snid types.StorageNodeID
metricsMap sync.Map
}

func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics, err error) {
m = &Metrics{snid: snid}
func RegisterMetrics(meter metric.Meter) (m *Metrics, err error) {
m = &Metrics{}

var (
appendLogs metric.Int64ObservableCounter
Expand Down Expand Up @@ -291,41 +293,39 @@ func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics,
defer mu.Unlock()

m.metricsMap.Range(func(key, value any) bool {
lsid := key.(types.LogStreamID)
lsm := value.(*LogStreamMetrics)
attrs := attribute.NewSet(attribute.Int("lsid", int(lsid)))

observer.ObserveInt64(appendLogs, lsm.AppendLogs.Load(), metric.WithAttributeSet(attrs))
observer.ObserveInt64(appendBytes, lsm.AppendBytes.Load())
observer.ObserveInt64(appendDuration, lsm.AppendDuration.Load())
observer.ObserveInt64(appendOperations, lsm.AppendOperations.Load())
observer.ObserveInt64(appendPreparationMicroseconds, lsm.AppendPreparationMicro.Load())
observer.ObserveInt64(appendBatchCommitGap, lsm.AppendBatchCommitGap.Load())
observer.ObserveInt64(appendLogs, lsm.AppendLogs.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(appendBytes, lsm.AppendBytes.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(appendDuration, lsm.AppendDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(appendOperations, lsm.AppendOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(appendPreparationMicroseconds, lsm.AppendPreparationMicro.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(appendBatchCommitGap, lsm.AppendBatchCommitGap.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(sequencerOperationDuration, lsm.SequencerOperationDuration.Load())
observer.ObserveInt64(sequencerFanoutDuration, lsm.SequencerFanoutDuration.Load())
observer.ObserveInt64(sequencerOperations, lsm.SequencerOperations.Load())
observer.ObserveInt64(sequencerInflightOperations, lsm.SequencerInflightOperations.Load())
observer.ObserveInt64(sequencerOperationDuration, lsm.SequencerOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(sequencerFanoutDuration, lsm.SequencerFanoutDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(sequencerOperations, lsm.SequencerOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(sequencerInflightOperations, lsm.SequencerInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(writerOperationDuration, lsm.WriterOperationDuration.Load())
observer.ObserveInt64(writerOperations, lsm.WriterOperations.Load())
observer.ObserveInt64(writerInflightOperations, lsm.WriterInflightOperations.Load())
observer.ObserveInt64(writerOperationDuration, lsm.WriterOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(writerOperations, lsm.WriterOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(writerInflightOperations, lsm.WriterInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(committerOperationDuration, lsm.CommitterOperationDuration.Load())
observer.ObserveInt64(committerOperations, lsm.CommitterOperations.Load())
observer.ObserveInt64(committerLogs, lsm.CommitterLogs.Load())
observer.ObserveInt64(committerOperationDuration, lsm.CommitterOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(committerOperations, lsm.CommitterOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(committerLogs, lsm.CommitterLogs.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(replicateClientOperationDuration, lsm.ReplicateClientOperationDuration.Load())
observer.ObserveInt64(replicateClientOperations, lsm.ReplicateClientOperations.Load())
observer.ObserveInt64(replicateClientInflightOperations, lsm.ReplicateClientInflightOperations.Load())
observer.ObserveInt64(replicateClientOperationDuration, lsm.ReplicateClientOperationDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicateClientOperations, lsm.ReplicateClientOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicateClientInflightOperations, lsm.ReplicateClientInflightOperations.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(replicateServerOperations, lsm.ReplicateServerOperations.Load())
observer.ObserveInt64(replicateServerOperations, lsm.ReplicateServerOperations.Load(), metric.WithAttributeSet(lsm.attrs))

observer.ObserveInt64(replicateLogs, lsm.ReplicateLogs.Load())
observer.ObserveInt64(replicateBytes, lsm.ReplicateBytes.Load())
observer.ObserveInt64(replicateDuration, lsm.ReplicateDuration.Load())
observer.ObserveInt64(replicateOperations, lsm.ReplicateOperations.Load())
observer.ObserveInt64(replicatePreparationMicroseconds, lsm.ReplicatePreparationMicro.Load())
observer.ObserveInt64(replicateLogs, lsm.ReplicateLogs.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicateBytes, lsm.ReplicateBytes.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicateDuration, lsm.ReplicateDuration.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicateOperations, lsm.ReplicateOperations.Load(), metric.WithAttributeSet(lsm.attrs))
observer.ObserveInt64(replicatePreparationMicroseconds, lsm.ReplicatePreparationMicro.Load(), metric.WithAttributeSet(lsm.attrs))

return true
})
Expand All @@ -346,8 +346,14 @@ func RegisterMetrics(meter metric.Meter, snid types.StorageNodeID) (m *Metrics,
return m, nil
}

func RegisterLogStreamMetrics(m *Metrics, lsid types.LogStreamID) (*LogStreamMetrics, error) {
lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{})
func RegisterLogStreamMetrics(m *Metrics, tpid types.TopicID, lsid types.LogStreamID) (*LogStreamMetrics, error) {
attrs := attribute.NewSet(
telemetry.TopicID(tpid),
telemetry.LogStreamID(lsid),
)
lsm, loaded := m.metricsMap.LoadOrStore(lsid, &LogStreamMetrics{
attrs: attrs,
})
if loaded {
return nil, fmt.Errorf("storagenode: already registered %v", lsid)
}
Expand Down
15 changes: 10 additions & 5 deletions internal/storagenode/telemetry/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,29 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.uber.org/goleak"

"github.com/kakao/varlog/pkg/types"
)

// TestMain is the entry point for this package's tests. It hands control to
// goleak.VerifyTestMain, which runs the tests via m.
// NOTE(review): goleak is presumably used here to fail the run when tests
// leave goroutines behind — confirm against the goleak documentation.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}

func TestRegisterLogStreamMetrics(t *testing.T) {
m, err := RegisterMetrics(otel.Meter("test"), 1)
const tpid = types.TopicID(1)
const lsid = types.LogStreamID(2)

m, err := RegisterMetrics(otel.Meter("test"))
assert.NoError(t, err)

_, err = RegisterLogStreamMetrics(m, 1)
_, err = RegisterLogStreamMetrics(m, tpid, lsid)
assert.NoError(t, err)

_, err = RegisterLogStreamMetrics(m, 1)
_, err = RegisterLogStreamMetrics(m, tpid, lsid)
assert.Error(t, err)

UnregisterLogStreamMetrics(m, 1)
UnregisterLogStreamMetrics(m, lsid)

_, err = RegisterLogStreamMetrics(m, 1)
_, err = RegisterLogStreamMetrics(m, tpid, lsid)
assert.NoError(t, err)
}
25 changes: 25 additions & 0 deletions pkg/util/telemetry/semconv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package telemetry

import (
"go.opentelemetry.io/otel/attribute"

"github.com/kakao/varlog/pkg/types"
)

// Attribute keys used to label telemetry data with varlog identifiers.
const (
	// ClusterIDKey is the attribute key under which a cluster ID is recorded.
	ClusterIDKey attribute.Key = "varlog.cluster.id"

	// TopicIDKey is the attribute key under which a topic ID is recorded.
	TopicIDKey attribute.Key = "varlog.topic.id"

	// LogStreamIDKey is the attribute key under which a log stream ID is
	// recorded.
	LogStreamIDKey attribute.Key = "varlog.logstream.id"
)

// ClusterID returns an attribute.KeyValue that pairs ClusterIDKey with the
// given cluster ID as an integer value.
//
// The ID is widened directly to int64 instead of going through int. On
// platforms where int is 32 bits wide, an intermediate int conversion could
// wrap large IDs of an unsigned 32-bit type to negative numbers.
// NOTE(review): types.ClusterID appears to be an unsigned 32-bit type — confirm
// against its declaration.
func ClusterID(val types.ClusterID) attribute.KeyValue {
	return ClusterIDKey.Int64(int64(val))
}

// TopicID builds the attribute that labels telemetry data with a topic ID:
// the key is TopicIDKey and the value is val converted to an integer.
func TopicID(val types.TopicID) attribute.KeyValue {
	id := int(val)
	return attribute.KeyValue{
		Key:   TopicIDKey,
		Value: attribute.IntValue(id),
	}
}

// LogStreamID builds the attribute that labels telemetry data with a log
// stream ID: the key is LogStreamIDKey and the value is val converted to an
// integer.
func LogStreamID(val types.LogStreamID) attribute.KeyValue {
	id := int(val)
	return attribute.KeyValue{
		Key:   LogStreamIDKey,
		Value: attribute.IntValue(id),
	}
}

0 comments on commit dcff02c

Please sign in to comment.