From 1821247f78754199facd4754b42e4ea152e920ee Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Wed, 10 Jan 2024 15:54:03 +0300 Subject: [PATCH 1/9] feat: add span_enter and span_exit to go sdk --- go/trace_ctx.go | 135 ++++++++++++++++++++++++++++++------------------ 1 file changed, 86 insertions(+), 49 deletions(-) diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 8520650..1b06a7e 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -90,66 +90,43 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { functions.WithFunc(func(ctx context.Context, m api.Module, i int32) { start := time.Now() ev := <-t.raw - if ev.Kind != RawEnter { - log.Println("Expected event", RawEnter, "but got", ev.Kind) - } - t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) + + t.enter(ev, start) }).Export("instrument_enter") - functions.WithFunc(func(ctx context.Context, i int32) { - end := time.Now() - ev := <-t.raw - if ev.Kind != RawExit { - log.Println("Expected event", RawExit, "but got", ev.Kind) - return - } - fn, ok := t.peekFunction() + functions.WithFunc(func(ctx context.Context, m api.Module, ptr uint64, len uint32) { + start := time.Now() + + functionName, ok := m.Memory().Read(uint32(ptr), len) if !ok { - log.Println("Expected values on started function stack, but none were found") - return + log.Printf("span_enter: failed to read memory at offset %v with length %v\n", ptr, len) } - if ev.FunctionIndex != fn.FunctionIndex() { - log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) - return + + ev := RawEvent{ + Kind: RawEnter, + FunctionName: string(functionName), + FunctionIndex: 0, } - fn, _ = t.popFunction() - fn.Stop(end) - fn.Raw = append(fn.Raw, ev) + t.enter(ev, start) + }).Export("span_enter") - // if there is no function left to pop, we are exiting the root function of the trace - f, ok := t.peekFunction() - if !ok { - t.events = append(t.events, fn) - return - } + functions.WithFunc(func(ctx context.Context, i int32) { + end := time.Now() + ev := <-t.raw - // if the function duration is less than minimum duration, disregard - funcDuration := fn.Duration.Microseconds() - minSpanDuration := t.Options.SpanFilter.MinDuration.Microseconds() - if funcDuration < minSpanDuration { - // check for memory allocations and attribute them to the parent span - f, ok = t.popFunction() - if ok { - for _, ev := range fn.within { - switch e := ev.(type) { - case MemoryGrowEvent: - f.within = append(f.within, e) - } - } - t.pushFunction(f) - } - return - } + t.exit(ev, end) + }).Export("instrument_exit") - // the function is within another function - f, ok = t.popFunction() - if ok { - f.within = append(f.within, fn) - t.pushFunction(f) + functions.WithFunc(func(ctx context.Context, m api.Module) { + end := time.Now() + ev := RawEvent{ + Kind: RawExit, + FunctionIndex: 0, } - }).Export("instrument_exit") + t.exit(ev, end) + }).Export("span_exit") functions.WithFunc(func(ctx context.Context, amt int32) { ev := <-t.raw @@ -185,6 +162,66 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { return nil } +func (t *TraceCtx) enter(ev RawEvent, start time.Time) { + if ev.Kind != RawEnter { + log.Println("Expected event", RawEnter, "but got", ev.Kind) + } + t.pushFunction(CallEvent{Raw: []RawEvent{ev}, Time: start}) +} + +func (t *TraceCtx) exit(ev RawEvent, end time.Time) { + + if ev.Kind != RawExit { + log.Println("Expected event", RawExit, "but got", ev.Kind) + return + } + fn, ok := t.peekFunction() + if !ok { + log.Println("Expected values on started function stack, but none were found") + return + } + if ev.FunctionIndex != fn.FunctionIndex() { + log.Println("Expected call to", ev.FunctionIndex, "but found call to", fn.FunctionIndex()) + return + } + + fn, _ = t.popFunction() + fn.Stop(end) + fn.Raw = append(fn.Raw, ev) + + // if there is no function left to pop, we are exiting the root function of the trace + f, ok := t.peekFunction() + if !ok { + t.events = append(t.events, fn) + return + } + + // if the function duration is less than minimum duration, disregard + funcDuration := fn.Duration.Microseconds() + minSpanDuration := t.Options.SpanFilter.MinDuration.Microseconds() + if funcDuration < minSpanDuration { + // check for memory allocations and attribute them to the parent span + f, ok = t.popFunction() + if ok { + for _, ev := range fn.within { + switch e := ev.(type) { + case MemoryGrowEvent: + f.within = append(f.within, e) + } + } + t.pushFunction(f) + } + return + } + + // the function is within another function + f, ok = t.popFunction() + if ok { + f.within = append(f.within, fn) + t.pushFunction(f) + } +} + // Pushes a function onto the stack func (t *TraceCtx) pushFunction(ev CallEvent) { t.stack = append(t.stack, ev) From 2c64bb1dfea57dc830b95daf7be24ec14ed88192 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Sun, 14 Jan 2024 17:49:03 +0300 Subject: [PATCH 2/9] add more functions --- go/adapter.go | 15 ++++ go/adapter/datadog/adapter.go | 19 +++++ go/adapter/datadog_formatter/format.go | 4 + go/event.go | 53 +++++++++++++ go/listener.go | 17 ++++ go/trace_ctx.go | 106 ++++++++++++++++++++++--- 6 files changed, 205 insertions(+), 9 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index 2d96149..3ba0a1a 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log" + "strings" "time" "github.com/tetratelabs/wazero" @@ -109,6 +110,20 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI last.Attributes = append(last.Attributes, kv) } } + if tags, ok := ev.(SpanTagsEvent); ok { + last := spans[len(spans)-1] + + for _, tag := range tags.Tags { + parts := strings.Split(tag, ":") + if len(parts) != 2 { + log.Printf("Invalid tag: %s\n", tag) + continue + } + + kv := NewOtelKeyValueString(parts[0], parts[1]) + last.Attributes = append(last.Attributes, kv) + } + } } return spans } diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index bc569a6..1f8d394 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -8,6 +8,7 @@ import ( "log" "net/http" "net/url" + "strings" "time" observe "github.com/dylibso/observe-sdk/go" @@ -84,6 +85,8 @@ func (d *DatadogAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Datadog adapter does not respect custom events") + case observe.MetricEvent: + log.Println("MetricEvent should be attached to a span") } } @@ -193,6 +196,22 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span := spans[len(spans)-1] span.AddAllocation(alloc.MemoryGrowAmount()) } + if metric, ok := ev.(observe.MetricEvent); ok { + _ = metric + // TODO: implement + } + if tags, ok := ev.(observe.SpanTagsEvent); ok { + span := spans[len(spans)-1] + for _, tag := range tags.Tags { + parts := strings.Split(tag, ":") + if len(parts) != 2 { + log.Printf("Invalid tag: %s\n", tag) + continue + } + + span.AddTag(parts[0], parts[1]) + } + } } return spans diff --git a/go/adapter/datadog_formatter/format.go b/go/adapter/datadog_formatter/format.go index be866ba..8928fcc 100644 --- a/go/adapter/datadog_formatter/format.go +++ b/go/adapter/datadog_formatter/format.go @@ -55,6 +55,10 @@ func (s *Span) AddAllocation(amount uint32) { } } +func (s *Span) AddTag(key, value string) { + s.Meta[key] = value +} + func New() *DatadogFormatter { return &DatadogFormatter{} } diff --git a/go/event.go b/go/event.go index 02784cb..0076f37 100644 --- a/go/event.go +++ b/go/event.go @@ -13,6 +13,9 @@ const ( RawEnter RawEventKind = iota RawExit RawMemoryGrow + RawMetric + RawSpanTags + RawLog ) type EventKind int @@ -21,6 +24,15 @@ const ( Call EventKind = iota MemoryGrow Custom + Metric + SpanTags + Log +) + +type MetricFormat int + +const ( + Statsd MetricFormat = 1 ) // Represents the raw event in our Observe form. @@ -77,11 +89,40 @@ func (e CustomEvent) RawEvents() []RawEvent { return []RawEvent{} } +type MetricEvent struct { + Raw RawEvent + Time time.Time + Format MetricFormat + Message string +} + +type SpanTagsEvent struct { + Raw RawEvent + Time time.Time + Tags []string +} + type MemoryGrowEvent struct { Raw RawEvent Time time.Time } +type LogLevel int + +const ( + Error LogLevel = 1 + Warn = 2 + Info = 3 + Debug = 4 +) + +type LogEvent struct { + Raw RawEvent + Time time.Time + Message string + Level LogLevel +} + func (e MemoryGrowEvent) RawEvents() []RawEvent { return []RawEvent{e.Raw} } @@ -113,3 +154,15 @@ func (e CallEvent) FunctionIndex() uint32 { func (e MemoryGrowEvent) MemoryGrowAmount() uint32 { return e.Raw.MemoryGrowAmount } + +func (e MetricEvent) RawEvents() []RawEvent { + return []RawEvent{e.Raw} +} + +func (e SpanTagsEvent) RawEvents() []RawEvent { + return []RawEvent{e.Raw} +} + +func (e LogEvent) RawEvents() []RawEvent { + return []RawEvent{e.Raw} +} diff --git a/go/listener.go b/go/listener.go index 0b81922..523ec48 100644 --- a/go/listener.go +++ b/go/listener.go @@ -38,6 +38,23 @@ func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDef case "instrument_memory_grow": event.Kind = RawMemoryGrow event.MemoryGrowAmount = uint32(inputs[0]) + case "span_enter": + event.Kind = RawEnter + event.FunctionIndex = uint32(inputs[0]) + + // manual events + case "span_exit": + event.Kind = RawExit + + case "metric": + event.Kind = RawMetric + + case "span_tags": + event.Kind = RawSpanTags + + case "log": + event.Kind = RawLog + default: return } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 1b06a7e..59bac66 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -3,6 +3,7 @@ package observe import ( "context" "log" + "strings" "time" "github.com/tetratelabs/wazero" @@ -96,17 +97,14 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { functions.WithFunc(func(ctx context.Context, m api.Module, ptr uint64, len uint32) { start := time.Now() + ev := <-t.raw functionName, ok := m.Memory().Read(uint32(ptr), len) if !ok { log.Printf("span_enter: failed to read memory at offset %v with length %v\n", ptr, len) } - ev := RawEvent{ - Kind: RawEnter, - FunctionName: string(functionName), - FunctionIndex: 0, - } + ev.FunctionName = string(functionName) t.enter(ev, start) }).Export("span_enter") @@ -120,10 +118,7 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { functions.WithFunc(func(ctx context.Context, m api.Module) { end := time.Now() - ev := RawEvent{ - Kind: RawExit, - FunctionIndex: 0, - } + ev := <-t.raw t.exit(ev, end) }).Export("span_exit") @@ -155,6 +150,99 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { t.pushFunction(fn) }).Export("instrument_memory_grow") + functions.WithFunc(func(ctx context.Context, m api.Module, f int32, ptr int64, len int32) { + format := MetricFormat(f) + buffer, ok := m.Memory().Read(uint32(ptr), uint32(len)) + if !ok { + log.Printf("metric: failed to read memory at offset %v with length %v\n", ptr, len) + } + + ev := <-t.raw + if ev.Kind != RawMetric { + log.Println("Expected event", Metric, "but got", ev.Kind) + return + } + + event := MetricEvent{ + Time: time.Now(), + Format: format, + Message: string(buffer), + Raw: ev, + } + + fn, ok := t.popFunction() + if !ok { + t.events = append(t.events, event) + return + } + fn.within = append(fn.within, event) + t.pushFunction(fn) + + }).Export("metric") + + functions.WithFunc(func(ctx context.Context, m api.Module, ptr int64, len int32) { + buffer, ok := m.Memory().Read(uint32(ptr), uint32(len)) + if !ok { + log.Printf("metric: failed to read memory at offset %v with length %v\n", ptr, len) + } + + ev := <-t.raw + if ev.Kind != RawSpanTags { + log.Println("Expected event", Metric, "but got", ev.Kind) + return + } + + event := SpanTagsEvent{ + Time: time.Now(), + Raw: ev, + Tags: strings.Split(string(buffer), ","), + } + + fn, ok := t.popFunction() + if !ok { + t.events = append(t.events, event) + return + } + fn.within = append(fn.within, event) + t.pushFunction(fn) + + }).Export("span_tags") + + functions.WithFunc(func(ctx context.Context, m api.Module, l int32, ptr int64, len int32) { + if l < int32(Error) || l > int32(Debug) { + log.Printf("log: invalid log level %v\n", l) + } + + level := LogLevel(l) + + buffer, ok := m.Memory().Read(uint32(ptr), uint32(len)) + if !ok { + log.Printf("metric: failed to read memory at offset %v with length %v\n", ptr, len) + } + + ev := <-t.raw + if ev.Kind != RawLog { + log.Println("Expected event", Metric, "but got", ev.Kind) + return + } + + event := LogEvent{ + Time: time.Now(), + Raw: ev, + Level: level, + Message: string(buffer), + } + + fn, ok := t.popFunction() + if !ok { + t.events = append(t.events, event) + return + } + fn.within = append(fn.within, event) + t.pushFunction(fn) + + }).Export("log") + _, err := observe.Instantiate(ctx) if err != nil { return err From f8f95717e41165371c9b335b0a483d8e2217c4c2 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Mon, 15 Jan 2024 10:47:04 +0300 Subject: [PATCH 3/9] test stdout adapter --- go/adapter/stdout/adapter.go | 9 +++++++++ go/bin/stdout/main.go | 11 ++++++++++- go/listener.go | 9 ++------- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go index 2a5f1cb..6b18a23 100644 --- a/go/adapter/stdout/adapter.go +++ b/go/adapter/stdout/adapter.go @@ -55,6 +55,15 @@ func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { if alloc, ok := event.(observe.MemoryGrowEvent); ok { log.Println(strings.Repeat(" ", indentation), "Allocated", alloc.MemoryGrowAmount(), "pages of memory in", name) } + if metric, ok := event.(observe.MetricEvent); ok { + log.Println(strings.Repeat(" ", indentation), "Metric", metric.Message, "Format", metric.Format) + } + if l, ok := event.(observe.LogEvent); ok { + log.Println(strings.Repeat(" ", indentation), "Log", l.Message) + } + if spanTags, ok := event.(observe.SpanTagsEvent); ok { + log.Println(strings.Repeat(" ", indentation), "Span tags:", spanTags.Tags) + } } } diff --git a/go/bin/stdout/main.go b/go/bin/stdout/main.go index b084a79..f0e4254 100644 --- a/go/bin/stdout/main.go +++ b/go/bin/stdout/main.go @@ -6,6 +6,7 @@ import ( "os" "time" + observe "github.com/dylibso/observe-sdk/go" "github.com/dylibso/observe-sdk/go/adapter/stdout" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -27,7 +28,15 @@ func main() { cfg := wazero.NewRuntimeConfig().WithCustomSections(true) rt := wazero.NewRuntimeWithConfig(ctx, cfg) - traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, nil) + + opts := observe.Options{ + ChannelBufferSize: 1024, + SpanFilter: &observe.SpanFilter{ + MinDuration: 0, + }, + } + + traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, &opts) if err != nil { log.Panicln(err) } diff --git a/go/listener.go b/go/listener.go index 523ec48..2bd4329 100644 --- a/go/listener.go +++ b/go/listener.go @@ -38,23 +38,18 @@ func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDef case "instrument_memory_grow": event.Kind = RawMemoryGrow event.MemoryGrowAmount = uint32(inputs[0]) - case "span_enter": - event.Kind = RawEnter - event.FunctionIndex = uint32(inputs[0]) // manual events + case "span_enter": + event.Kind = RawEnter case "span_exit": event.Kind = RawExit - case "metric": event.Kind = RawMetric - case "span_tags": event.Kind = RawSpanTags - case "log": event.Kind = RawLog - default: return } From 99ccdae622859017a1dc4897c9bad57571298e34 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Mon, 15 Jan 2024 10:56:48 +0300 Subject: [PATCH 4/9] test otelstdout --- go/bin/otelstdout/main.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go index 8746f43..e429c6a 100644 --- a/go/bin/otelstdout/main.go +++ b/go/bin/otelstdout/main.go @@ -6,6 +6,7 @@ import ( "os" "time" + observe "github.com/dylibso/observe-sdk/go" "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" @@ -27,7 +28,14 @@ func main() { cfg := wazero.NewRuntimeConfig().WithCustomSections(true) rt := wazero.NewRuntimeWithConfig(ctx, cfg) - traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, nil) + opts := observe.Options{ + ChannelBufferSize: 1024, + SpanFilter: &observe.SpanFilter{ + MinDuration: 0, + }, + } + + traceCtx, err := adapter.NewTraceCtx(ctx, rt, wasm, &opts) if err != nil { log.Panicln(err) } From a286c0919825620160aa7ef840bb327fbcc9580e Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Tue, 16 Jan 2024 16:52:41 +0300 Subject: [PATCH 5/9] logs and metrics --- go/adapter.go | 49 ++++++- go/adapter/honeycomb/adapter.go | 4 +- go/adapter/lightstep/adapter.go | 2 +- go/adapter/opentelemetry/adapter.go | 4 +- go/adapter/otel_stdout/adapter.go | 7 +- go/bin/otelstdout/main.go | 52 ++++++- go/go.mod | 14 +- go/go.sum | 17 +++ go/statsd.go | 114 +++++++++++++++ go/statsd_test.go | 206 ++++++++++++++++++++++++++++ 10 files changed, 454 insertions(+), 15 deletions(-) create mode 100644 go/statsd.go create mode 100644 go/statsd_test.go diff --git a/go/adapter.go b/go/adapter.go index 3ba0a1a..46cd5ea 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -8,6 +8,7 @@ import ( "time" "github.com/tetratelabs/wazero" + "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -89,7 +90,7 @@ func (b *AdapterBase) Stop(wait bool) { } // MakeOtelCallSpans recursively constructs call spans in open telemetry format -func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string) []*trace.Span { +func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string, meter *metric.Meter) []*trace.Span { name := event.FunctionName() span := NewOtelSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) span.Attributes = append(span.Attributes, NewOtelKeyValueString("function-name", fmt.Sprintf("function-call-%s", name))) @@ -97,7 +98,7 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI spans := []*trace.Span{span} for _, ev := range event.Within() { if call, ok := ev.(CallEvent); ok { - spans = append(spans, b.MakeOtelCallSpans(call, span.SpanId, traceId)...) + spans = append(spans, b.MakeOtelCallSpans(call, span.SpanId, traceId, meter)...) } if alloc, ok := ev.(MemoryGrowEvent); ok { last := spans[len(spans)-1] @@ -124,6 +125,50 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI last.Attributes = append(last.Attributes, kv) } } + if metric, ok := ev.(MetricEvent); ok && meter != nil { + + if metric.Format != Statsd { + log.Printf("Unsupported metric format: %v\n", metric.Format) + continue + } + + datagram, err := parseStatsdDataGram(metric.Message) + if err != nil { + log.Printf("Failed to parse statsd datagram: %v\n", err) + continue + } + + ctx := context.Background() + + m := *meter + + // TODO: maybe we should also support int64 metrics? + // TODO: double check this + // TODO: timestamps? + switch datagram.Type { + case StatsdCounter: + counter, _ := m.Float64Counter(datagram.Name) + counter.Add(ctx, datagram.Value) + case StatsdGauge: + gauge, _ := m.Float64UpDownCounter(datagram.Name) + gauge.Add(ctx, datagram.Value) + case StatsdTiming, StatsdHistogram: + histogram, _ := m.Float64Histogram(datagram.Name) + histogram.Record(ctx, datagram.Value) + case StatsdSet: + // TODO: how to support sets? + } + } + if log, ok := ev.(LogEvent); ok { + // TODO: since logs are not implemented in otel go, can we use span events instead? + last := spans[len(spans)-1] + event := trace.Span_Event{ + Name: log.Message, + TimeUnixNano: uint64(log.Time.UnixNano()), + } + + last.Events = append(last.Events, &event) + } } return spans } diff --git a/go/adapter/honeycomb/adapter.go b/go/adapter/honeycomb/adapter.go index 549a2d4..38ca6f5 100644 --- a/go/adapter/honeycomb/adapter.go +++ b/go/adapter/honeycomb/adapter.go @@ -9,6 +9,7 @@ import ( "time" observe "github.com/dylibso/observe-sdk/go" + "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" proto "google.golang.org/protobuf/proto" ) @@ -19,6 +20,7 @@ type HoneycombConfig struct { EmitTracesInterval time.Duration TraceBatchMax uint32 Host string + Meter *metric.Meter } type HoneycombAdapter struct { @@ -54,7 +56,7 @@ func (h *HoneycombAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := h.MakeOtelCallSpans(event, nil, traceId) + spans := h.MakeOtelCallSpans(event, nil, traceId, h.Config.Meter) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/lightstep/adapter.go b/go/adapter/lightstep/adapter.go index f5168df..ce499c1 100644 --- a/go/adapter/lightstep/adapter.go +++ b/go/adapter/lightstep/adapter.go @@ -54,7 +54,7 @@ func (l *LightstepAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := l.MakeOtelCallSpans(event, nil, traceId) + spans := l.MakeOtelCallSpans(event, nil, traceId, nil) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/opentelemetry/adapter.go b/go/adapter/opentelemetry/adapter.go index 1a396f8..36b08ac 100644 --- a/go/adapter/opentelemetry/adapter.go +++ b/go/adapter/opentelemetry/adapter.go @@ -37,7 +37,7 @@ type OTelAdapter struct { Config *OTelConfig } -// UseCustomClient accepts a pre-initialized client to allow for customization of how to get data into a collector +// UseCustomClient accepts a pre-initialized client to allow for customization of how to get data into a collector func (a *OTelAdapter) UseCustomClient(client otlptrace.Client) { if a.Config != nil { a.Config.client = client @@ -114,7 +114,7 @@ func (o *OTelAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := o.MakeOtelCallSpans(event, nil, traceId) + spans := o.MakeOtelCallSpans(event, nil, traceId, nil) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index ada060b..b383fe1 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -7,17 +7,20 @@ import ( "log" observe "github.com/dylibso/observe-sdk/go" + "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" ) type OtelStdoutAdapter struct { *observe.AdapterBase + Meter *metric.Meter } -func NewOtelStdoutAdapter() *OtelStdoutAdapter { +func NewOtelStdoutAdapter(meter *metric.Meter) *OtelStdoutAdapter { base := observe.NewAdapterBase(1, 0) adapter := &OtelStdoutAdapter{ AdapterBase: &base, + Meter: meter, } adapter.AdapterBase.SetFlusher(adapter) @@ -37,7 +40,7 @@ func (o *OtelStdoutAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: - spans := o.MakeOtelCallSpans(event, nil, traceId) + spans := o.MakeOtelCallSpans(event, nil, traceId, o.Meter) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go index e429c6a..f08f199 100644 --- a/go/bin/otelstdout/main.go +++ b/go/bin/otelstdout/main.go @@ -10,13 +10,40 @@ import ( "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.19.0" ) func main() { ctx := context.Background() + res, err := newResource() + if err != nil { + panic(err) + } + + // Create a meter provider. + // You can pass this instance directly to your instrumented code if it + // accepts a MeterProvider instance. + meterProvider, err := newMeterProvider(res) + if err != nil { + panic(err) + } + + // Handle shutdown properly so nothing leaks. + defer func() { + if err := meterProvider.Shutdown(context.Background()); err != nil { + log.Println(err) + } + }() + + var meter = otel.Meter("my-service-meter") + // we only need to create and start once per instance of our host app - adapter := otel_stdout.NewOtelStdoutAdapter() + adapter := otel_stdout.NewOtelStdoutAdapter(&meter) defer adapter.Stop(true) adapter.Start(ctx) @@ -56,3 +83,26 @@ func main() { traceCtx.Finish() time.Sleep(2 * time.Second) } + +func newResource() (*resource.Resource, error) { + return resource.Merge(resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, + semconv.ServiceName("my-service"), + semconv.ServiceVersion("0.1.0"), + )) +} + +func newMeterProvider(res *resource.Resource) (*metric.MeterProvider, error) { + metricExporter, err := stdoutmetric.New() + if err != nil { + return nil, err + } + + meterProvider := metric.NewMeterProvider( + metric.WithResource(res), + metric.WithReader(metric.NewPeriodicReader(metricExporter, + // Default is 1m. Set to 3s for demonstrative purposes. + metric.WithInterval(1*time.Second))), + ) + return meterProvider, nil +} diff --git a/go/go.mod b/go/go.mod index 4178589..cb00c3d 100644 --- a/go/go.mod +++ b/go/go.mod @@ -18,16 +18,18 @@ require ( require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/logr v1.3.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect - go.opentelemetry.io/otel v1.18.0 // indirect - go.opentelemetry.io/otel/metric v1.18.0 // indirect - go.opentelemetry.io/otel/sdk v1.18.0 // indirect - go.opentelemetry.io/otel/trace v1.18.0 // indirect + go.opentelemetry.io/otel v1.21.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 // indirect + go.opentelemetry.io/otel/metric v1.21.0 // indirect + go.opentelemetry.io/otel/sdk v1.21.0 // indirect + go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect + go.opentelemetry.io/otel/trace v1.21.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sys v0.13.0 // indirect + golang.org/x/sys v0.14.0 // indirect golang.org/x/text v0.13.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect diff --git a/go/go.sum b/go/go.sum index 3f28ebc..8e8587f 100644 --- a/go/go.sum +++ b/go/go.sum @@ -4,6 +4,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= +github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE= @@ -12,6 +14,7 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab h1:BA4a7pe6ZTd9F8kXETBoijjFJ/ntaa//1wiH9BZu4zU= @@ -24,18 +27,30 @@ github.com/tetratelabs/wazero v1.2.1 h1:J4X2hrGzJvt+wqltuvcSjHQ7ujQxA9gb6PeMs4ql github.com/tetratelabs/wazero v1.2.1/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= go.opentelemetry.io/otel v1.18.0 h1:TgVozPGZ01nHyDZxK5WGPFB9QexeTMXEH7+tIClWfzs= go.opentelemetry.io/otel v1.18.0/go.mod h1:9lWqYO0Db579XzVuCKFNPDl4s73Voa+zEck3wHaAYQI= +go.opentelemetry.io/otel v1.21.0 h1:hzLeKBZEL7Okw2mGzZ0cc4k/A7Fta0uoPgaJCr8fsFc= +go.opentelemetry.io/otel v1.21.0/go.mod h1:QZzNPQPm1zLX4gZK4cMi+71eaorMSGT3A4znnUvNNEo= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0 h1:IAtl+7gua134xcV3NieDhJHjjOVeJhXAnYf/0hswjUY= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.18.0/go.mod h1:w+pXobnBzh95MNIkeIuAKcHe/Uu/CX2PKIvBP6ipKRA= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.18.0 h1:yE32ay7mJG2leczfREEhoW3VfSZIvHaB+gvVo1o8DQ8= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.18.0/go.mod h1:G17FHPDLt74bCI7tJ4CMitEk4BXTYG4FW6XUpkPBXa4= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0 h1:6pu8ttx76BxHf+xz/H77AUZkPF3cwWzXqAUsXhVKI18= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.18.0/go.mod h1:IOmXxPrxoxFMXdNy7lfDmE8MzE61YPcurbUm0SMjerI= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0 h1:dEZWPjVN22urgYCza3PXRUGEyCB++y1sAqm6guWFesk= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v0.44.0/go.mod h1:sTt30Evb7hJB/gEk27qLb1+l9n4Tb8HvHkR0Wx3S6CU= go.opentelemetry.io/otel/metric v1.18.0 h1:JwVzw94UYmbx3ej++CwLUQZxEODDj/pOuTCvzhtRrSQ= go.opentelemetry.io/otel/metric v1.18.0/go.mod h1:nNSpsVDjWGfb7chbRLUNW+PBNdcSTHD4Uu5pfFMOI0k= +go.opentelemetry.io/otel/metric v1.21.0 h1:tlYWfeo+Bocx5kLEloTjbcDwBuELRrIFxwdQ36PlJu4= +go.opentelemetry.io/otel/metric v1.21.0/go.mod h1:o1p3CA8nNHW8j5yuQLdc1eeqEaPfzug24uvsyIEJRWM= go.opentelemetry.io/otel/sdk v1.18.0 h1:e3bAB0wB3MljH38sHzpV/qWrOTCFrdZF2ct9F8rBkcY= go.opentelemetry.io/otel/sdk v1.18.0/go.mod h1:1RCygWV7plY2KmdskZEDDBs4tJeHG92MdHZIluiYs/M= +go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8= +go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E= +go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= +go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= go.opentelemetry.io/otel/trace v1.18.0 h1:NY+czwbHbmndxojTEKiSMHkG2ClNH2PwmcHrdo0JY10= go.opentelemetry.io/otel/trace v1.18.0/go.mod h1:T2+SGJGuYZY3bjj5rgh/hN7KIrlpWC5nS8Mjvzckz+0= +go.opentelemetry.io/otel/trace v1.21.0 h1:WD9i5gzvoUPuXIXH24ZNBudiarZDKuekPqi/E8fpfLc= +go.opentelemetry.io/otel/trace v1.21.0/go.mod h1:LGbsEB0f9LGjN+OZaQQ26sohbOmiMR+BaslueVtS/qQ= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= @@ -43,6 +58,8 @@ golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/go/statsd.go b/go/statsd.go new file mode 100644 index 0000000..4136631 --- /dev/null +++ b/go/statsd.go @@ -0,0 +1,114 @@ +package observe + +import ( + "fmt" + "strconv" + "strings" +) + +type StatsdCounterType int + +const ( + StatsdCounter StatsdCounterType = iota + StatsdGauge + StatsdTiming + StatsdHistogram + StatsdSet +) + +type StatsdDataGram struct { + Name string + Type StatsdCounterType + Value float64 + SampleRate float64 + HasSampleRate bool + Tags map[string]string +} + +func parseStatsdDataGram(message string) (StatsdDataGram, error) { + // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol + // :||@|#:, + + m := StatsdDataGram{ + Tags: make(map[string]string), + } + + parts := strings.SplitN(message, ":", 2) + if len(parts) != 2 { + return m, fmt.Errorf("Expected a ':' in %s", message) + } + + // 1. read the metric name + m.Name = parts[0] + + parts = strings.Split(parts[1], "|") + if len(parts) < 2 { + return m, fmt.Errorf("Expected '|' in %s", message) + } + + // 2. read the metric type + rawType := parts[1] + var rawValue string + + // 3. read the metric value + // TODO: add support for int64 metrics + // TODO: add support for rate and distribution metrics: https://docs.datadoghq.com/metrics/types/?tab=gauge#metric-types + switch rawType { + case "c": + m.Type = StatsdCounter + rawValue = parts[0] + case "g": + m.Type = StatsdGauge + part := parts[0] + + // Guages can have a + or - prefix + if part[0] == '+' || part[0] == '-' { + rawValue = part[1:] + } else { + rawValue = part + } + case "ms": + m.Type = StatsdTiming + rawValue = parts[0] + case "h": + m.Type = StatsdHistogram + rawValue = parts[0] + case "s": + m.Type = StatsdSet + rawValue = parts[0] + default: + return m, fmt.Errorf("Unknown type %s", rawType) + } + + value, err := strconv.ParseFloat(rawValue, 64) + if err != nil { + return m, fmt.Errorf("Failed to parse %s as a float: %e", rawValue, err) + } + + m.Value = value + + // 4. read the sample rate and tags + for _, part := range parts[2:] { + if part[0] == '@' { + sampleRate, err := strconv.ParseFloat(part[1:], 64) + if err != nil { + return m, fmt.Errorf("Failed to parse %s as a float: %e", part[1:], err) + } + + m.SampleRate = sampleRate + m.HasSampleRate = true + } else if part[0] == '#' { + tags := strings.Split(part[1:], ",") + for _, tag := range tags { + tagParts := strings.Split(tag, ":") + if len(tagParts) != 2 { + return m, fmt.Errorf("Expected a ':' in %s", tag) + } + + m.Tags[tagParts[0]] = tagParts[1] + } + } + } + + return m, nil +} diff --git a/go/statsd_test.go b/go/statsd_test.go new file mode 100644 index 0000000..a3333f5 --- /dev/null +++ b/go/statsd_test.go @@ -0,0 +1,206 @@ +package observe + +import ( + "testing" +) + +func TestParseStatsdDataGram_Simple(t *testing.T) { + message := "my-counter:1.98|c" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-counter" { + t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) + } else if datagram.Type != StatsdCounter { + t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) + } else if datagram.Value != 1.98 { + t.Errorf("expected value to be 1.98, got %f", datagram.Value) + } else if datagram.HasSampleRate == true { + t.Errorf("expected HasSampleRate to be false, got %v", datagram.HasSampleRate) + } else if len(datagram.Tags) != 0 { + t.Errorf("expected 0 tags, got %d", len(datagram.Tags)) + } +} + +func TestParseStatsdDataGram_Counter_NoTags(t *testing.T) { + message := "my-counter:1.98|c|@0.1" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-counter" { + t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) + } else if datagram.Type != StatsdCounter { + t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) + } else if datagram.Value != 1.98 { + t.Errorf("expected value to be 1.98, got %f", datagram.Value) + } else if datagram.HasSampleRate == false { + t.Errorf("expected HasSampleRate to be true, got %v", datagram.HasSampleRate) + } else if datagram.SampleRate != 0.1 { + t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 0 { + t.Errorf("expected 0 tags, got %d", len(datagram.Tags)) + } +} +func TestParseStatsdDataGram_Counter_NoSampleRate(t *testing.T) { + message := "my-counter:1.98|c|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-counter" { + t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) + } else if datagram.Type != StatsdCounter { + t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) + } else if datagram.Value != 1.98 { + t.Errorf("expected value to be 1.98, got %f", datagram.Value) + } else if datagram.HasSampleRate == true { + t.Errorf("expected HasSampleRate to be false, got %v", datagram.HasSampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_Counter(t *testing.T) { + message := "my-counter:1.98|c|@0.1|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-counter" { + t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) + } else if datagram.Type != StatsdCounter { + t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) + } else if datagram.Value != 1.98 { + t.Errorf("expected value to be 1.98, got %f", datagram.Value) + } else if datagram.SampleRate != 0.1 { + t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_Gauge(t *testing.T) { + message := "my-guage:8723.042|g|@0.001|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-guage" { + t.Errorf("expected name to be 'my-guage', got %s", datagram.Name) + } else if datagram.Type != StatsdGauge { + t.Errorf("expected type to be StatsdGauge, got %d", datagram.Type) + } else if datagram.Value != 8723.042 { + t.Errorf("expected value to be 8723.042, got %f", datagram.Value) + } else if datagram.SampleRate != 0.001 { + t.Errorf("expected sample rate to be 0.001, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_Timing(t *testing.T) { + message := "my-timing:1|ms|@0.1|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-timing" { + t.Errorf("expected name to be 'my-timing', got %s", datagram.Name) + } else if datagram.Type != StatsdTiming { + t.Errorf("expected type to be StatsdTiming, got %d", datagram.Type) + } else if datagram.Value != 1 { + t.Errorf("expected value to be 1, got %f", datagram.Value) + } else if datagram.SampleRate != 0.1 { + t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_Histogram(t *testing.T) { + message := "my-histogram:1|h|@0.1|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-histogram" { + t.Errorf("expected name to be 'my-histogram', got %s", datagram.Name) + } else if datagram.Type != StatsdHistogram { + t.Errorf("expected type to be StatsdHistogram, got %d", datagram.Type) + } else if datagram.Value != 1 { + t.Errorf("expected value to be 1, got %f", datagram.Value) + } else if datagram.SampleRate != 0.1 { + t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_Set(t *testing.T) { + message := "my-set:1|s|@0.1|#tag1:value1,tag2:value2" + datagram, err := parseStatsdDataGram(message) + if err != nil { + t.Errorf("parseStatsdDataGram() error = %v", err) + } + + if datagram.Name != "my-set" { + t.Errorf("expected name to be 'my-set', got %s", datagram.Name) + } else if datagram.Type != StatsdSet { + t.Errorf("expected type to be StatsdSet, got %d", datagram.Type) + } else if datagram.Value != 1 { + t.Errorf("expected value to be 1, got %f", datagram.Value) + } else if datagram.SampleRate != 0.1 { + t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) + } else if len(datagram.Tags) != 2 { + t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) + } else if datagram.Tags["tag1"] != "value1" { + t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) + } else if datagram.Tags["tag2"] != "value2" { + t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) + } +} + +func TestParseStatsdDataGram_InvalidType(t *testing.T) { + message := "invalid:1|x" + _, err := parseStatsdDataGram(message) + if err == nil { + t.Errorf("parseStatsdDataGram() expected error, got nil") + } +} + +func TestParseStatsdDataGram_InvalidFormat(t *testing.T) { + message := "invalid" + _, err := parseStatsdDataGram(message) + if err == nil { + t.Errorf("parseStatsdDataGram() expected error, got nil") + } +} From 6b4e00b010bc3359635379d4170de1c7afe9e839 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Tue, 16 Jan 2024 17:10:16 +0300 Subject: [PATCH 6/9] stub out log and metric impl --- go/adapter.go | 48 ++----- go/adapter/datadog/adapter.go | 15 +- go/adapter/honeycomb/adapter.go | 2 +- go/adapter/lightstep/adapter.go | 2 +- go/adapter/opentelemetry/adapter.go | 2 +- go/adapter/otel_stdout/adapter.go | 7 +- go/bin/otelstdout/main.go | 25 +--- go/statsd.go | 114 --------------- go/statsd_test.go | 206 ---------------------------- 9 files changed, 25 insertions(+), 396 deletions(-) delete mode 100644 go/statsd.go delete mode 100644 go/statsd_test.go diff --git a/go/adapter.go b/go/adapter.go index 46cd5ea..f361c90 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -8,7 +8,6 @@ import ( "time" "github.com/tetratelabs/wazero" - "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" ) @@ -90,7 +89,7 @@ func (b *AdapterBase) Stop(wait bool) { } // MakeOtelCallSpans recursively constructs call spans in open telemetry format -func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string, meter *metric.Meter) []*trace.Span { +func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceId string) []*trace.Span { name := event.FunctionName() span := NewOtelSpan(traceId, parentId, name, event.Time, event.Time.Add(event.Duration)) span.Attributes = append(span.Attributes, NewOtelKeyValueString("function-name", fmt.Sprintf("function-call-%s", name))) @@ -98,7 +97,7 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI spans := []*trace.Span{span} for _, ev := range event.Within() { if call, ok := ev.(CallEvent); ok { - spans = append(spans, b.MakeOtelCallSpans(call, span.SpanId, traceId, meter)...) + spans = append(spans, b.MakeOtelCallSpans(call, span.SpanId, traceId)...) } if alloc, ok := ev.(MemoryGrowEvent); ok { last := spans[len(spans)-1] @@ -125,49 +124,18 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI last.Attributes = append(last.Attributes, kv) } } - if metric, ok := ev.(MetricEvent); ok && meter != nil { - + if metric, ok := ev.(MetricEvent); ok { if metric.Format != Statsd { log.Printf("Unsupported metric format: %v\n", metric.Format) continue } - datagram, err := parseStatsdDataGram(metric.Message) - if err != nil { - log.Printf("Failed to parse statsd datagram: %v\n", err) - continue - } - - ctx := context.Background() - - m := *meter - - // TODO: maybe we should also support int64 metrics? - // TODO: double check this - // TODO: timestamps? - switch datagram.Type { - case StatsdCounter: - counter, _ := m.Float64Counter(datagram.Name) - counter.Add(ctx, datagram.Value) - case StatsdGauge: - gauge, _ := m.Float64UpDownCounter(datagram.Name) - gauge.Add(ctx, datagram.Value) - case StatsdTiming, StatsdHistogram: - histogram, _ := m.Float64Histogram(datagram.Name) - histogram.Record(ctx, datagram.Value) - case StatsdSet: - // TODO: how to support sets? - } + // TODO: properly report metrics + fmt.Printf("metric: %s\n", metric.Message) } - if log, ok := ev.(LogEvent); ok { - // TODO: since logs are not implemented in otel go, can we use span events instead? - last := spans[len(spans)-1] - event := trace.Span_Event{ - Name: log.Message, - TimeUnixNano: uint64(log.Time.UnixNano()), - } - - last.Events = append(last.Events, &event) + if l, ok := ev.(LogEvent); ok { + // TODO: otel Go doesn't support logs yet + log.Printf("metric: %s\n", l.Message) } } return spans diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 1f8d394..9825547 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -85,8 +85,6 @@ func (d *DatadogAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Datadog adapter does not respect custom events") - case observe.MetricEvent: - log.Println("MetricEvent should be attached to a span") } } @@ -197,8 +195,13 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span.AddAllocation(alloc.MemoryGrowAmount()) } if metric, ok := ev.(observe.MetricEvent); ok { - _ = metric - // TODO: implement + if metric.Format != observe.Statsd { + log.Printf("Unsupported metric format: %v\n", metric.Format) + continue + } + + // TODO: properly report metrics + fmt.Printf("metric: %s\n", metric.Message) } if tags, ok := ev.(observe.SpanTagsEvent); ok { span := spans[len(spans)-1] @@ -212,6 +215,10 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span.AddTag(parts[0], parts[1]) } } + if l, ok := ev.(observe.LogEvent); ok { + // TODO: otel Go doesn't support logs yet + log.Printf("metric: %s\n", l.Message) + } } return spans diff --git a/go/adapter/honeycomb/adapter.go b/go/adapter/honeycomb/adapter.go index 38ca6f5..7dacac6 100644 --- a/go/adapter/honeycomb/adapter.go +++ b/go/adapter/honeycomb/adapter.go @@ -56,7 +56,7 @@ func (h *HoneycombAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := h.MakeOtelCallSpans(event, nil, traceId, h.Config.Meter) + spans := h.MakeOtelCallSpans(event, nil, traceId) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/lightstep/adapter.go b/go/adapter/lightstep/adapter.go index ce499c1..f5168df 100644 --- a/go/adapter/lightstep/adapter.go +++ b/go/adapter/lightstep/adapter.go @@ -54,7 +54,7 @@ func (l *LightstepAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := l.MakeOtelCallSpans(event, nil, traceId, nil) + spans := l.MakeOtelCallSpans(event, nil, traceId) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/opentelemetry/adapter.go b/go/adapter/opentelemetry/adapter.go index 36b08ac..5e86410 100644 --- a/go/adapter/opentelemetry/adapter.go +++ b/go/adapter/opentelemetry/adapter.go @@ -114,7 +114,7 @@ func (o *OTelAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: // TODO: consider renaming to FunctionCall for consistency across Rust & JS - spans := o.MakeOtelCallSpans(event, nil, traceId, nil) + spans := o.MakeOtelCallSpans(event, nil, traceId) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index b383fe1..ada060b 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -7,20 +7,17 @@ import ( "log" observe "github.com/dylibso/observe-sdk/go" - "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" ) type OtelStdoutAdapter struct { *observe.AdapterBase - Meter *metric.Meter } -func NewOtelStdoutAdapter(meter *metric.Meter) *OtelStdoutAdapter { +func NewOtelStdoutAdapter() *OtelStdoutAdapter { base := observe.NewAdapterBase(1, 0) adapter := &OtelStdoutAdapter{ AdapterBase: &base, - Meter: meter, } adapter.AdapterBase.SetFlusher(adapter) @@ -40,7 +37,7 @@ func (o *OtelStdoutAdapter) Flush(evts []observe.TraceEvent) error { for _, e := range te.Events { switch event := e.(type) { case observe.CallEvent: - spans := o.MakeOtelCallSpans(event, nil, traceId, o.Meter) + spans := o.MakeOtelCallSpans(event, nil, traceId) if len(spans) > 0 { allSpans = append(allSpans, spans...) } diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go index f08f199..4c0de8a 100644 --- a/go/bin/otelstdout/main.go +++ b/go/bin/otelstdout/main.go @@ -10,7 +10,6 @@ import ( "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" @@ -20,30 +19,8 @@ import ( func main() { ctx := context.Background() - res, err := newResource() - if err != nil { - panic(err) - } - - // Create a meter provider. - // You can pass this instance directly to your instrumented code if it - // accepts a MeterProvider instance. - meterProvider, err := newMeterProvider(res) - if err != nil { - panic(err) - } - - // Handle shutdown properly so nothing leaks. - defer func() { - if err := meterProvider.Shutdown(context.Background()); err != nil { - log.Println(err) - } - }() - - var meter = otel.Meter("my-service-meter") - // we only need to create and start once per instance of our host app - adapter := otel_stdout.NewOtelStdoutAdapter(&meter) + adapter := otel_stdout.NewOtelStdoutAdapter() defer adapter.Stop(true) adapter.Start(ctx) diff --git a/go/statsd.go b/go/statsd.go deleted file mode 100644 index 4136631..0000000 --- a/go/statsd.go +++ /dev/null @@ -1,114 +0,0 @@ -package observe - -import ( - "fmt" - "strconv" - "strings" -) - -type StatsdCounterType int - -const ( - StatsdCounter StatsdCounterType = iota - StatsdGauge - StatsdTiming - StatsdHistogram - StatsdSet -) - -type StatsdDataGram struct { - Name string - Type StatsdCounterType - Value float64 - SampleRate float64 - HasSampleRate bool - Tags map[string]string -} - -func parseStatsdDataGram(message string) (StatsdDataGram, error) { - // https://docs.datadoghq.com/developers/dogstatsd/datagram_shell/?tab=metrics#the-dogstatsd-protocol - // :||@|#:, - - m := StatsdDataGram{ - Tags: make(map[string]string), - } - - parts := strings.SplitN(message, ":", 2) - if len(parts) != 2 { - return m, fmt.Errorf("Expected a ':' in %s", message) - } - - // 1. read the metric name - m.Name = parts[0] - - parts = strings.Split(parts[1], "|") - if len(parts) < 2 { - return m, fmt.Errorf("Expected '|' in %s", message) - } - - // 2. read the metric type - rawType := parts[1] - var rawValue string - - // 3. read the metric value - // TODO: add support for int64 metrics - // TODO: add support for rate and distribution metrics: https://docs.datadoghq.com/metrics/types/?tab=gauge#metric-types - switch rawType { - case "c": - m.Type = StatsdCounter - rawValue = parts[0] - case "g": - m.Type = StatsdGauge - part := parts[0] - - // Guages can have a + or - prefix - if part[0] == '+' || part[0] == '-' { - rawValue = part[1:] - } else { - rawValue = part - } - case "ms": - m.Type = StatsdTiming - rawValue = parts[0] - case "h": - m.Type = StatsdHistogram - rawValue = parts[0] - case "s": - m.Type = StatsdSet - rawValue = parts[0] - default: - return m, fmt.Errorf("Unknown type %s", rawType) - } - - value, err := strconv.ParseFloat(rawValue, 64) - if err != nil { - return m, fmt.Errorf("Failed to parse %s as a float: %e", rawValue, err) - } - - m.Value = value - - // 4. read the sample rate and tags - for _, part := range parts[2:] { - if part[0] == '@' { - sampleRate, err := strconv.ParseFloat(part[1:], 64) - if err != nil { - return m, fmt.Errorf("Failed to parse %s as a float: %e", part[1:], err) - } - - m.SampleRate = sampleRate - m.HasSampleRate = true - } else if part[0] == '#' { - tags := strings.Split(part[1:], ",") - for _, tag := range tags { - tagParts := strings.Split(tag, ":") - if len(tagParts) != 2 { - return m, fmt.Errorf("Expected a ':' in %s", tag) - } - - m.Tags[tagParts[0]] = tagParts[1] - } - } - } - - return m, nil -} diff --git a/go/statsd_test.go b/go/statsd_test.go deleted file mode 100644 index a3333f5..0000000 --- a/go/statsd_test.go +++ /dev/null @@ -1,206 +0,0 @@ -package observe - -import ( - "testing" -) - -func TestParseStatsdDataGram_Simple(t *testing.T) { - message := "my-counter:1.98|c" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-counter" { - t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) - } else if datagram.Type != StatsdCounter { - t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) - } else if datagram.Value != 1.98 { - t.Errorf("expected value to be 1.98, got %f", datagram.Value) - } else if datagram.HasSampleRate == true { - t.Errorf("expected HasSampleRate to be false, got %v", datagram.HasSampleRate) - } else if len(datagram.Tags) != 0 { - t.Errorf("expected 0 tags, got %d", len(datagram.Tags)) - } -} - -func TestParseStatsdDataGram_Counter_NoTags(t *testing.T) { - message := "my-counter:1.98|c|@0.1" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-counter" { - t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) - } else if datagram.Type != StatsdCounter { - t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) - } else if datagram.Value != 1.98 { - t.Errorf("expected value to be 1.98, got %f", datagram.Value) - } else if datagram.HasSampleRate == false { - t.Errorf("expected HasSampleRate to be true, got %v", datagram.HasSampleRate) - } else if datagram.SampleRate != 0.1 { - t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 0 { - t.Errorf("expected 0 tags, got %d", len(datagram.Tags)) - } -} -func TestParseStatsdDataGram_Counter_NoSampleRate(t *testing.T) { - message := "my-counter:1.98|c|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-counter" { - t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) - } else if datagram.Type != StatsdCounter { - t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) - } else if datagram.Value != 1.98 { - t.Errorf("expected value to be 1.98, got %f", datagram.Value) - } else if datagram.HasSampleRate == true { - t.Errorf("expected HasSampleRate to be false, got %v", datagram.HasSampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_Counter(t *testing.T) { - message := "my-counter:1.98|c|@0.1|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-counter" { - t.Errorf("expected name to be 'my-counter', got %s", datagram.Name) - } else if datagram.Type != StatsdCounter { - t.Errorf("expected type to be StatsdCounter, got %d", datagram.Type) - } else if datagram.Value != 1.98 { - t.Errorf("expected value to be 1.98, got %f", datagram.Value) - } else if datagram.SampleRate != 0.1 { - t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_Gauge(t *testing.T) { - message := "my-guage:8723.042|g|@0.001|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-guage" { - t.Errorf("expected name to be 'my-guage', got %s", datagram.Name) - } else if datagram.Type != StatsdGauge { - t.Errorf("expected type to be StatsdGauge, got %d", datagram.Type) - } else if datagram.Value != 8723.042 { - t.Errorf("expected value to be 8723.042, got %f", datagram.Value) - } else if datagram.SampleRate != 0.001 { - t.Errorf("expected sample rate to be 0.001, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_Timing(t *testing.T) { - message := "my-timing:1|ms|@0.1|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-timing" { - t.Errorf("expected name to be 'my-timing', got %s", datagram.Name) - } else if datagram.Type != StatsdTiming { - t.Errorf("expected type to be StatsdTiming, got %d", datagram.Type) - } else if datagram.Value != 1 { - t.Errorf("expected value to be 1, got %f", datagram.Value) - } else if datagram.SampleRate != 0.1 { - t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_Histogram(t *testing.T) { - message := "my-histogram:1|h|@0.1|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-histogram" { - t.Errorf("expected name to be 'my-histogram', got %s", datagram.Name) - } else if datagram.Type != StatsdHistogram { - t.Errorf("expected type to be StatsdHistogram, got %d", datagram.Type) - } else if datagram.Value != 1 { - t.Errorf("expected value to be 1, got %f", datagram.Value) - } else if datagram.SampleRate != 0.1 { - t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_Set(t *testing.T) { - message := "my-set:1|s|@0.1|#tag1:value1,tag2:value2" - datagram, err := parseStatsdDataGram(message) - if err != nil { - t.Errorf("parseStatsdDataGram() error = %v", err) - } - - if datagram.Name != "my-set" { - t.Errorf("expected name to be 'my-set', got %s", datagram.Name) - } else if datagram.Type != StatsdSet { - t.Errorf("expected type to be StatsdSet, got %d", datagram.Type) - } else if datagram.Value != 1 { - t.Errorf("expected value to be 1, got %f", datagram.Value) - } else if datagram.SampleRate != 0.1 { - t.Errorf("expected sample rate to be 0.1, got %f", datagram.SampleRate) - } else if len(datagram.Tags) != 2 { - t.Errorf("expected 2 tags, got %d", len(datagram.Tags)) - } else if datagram.Tags["tag1"] != "value1" { - t.Errorf("expected tag1 to be 'value1', got %s", datagram.Tags["tag1"]) - } else if datagram.Tags["tag2"] != "value2" { - t.Errorf("expected tag2 to be 'value2', got %s", datagram.Tags["tag2"]) - } -} - -func TestParseStatsdDataGram_InvalidType(t *testing.T) { - message := "invalid:1|x" - _, err := parseStatsdDataGram(message) - if err == nil { - t.Errorf("parseStatsdDataGram() expected error, got nil") - } -} - -func TestParseStatsdDataGram_InvalidFormat(t *testing.T) { - message := "invalid" - _, err := parseStatsdDataGram(message) - if err == nil { - t.Errorf("parseStatsdDataGram() expected error, got nil") - } -} From 37537de31697c583f9e7489842198f995044bdf3 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Tue, 16 Jan 2024 17:14:37 +0300 Subject: [PATCH 7/9] cleanup --- go/adapter.go | 4 ++-- go/adapter/datadog/adapter.go | 4 ++-- go/adapter/honeycomb/adapter.go | 2 -- go/bin/otelstdout/main.go | 27 --------------------------- go/event.go | 2 +- 5 files changed, 5 insertions(+), 34 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index f361c90..5815ab7 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -125,7 +125,7 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI } } if metric, ok := ev.(MetricEvent); ok { - if metric.Format != Statsd { + if metric.Format != StatsdFormat { log.Printf("Unsupported metric format: %v\n", metric.Format) continue } @@ -135,7 +135,7 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI } if l, ok := ev.(LogEvent); ok { // TODO: otel Go doesn't support logs yet - log.Printf("metric: %s\n", l.Message) + log.Println(l.Message) } } return spans diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index 9825547..cfb7eb1 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -195,7 +195,7 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span.AddAllocation(alloc.MemoryGrowAmount()) } if metric, ok := ev.(observe.MetricEvent); ok { - if metric.Format != observe.Statsd { + if metric.Format != observe.StatsdFormat { log.Printf("Unsupported metric format: %v\n", metric.Format) continue } @@ -217,7 +217,7 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 } if l, ok := ev.(observe.LogEvent); ok { // TODO: otel Go doesn't support logs yet - log.Printf("metric: %s\n", l.Message) + log.Println(l.Message) } } diff --git a/go/adapter/honeycomb/adapter.go b/go/adapter/honeycomb/adapter.go index 7dacac6..549a2d4 100644 --- a/go/adapter/honeycomb/adapter.go +++ b/go/adapter/honeycomb/adapter.go @@ -9,7 +9,6 @@ import ( "time" observe "github.com/dylibso/observe-sdk/go" - "go.opentelemetry.io/otel/metric" trace "go.opentelemetry.io/proto/otlp/trace/v1" proto "google.golang.org/protobuf/proto" ) @@ -20,7 +19,6 @@ type HoneycombConfig struct { EmitTracesInterval time.Duration TraceBatchMax uint32 Host string - Meter *metric.Meter } type HoneycombAdapter struct { diff --git a/go/bin/otelstdout/main.go b/go/bin/otelstdout/main.go index 4c0de8a..e429c6a 100644 --- a/go/bin/otelstdout/main.go +++ b/go/bin/otelstdout/main.go @@ -10,10 +10,6 @@ import ( "github.com/dylibso/observe-sdk/go/adapter/otel_stdout" "github.com/tetratelabs/wazero" "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" - "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" - "go.opentelemetry.io/otel/sdk/metric" - "go.opentelemetry.io/otel/sdk/resource" - semconv "go.opentelemetry.io/otel/semconv/v1.19.0" ) func main() { @@ -60,26 +56,3 @@ func main() { traceCtx.Finish() time.Sleep(2 * time.Second) } - -func newResource() (*resource.Resource, error) { - return resource.Merge(resource.Default(), - resource.NewWithAttributes(semconv.SchemaURL, - semconv.ServiceName("my-service"), - semconv.ServiceVersion("0.1.0"), - )) -} - -func newMeterProvider(res *resource.Resource) (*metric.MeterProvider, error) { - metricExporter, err := stdoutmetric.New() - if err != nil { - return nil, err - } - - meterProvider := metric.NewMeterProvider( - metric.WithResource(res), - metric.WithReader(metric.NewPeriodicReader(metricExporter, - // Default is 1m. Set to 3s for demonstrative purposes. - metric.WithInterval(1*time.Second))), - ) - return meterProvider, nil -} diff --git a/go/event.go b/go/event.go index 0076f37..ef650f8 100644 --- a/go/event.go +++ b/go/event.go @@ -32,7 +32,7 @@ const ( type MetricFormat int const ( - Statsd MetricFormat = 1 + StatsdFormat MetricFormat = 1 ) // Represents the raw event in our Observe form. From 1cf0fa7b0d8905907e293a8884cbb61b768924d4 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Tue, 16 Jan 2024 17:30:32 +0300 Subject: [PATCH 8/9] feat: don't associate logs and metrics with spans --- go/adapter.go | 13 ------------ go/adapter/honeycomb/adapter.go | 4 ++++ go/adapter/lightstep/adapter.go | 4 ++++ go/adapter/opentelemetry/adapter.go | 4 ++++ go/adapter/otel_stdout/adapter.go | 4 ++++ go/adapter/stdout/adapter.go | 10 ++++----- go/event.go | 6 ++---- go/listener.go | 4 ---- go/trace_ctx.go | 32 ++--------------------------- 9 files changed, 24 insertions(+), 57 deletions(-) diff --git a/go/adapter.go b/go/adapter.go index 5815ab7..3ba0a1a 100644 --- a/go/adapter.go +++ b/go/adapter.go @@ -124,19 +124,6 @@ func (b *AdapterBase) MakeOtelCallSpans(event CallEvent, parentId []byte, traceI last.Attributes = append(last.Attributes, kv) } } - if metric, ok := ev.(MetricEvent); ok { - if metric.Format != StatsdFormat { - log.Printf("Unsupported metric format: %v\n", metric.Format) - continue - } - - // TODO: properly report metrics - fmt.Printf("metric: %s\n", metric.Message) - } - if l, ok := ev.(LogEvent); ok { - // TODO: otel Go doesn't support logs yet - log.Println(l.Message) - } } return spans } diff --git a/go/adapter/honeycomb/adapter.go b/go/adapter/honeycomb/adapter.go index 549a2d4..9c99a73 100644 --- a/go/adapter/honeycomb/adapter.go +++ b/go/adapter/honeycomb/adapter.go @@ -62,6 +62,10 @@ func (h *HoneycombAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Honeycomb adapter does not respect custom events") + case observe.MetricEvent: + log.Println("Honeycomb adapter does not respect metric events") + case observe.LogEvent: + log.Println("Honeycomb adapter does not respect log events") } } diff --git a/go/adapter/lightstep/adapter.go b/go/adapter/lightstep/adapter.go index f5168df..0caacd3 100644 --- a/go/adapter/lightstep/adapter.go +++ b/go/adapter/lightstep/adapter.go @@ -62,6 +62,10 @@ func (l *LightstepAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("lightstep adapter does not respect custom events") + case observe.MetricEvent: + log.Println("lightstep adapter does not respect metric events") + case observe.LogEvent: + log.Println("lightstep adapter does not respect log events") } } diff --git a/go/adapter/opentelemetry/adapter.go b/go/adapter/opentelemetry/adapter.go index 5e86410..c37bcd8 100644 --- a/go/adapter/opentelemetry/adapter.go +++ b/go/adapter/opentelemetry/adapter.go @@ -122,6 +122,10 @@ func (o *OTelAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("opentelemetry adapter does not respect custom events") + case observe.MetricEvent: + log.Println("opentelemetry adapter does not respect metric events") + case observe.LogEvent: + log.Println("opentelemetry adapter does not respect log events") } } diff --git a/go/adapter/otel_stdout/adapter.go b/go/adapter/otel_stdout/adapter.go index ada060b..9e3e60f 100644 --- a/go/adapter/otel_stdout/adapter.go +++ b/go/adapter/otel_stdout/adapter.go @@ -45,6 +45,10 @@ func (o *OtelStdoutAdapter) Flush(evts []observe.TraceEvent) error { log.Println("MemoryGrowEvent should be attached to a span") case observe.CustomEvent: log.Println("Otel adapter does not respect custom events") + case observe.MetricEvent: + log.Printf("metric: %s\n", event.Message) + case observe.LogEvent: + log.Println(event.Message) } } diff --git a/go/adapter/stdout/adapter.go b/go/adapter/stdout/adapter.go index 6b18a23..f6ce93e 100644 --- a/go/adapter/stdout/adapter.go +++ b/go/adapter/stdout/adapter.go @@ -38,6 +38,10 @@ func (s *StdoutAdapter) Flush(evts []observe.TraceEvent) error { log.Println("Allocated", event.MemoryGrowAmount(), "pages of memory in", name) case observe.CustomEvent: log.Println(event.Name, event.Time) + case observe.MetricEvent: + log.Printf("metric: %s\n", event.Message) + case observe.LogEvent: + log.Println(event.Message) } } } @@ -55,12 +59,6 @@ func (s *StdoutAdapter) printEvents(event observe.CallEvent, indentation int) { if alloc, ok := event.(observe.MemoryGrowEvent); ok { log.Println(strings.Repeat(" ", indentation), "Allocated", alloc.MemoryGrowAmount(), "pages of memory in", name) } - if metric, ok := event.(observe.MetricEvent); ok { - log.Println(strings.Repeat(" ", indentation), "Metric", metric.Message, "Format", metric.Format) - } - if l, ok := event.(observe.LogEvent); ok { - log.Println(strings.Repeat(" ", indentation), "Log", l.Message) - } if spanTags, ok := event.(observe.SpanTagsEvent); ok { log.Println(strings.Repeat(" ", indentation), "Span tags:", spanTags.Tags) } diff --git a/go/event.go b/go/event.go index ef650f8..2865cf7 100644 --- a/go/event.go +++ b/go/event.go @@ -90,7 +90,6 @@ func (e CustomEvent) RawEvents() []RawEvent { } type MetricEvent struct { - Raw RawEvent Time time.Time Format MetricFormat Message string @@ -117,7 +116,6 @@ const ( ) type LogEvent struct { - Raw RawEvent Time time.Time Message string Level LogLevel @@ -156,7 +154,7 @@ func (e MemoryGrowEvent) MemoryGrowAmount() uint32 { } func (e MetricEvent) RawEvents() []RawEvent { - return []RawEvent{e.Raw} + return []RawEvent{} } func (e SpanTagsEvent) RawEvents() []RawEvent { @@ -164,5 +162,5 @@ func (e SpanTagsEvent) RawEvents() []RawEvent { } func (e LogEvent) RawEvents() []RawEvent { - return []RawEvent{e.Raw} + return []RawEvent{} } diff --git a/go/listener.go b/go/listener.go index 2bd4329..9aeeb9b 100644 --- a/go/listener.go +++ b/go/listener.go @@ -44,12 +44,8 @@ func (t *TraceCtx) Before(ctx context.Context, _ api.Module, def api.FunctionDef event.Kind = RawEnter case "span_exit": event.Kind = RawExit - case "metric": - event.Kind = RawMetric case "span_tags": event.Kind = RawSpanTags - case "log": - event.Kind = RawLog default: return } diff --git a/go/trace_ctx.go b/go/trace_ctx.go index 59bac66..b1af61b 100644 --- a/go/trace_ctx.go +++ b/go/trace_ctx.go @@ -157,27 +157,13 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { log.Printf("metric: failed to read memory at offset %v with length %v\n", ptr, len) } - ev := <-t.raw - if ev.Kind != RawMetric { - log.Println("Expected event", Metric, "but got", ev.Kind) - return - } - event := MetricEvent{ Time: time.Now(), Format: format, Message: string(buffer), - Raw: ev, } - fn, ok := t.popFunction() - if !ok { - t.events = append(t.events, event) - return - } - fn.within = append(fn.within, event) - t.pushFunction(fn) - + t.events = append(t.events, event) }).Export("metric") functions.WithFunc(func(ctx context.Context, m api.Module, ptr int64, len int32) { @@ -220,27 +206,13 @@ func (t *TraceCtx) init(ctx context.Context, r wazero.Runtime) error { log.Printf("metric: failed to read memory at offset %v with length %v\n", ptr, len) } - ev := <-t.raw - if ev.Kind != RawLog { - log.Println("Expected event", Metric, "but got", ev.Kind) - return - } - event := LogEvent{ Time: time.Now(), - Raw: ev, Level: level, Message: string(buffer), } - fn, ok := t.popFunction() - if !ok { - t.events = append(t.events, event) - return - } - fn.within = append(fn.within, event) - t.pushFunction(fn) - + t.events = append(t.events, event) }).Export("log") _, err := observe.Instantiate(ctx) From 51e1086771edeacfe65f380317f0075358cd9fd5 Mon Sep 17 00:00:00 2001 From: Muhammad Azeez Date: Tue, 16 Jan 2024 17:32:08 +0300 Subject: [PATCH 9/9] cleanup --- go/adapter/datadog/adapter.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/go/adapter/datadog/adapter.go b/go/adapter/datadog/adapter.go index cfb7eb1..7cff86d 100644 --- a/go/adapter/datadog/adapter.go +++ b/go/adapter/datadog/adapter.go @@ -194,15 +194,6 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span := spans[len(spans)-1] span.AddAllocation(alloc.MemoryGrowAmount()) } - if metric, ok := ev.(observe.MetricEvent); ok { - if metric.Format != observe.StatsdFormat { - log.Printf("Unsupported metric format: %v\n", metric.Format) - continue - } - - // TODO: properly report metrics - fmt.Printf("metric: %s\n", metric.Message) - } if tags, ok := ev.(observe.SpanTagsEvent); ok { span := spans[len(spans)-1] for _, tag := range tags.Tags { @@ -215,10 +206,6 @@ func (d *DatadogAdapter) makeCallSpans(event observe.CallEvent, parentId *uint64 span.AddTag(parts[0], parts[1]) } } - if l, ok := ev.(observe.LogEvent); ok { - // TODO: otel Go doesn't support logs yet - log.Println(l.Message) - } } return spans