From 1d576eca2ab4f7808e77e244ff116421c743190f Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 8 Nov 2024 08:37:40 -0800 Subject: [PATCH 1/2] Replaced local trace store with telemetry server store. --- go/core/action_test.go | 12 +- go/core/core.go | 13 -- go/core/tracing/file_store.go | 157 ------------------ go/core/tracing/file_store_test.go | 138 --------------- go/core/tracing/store.go | 52 +----- go/core/tracing/telemetry.go | 87 ++++++++++ ...e_exporter.go => trace_server_exporter.go} | 25 ++- ..._test.go => trace_server_exporter_test.go} | 0 go/core/tracing/tracing.go | 12 +- go/genkit/conformance_test.go | 9 +- go/genkit/servers.go | 85 +++------- go/genkit/servers_test.go | 27 +-- go/internal/registry/registry.go | 41 +---- 13 files changed, 149 insertions(+), 509 deletions(-) delete mode 100644 go/core/tracing/file_store.go delete mode 100644 go/core/tracing/file_store_test.go create mode 100644 go/core/tracing/telemetry.go rename go/core/tracing/{trace_store_exporter.go => trace_server_exporter.go} (87%) rename go/core/tracing/{trace_store_exporter_test.go => trace_server_exporter_test.go} (100%) diff --git a/go/core/action_test.go b/go/core/action_test.go index dbd232845..e8c5edfd9 100644 --- a/go/core/action_test.go +++ b/go/core/action_test.go @@ -20,6 +20,7 @@ import ( "slices" "testing" + "github.com/firebase/genkit/go/core/tracing" "github.com/firebase/genkit/go/internal/atype" "github.com/firebase/genkit/go/internal/registry" ) @@ -97,21 +98,16 @@ func TestActionStreaming(t *testing.T) { } func TestActionTracing(t *testing.T) { - ctx := context.Background() + tc := tracing.NewTestOnlyTelemetryClient() + registry.Global.TracingState().WriteTelemetryImmediate(tc) const actionName = "TestTracing-inc" a := newAction(actionName, atype.Custom, nil, nil, inc) if _, err := a.Run(context.Background(), 3, nil); err != nil { t.Fatal(err) } - // The dev TraceStore is registered by Init, called from TestMain. - ts := registry.Global.LookupTraceStore(registry.EnvironmentDev) - tds, _, err := ts.List(ctx, nil) - if err != nil { - t.Fatal(err) - } // The same trace store is used for all tests, so there might be several traces. // Look for this one, which has a unique name. - for _, td := range tds { + for _, td := range tc.Traces { if td.DisplayName == actionName { // Spot check: expect a single span. if g, w := len(td.Spans), 1; g != w { diff --git a/go/core/core.go b/go/core/core.go index dd1b8f79e..a5ff8c385 100644 --- a/go/core/core.go +++ b/go/core/core.go @@ -25,23 +25,10 @@ package core import ( - "context" - - "github.com/firebase/genkit/go/core/tracing" "github.com/firebase/genkit/go/internal/registry" sdktrace "go.opentelemetry.io/otel/sdk/trace" ) -// RegisterTraceStore uses the given trace.Store to record traces in the prod environment. -// (A trace.Store that writes to the local filesystem is always installed in the dev environment.) -// The returned function should be called before the program ends to ensure that -// all pending data is stored. -// RegisterTraceStore panics if called more than once. -func RegisterTraceStore(ts tracing.Store) (shutdown func(context.Context) error) { - registry.Global.RegisterTraceStore(registry.EnvironmentProd, ts) - return registry.Global.TracingState().AddTraceStoreBatch(ts) -} - // RegisterSpanProcessor registers an OpenTelemetry SpanProcessor for tracing. func RegisterSpanProcessor(sp sdktrace.SpanProcessor) { registry.Global.RegisterSpanProcessor(sp) diff --git a/go/core/tracing/file_store.go b/go/core/tracing/file_store.go deleted file mode 100644 index bdca86b40..000000000 --- a/go/core/tracing/file_store.go +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tracing - -import ( - "context" - "errors" - "fmt" - "io/fs" - "os" - "path/filepath" - "slices" - "strconv" - "time" - - "github.com/firebase/genkit/go/internal/base" -) - -// A FileStore is a Store that writes traces to files. -type FileStore struct { - dir string -} - -// NewFileStore creates a FileStore that writes traces to the given -// directory. The directory is created if it does not exist. -func NewFileStore(dir string) (*FileStore, error) { - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, err - } - return &FileStore{dir: dir}, nil -} - -// Save implements [Store.Save]. -// It is not safe to call Save concurrently with the same ID. -func (s *FileStore) Save(ctx context.Context, id string, td *Data) error { - existing, err := s.Load(ctx, id) - if err == nil { - // Merge the existing spans with the incoming ones. - // Mutate existing because we know it has no other references. - for k, v := range td.Spans { - existing.Spans[k] = v - } - existing.TraceID = id - existing.DisplayName = td.DisplayName - existing.StartTime = td.StartTime - existing.EndTime = td.EndTime - td = existing - } else if !errors.Is(err, fs.ErrNotExist) { - return err - } - return base.WriteJSONFile(filepath.Join(s.dir, base.Clean(id)), td) -} - -// Load implements [Store.Load]. -func (s *FileStore) Load(ctx context.Context, id string) (*Data, error) { - var td *Data - if err := base.ReadJSONFile(filepath.Join(s.dir, base.Clean(id)), &td); err != nil { - return nil, err - } - return td, nil -} - -// List implements [Store.List]. -// The traces are returned in the order they were written, newest first. -// The default limit is 10. -func (s *FileStore) List(ctx context.Context, q *Query) ([]*Data, string, error) { - entries, err := os.ReadDir(s.dir) - if err != nil { - return nil, "", err - } - // Sort by modified time. - modTimes := map[string]time.Time{} - for _, e := range entries { - info, err := e.Info() - if err != nil { - return nil, "", err - } - modTimes[e.Name()] = info.ModTime() - } - slices.SortFunc(entries, func(e1, e2 os.DirEntry) int { - return modTimes[e2.Name()].Compare(modTimes[e1.Name()]) - }) - - // Determine subsequence to return. - start, end, err := listRange(q, len(entries)) - if err != nil { - return nil, "", err - } - - var ts []*Data - for _, e := range entries[start:end] { - var t *Data - if err := base.ReadJSONFile(filepath.Join(s.dir, e.Name()), &t); err != nil { - return nil, "", err - } - ts = append(ts, t) - } - ctoken := "" - if end < len(entries) { - ctoken = strconv.Itoa(end) - } - return ts, ctoken, nil -} - -// listRange returns the range of elements to return from a List call. -func listRange(q *Query, total int) (start, end int, err error) { - const defaultLimit = 10 - start = 0 - end = total - limit := 0 - ctoken := "" - if q != nil { - limit = q.Limit - ctoken = q.ContinuationToken - } - if ctoken != "" { - // A continuation token is just an integer index in string form. - // This doesn't work well with newest-first order if files are added during listing, - // because the indexes will change. - // But we use it for consistency with the javascript implementation. - // TODO: consider using distance from the end (len(entries) - end). - start, err = strconv.Atoi(ctoken) - if err != nil { - return 0, 0, fmt.Errorf("%w: parsing continuation token: %v", ErrBadQuery, err) - } - if start < 0 || start >= total { - return 0, 0, fmt.Errorf("%w: continuation token out of range", ErrBadQuery) - } - } - if limit < 0 { - return 0, 0, fmt.Errorf("%w: negative limit", ErrBadQuery) - } - if limit == 0 { - limit = defaultLimit - } - end = start + limit - if end > total { - end = total - } - return start, end, nil -} - -func (s *FileStore) LoadAny(id string, p any) error { - return base.ReadJSONFile(filepath.Join(s.dir, base.Clean(id)), p) -} diff --git a/go/core/tracing/file_store_test.go b/go/core/tracing/file_store_test.go deleted file mode 100644 index 431e89de0..000000000 --- a/go/core/tracing/file_store_test.go +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2024 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tracing - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/google/go-cmp/cmp" -) - -func TestFileStore(t *testing.T) { - ctx := context.Background() - td1 := &Data{ - DisplayName: "td1", - StartTime: 10, - EndTime: 20, - Spans: map[string]*SpanData{ - "s1": {SpanID: "sid1"}, - "s2": {SpanID: "sid2"}, - }, - } - ts, err := NewFileStore(t.TempDir()) - if err != nil { - t.Fatal(err) - } - if err := ts.Save(ctx, "id1", td1); err != nil { - t.Fatal(err) - } - got, err := ts.Load(ctx, "id1") - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(td1, got); diff != "" { - t.Errorf("mismatch (-want, +got):\n%s", diff) - } - - // Saving a span with the same ID merges spans and overrides the other - // fields. - td2 := &Data{ - DisplayName: "td2", - StartTime: 30, - EndTime: 40, - Spans: map[string]*SpanData{ - "s3": {SpanID: "sid3"}, - }, - } - if err := ts.Save(ctx, "id1", td2); err != nil { - t.Fatal(err) - } - want := &Data{ - TraceID: "id1", - DisplayName: "td2", - StartTime: 30, - EndTime: 40, - Spans: map[string]*SpanData{ - "s1": {SpanID: "sid1"}, - "s2": {SpanID: "sid2"}, - "s3": {SpanID: "sid3"}, - }, - } - got, err = ts.Load(ctx, "id1") - if err != nil { - t.Fatal(err) - } - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("mismatch (-want, +got):\n%s", diff) - } - - // Test List. - td3 := &Data{DisplayName: "td3"} - time.Sleep(50 * time.Millisecond) // force different mtimes - if err := ts.Save(ctx, "id3", td3); err != nil { - t.Fatal(err) - } - - gotTDs, gotCT, err := ts.List(ctx, nil) - // All the Datas, in the expected order. - wantTDs := []*Data{td3, want} - if diff := cmp.Diff(wantTDs, gotTDs); diff != "" { - t.Errorf("mismatch (-want, +got):\n%s", diff) - } - if gotCT != "" { - t.Errorf("continuation token: got %q, want %q", gotCT, "") - } -} - -func TestListRange(t *testing.T) { - // These tests assume the default limit is 10. - total := 20 - for _, test := range []struct { - q *Query - wantStart, wantEnd int - wantErr bool - }{ - {nil, 0, 10, false}, - { - &Query{Limit: 1}, - 0, 1, false, - }, - { - &Query{Limit: 5, ContinuationToken: "1"}, - 1, 6, false, - }, - { - &Query{ContinuationToken: "5"}, - 5, 15, false, - }, - {&Query{Limit: -1}, 0, 0, true}, // negative limit - {&Query{ContinuationToken: "x"}, 0, 0, true}, // not a number - {&Query{ContinuationToken: "-1"}, 0, 0, true}, // too small - {&Query{ContinuationToken: "21"}, 0, 0, true}, // too large - } { - gotStart, gotEnd, err := listRange(test.q, total) - if test.wantErr { - if !errors.Is(err, ErrBadQuery) { - t.Errorf("%+v: got err %v, want errBadQuery", test.q, err) - } - } else if gotStart != test.wantStart || gotEnd != test.wantEnd || err != nil { - t.Errorf("%+v: got (%d, %d, %v), want (%d, %d, nil)", - test.q, gotStart, gotEnd, err, test.wantStart, test.wantEnd) - } - } -} diff --git a/go/core/tracing/store.go b/go/core/tracing/store.go index ff5885acb..fbbab092b 100644 --- a/go/core/tracing/store.go +++ b/go/core/tracing/store.go @@ -14,54 +14,12 @@ package tracing -import ( - "context" - "errors" - -) - -// A Store stores trace information. -// Every trace has a unique ID. -type Store interface { - // Save saves the Data to the store. If a Data with the id already exists, - // the two are merged. - Save(ctx context.Context, id string, td *Data) error - // Load reads the Data with the given ID from the store. - // It returns an error that is fs.ErrNotExist if there isn't one. - Load(ctx context.Context, id string) (*Data, error) - // List returns all the Datas in the store that satisfy q, in some deterministic - // order. - // It also returns a continuation token: an opaque string that can be passed - // to the next call to List to resume the listing from where it left off. If - // the listing reached the end, this is the empty string. - // If the Query is malformed, List returns an error that is errBadQuery. - List(ctx context.Context, q *Query) (tds []*Data, contToken string, err error) - - // LoadAny is like Load, but accepts a pointer to any type. - // It is for testing (see conformance_test.go). - // TODO: replace Load with this. - LoadAny(id string, p any) error -} - -var ErrBadQuery = errors.New("bad trace.Query") - -// A Query filters the result of [Store.List]. -type Query struct { - // Maximum number of traces to return. If zero, a default value may be used. - // Callers should not assume they will get the entire list; they should always - // check the returned continuation token. - Limit int - // Where to continue the listing from. Must be either empty to start from the - // beginning, or the result of a recent, previous call to List. - ContinuationToken string -} - // Data is information about a trace. type Data struct { TraceID string `json:"traceId"` DisplayName string `json:"displayName"` - StartTime Milliseconds `json:"startTime"` - EndTime Milliseconds `json:"endTime"` + StartTime Milliseconds `json:"startTime"` + EndTime Milliseconds `json:"endTime"` Spans map[string]*SpanData `json:"spans"` } @@ -74,8 +32,8 @@ type SpanData struct { SpanID string `json:"spanId"` TraceID string `json:"traceId,omitempty"` ParentSpanID string `json:"parentSpanId,omitempty"` - StartTime Milliseconds `json:"startTime"` - EndTime Milliseconds `json:"endTime"` + StartTime Milliseconds `json:"startTime"` + EndTime Milliseconds `json:"endTime"` Attributes map[string]any `json:"attributes,omitempty"` DisplayName string `json:"displayName"` Links []*Link `json:"links,omitempty"` @@ -97,7 +55,7 @@ type BoolValue struct { type TimeEvent struct { Time Milliseconds `json:"time,omitempty"` - Annotation Annotation `json:"annotation,omitempty"` + Annotation Annotation `json:"annotation,omitempty"` } type Annotation struct { diff --git a/go/core/tracing/telemetry.go b/go/core/tracing/telemetry.go new file mode 100644 index 000000000..b99ef8063 --- /dev/null +++ b/go/core/tracing/telemetry.go @@ -0,0 +1,87 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tracing + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" +) + +type TelemetryClient interface { + Save(ctx context.Context, trace *Data) error +} + +// TestOnlyTelemetryClient is a test-only implementation of TelemetryClient that stores traces in memory. +type TestOnlyTelemetryClient struct { + Traces map[string]*Data +} + +// NewTestOnlyTelemetryClient creates a new in-memory telemetry client for testing. +func NewTestOnlyTelemetryClient() *TestOnlyTelemetryClient { + return &TestOnlyTelemetryClient{ + Traces: make(map[string]*Data), + } +} + +// Save saves the data to an in-memory store. +func (c *TestOnlyTelemetryClient) Save(ctx context.Context, trace *Data) error { + if trace == nil { + return fmt.Errorf("trace cannot be nil") + } + if trace.TraceID == "" { + return fmt.Errorf("trace ID cannot be empty") + } + c.Traces[trace.TraceID] = trace + return nil +} + +type httpTelemetryClient struct { + url string +} + +// NewHTTPTelemetryClient creates a new telemetry client that sends traces to a telemetry server at the given URL. +func NewHTTPTelemetryClient(url string) *httpTelemetryClient { + return &httpTelemetryClient{url: url} +} + +// Save saves the trace data by making a call to the telemetry server. +func (c *httpTelemetryClient) Save(ctx context.Context, trace *Data) error { + if c.url == "" { + return nil + } + body, err := json.Marshal(trace) + if err != nil { + return fmt.Errorf("failed to marshal trace data: %w", err) + } + req, err := http.NewRequestWithContext(ctx, "POST", c.url+"/api/traces", bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send request: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("unexpected status code: %d", resp.StatusCode) + } + return nil +} diff --git a/go/core/tracing/trace_store_exporter.go b/go/core/tracing/trace_server_exporter.go similarity index 87% rename from go/core/tracing/trace_store_exporter.go rename to go/core/tracing/trace_server_exporter.go index 93c4e93a4..e33e4a504 100644 --- a/go/core/tracing/trace_store_exporter.go +++ b/go/core/tracing/trace_server_exporter.go @@ -17,6 +17,7 @@ package tracing import ( "context" "errors" + "log/slog" "strings" "go.opentelemetry.io/otel/attribute" @@ -24,20 +25,25 @@ import ( otrace "go.opentelemetry.io/otel/trace" ) -// A traceStoreExporter is an OpenTelemetry SpanExporter that -// writes spans to a TraceStore. -type traceStoreExporter struct { - store Store +// A traceServerExporter is an OpenTelemetry SpanExporter that +// writes spans to the telemetry server. +type traceServerExporter struct { + client TelemetryClient } -func newTraceStoreExporter(store Store) *traceStoreExporter { - return &traceStoreExporter{store} +func newTraceServerExporter(client TelemetryClient) *traceServerExporter { + return &traceServerExporter{client} } // ExportSpans implements [go.opentelemetry.io/otel/sdk/trace.SpanExporter.ExportSpans]. // It saves the spans to e's TraceStore. // Saving is not atomic: it is possible that some but not all spans will be saved. -func (e *traceStoreExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { +func (e *traceServerExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + if e.client == nil { + slog.Debug("telemetry server is not configured, trace not saved") + return nil + } + // Group spans by trace ID. spansByTrace := map[otrace.TraceID][]sdktrace.ReadOnlySpan{} for _, span := range spans { @@ -54,7 +60,8 @@ func (e *traceStoreExporter) ExportSpans(ctx context.Context, spans []sdktrace.R if err != nil { return err } - if err := e.store.Save(ctx, tid.String(), td); err != nil { + td.TraceID = tid.String() + if err := e.client.Save(ctx, td); err != nil { return err } } @@ -157,7 +164,7 @@ func convertStatus(s sdktrace.Status) Status { } // ExportSpans implements [go.opentelemetry.io/otel/sdk/trace.SpanExporter.Shutdown]. -func (e *traceStoreExporter) Shutdown(ctx context.Context) error { +func (e *traceServerExporter) Shutdown(ctx context.Context) error { // NOTE: In the current implementation, this function is never called on the store in the // dev environment. To get that to happen, the Shutdown method on the TracerProvider must // be called. See tracing.go. diff --git a/go/core/tracing/trace_store_exporter_test.go b/go/core/tracing/trace_server_exporter_test.go similarity index 100% rename from go/core/tracing/trace_store_exporter_test.go rename to go/core/tracing/trace_server_exporter_test.go diff --git a/go/core/tracing/tracing.go b/go/core/tracing/tracing.go index 057287370..3869c8dbf 100644 --- a/go/core/tracing/tracing.go +++ b/go/core/tracing/tracing.go @@ -45,12 +45,12 @@ func (ts *State) RegisterSpanProcessor(sp sdktrace.SpanProcessor) { ts.tp.RegisterSpanProcessor(sp) } -// AddTraceStoreImmediate adds tstore to the tracingState. +// WriteTelemetryImmediate adds a telemetry server to the tracingState. // Traces are saved immediately as they are finshed. // Use this for a gtrace.Store with a fast Save method, // such as one that writes to a file. -func (ts *State) AddTraceStoreImmediate(tstore Store) { - e := newTraceStoreExporter(tstore) +func (ts *State) WriteTelemetryImmediate(client TelemetryClient) { + e := newTraceServerExporter(client) // Adding a SimpleSpanProcessor is like using the WithSyncer option. ts.RegisterSpanProcessor(sdktrace.NewSimpleSpanProcessor(e)) // Ignore tracerProvider.Shutdown. It shouldn't be needed when using WithSyncer. @@ -58,14 +58,14 @@ func (ts *State) AddTraceStoreImmediate(tstore Store) { // Also requires traceStoreExporter.Shutdown to be a no-op. } -// AddTraceStoreBatch adds ts to the tracingState. +// WriteTelemetryBatch adds a telemetry server to the tracingState. // Traces are batched before being sent for processing. // Use this for a gtrace.Store with a potentially expensive Save method, // such as one that makes an RPC. // Callers must invoke the returned function at the end of the program to flush the final batch // and perform other cleanup. -func (ts *State) AddTraceStoreBatch(tstore Store) (shutdown func(context.Context) error) { - e := newTraceStoreExporter(tstore) +func (ts *State) WriteTelemetryBatch(client TelemetryClient) (shutdown func(context.Context) error) { + e := newTraceServerExporter(client) // Adding a BatchSpanProcessor is like using the WithBatcher option. ts.RegisterSpanProcessor(sdktrace.NewBatchSpanProcessor(e)) return ts.tp.Shutdown diff --git a/go/genkit/conformance_test.go b/go/genkit/conformance_test.go index 245654ba8..eb9ffcadd 100644 --- a/go/genkit/conformance_test.go +++ b/go/genkit/conformance_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/core/tracing" "github.com/firebase/genkit/go/internal/base" "github.com/firebase/genkit/go/internal/registry" "golang.org/x/exp/maps" @@ -80,6 +81,8 @@ func (c *command) run(ctx context.Context, input string) (string, error) { } func TestFlowConformance(t *testing.T) { + tc := tracing.NewTestOnlyTelemetryClient() + registry.Global.TracingState().WriteTelemetryImmediate(tc) testFiles, err := filepath.Glob(filepath.FromSlash("testdata/conformance/*.json")) if err != nil { t.Fatal(err) @@ -115,11 +118,7 @@ func TestFlowConformance(t *testing.T) { if test.Trace == nil { return } - ts := r.LookupTraceStore(registry.EnvironmentDev) - var gotTrace any - if err := ts.LoadAny(resp.Telemetry.TraceID, &gotTrace); err != nil { - t.Fatal(err) - } + gotTrace := tc.Traces[resp.Telemetry.TraceID] renameSpans(t, gotTrace) renameSpans(t, test.Trace) if diff := compareJSON(gotTrace, test.Trace); diff != "" { diff --git a/go/genkit/servers.go b/go/genkit/servers.go index 64f71d2b7..4453c25ab 100644 --- a/go/genkit/servers.go +++ b/go/genkit/servers.go @@ -27,7 +27,6 @@ import ( "encoding/json" "errors" "fmt" - "io/fs" "log/slog" "net/http" "os" @@ -233,10 +232,7 @@ func newDevServeMux(s *devServer) *http.ServeMux { }) handle(mux, "POST /api/runAction", s.handleRunAction) handle(mux, "GET /api/actions", s.handleListActions) - handle(mux, "GET /api/envs/{env}/traces/{traceID}", s.handleGetTrace) - handle(mux, "GET /api/envs/{env}/traces", s.handleListTraces) - handle(mux, "GET /api/envs/{env}/flowStates", s.handleListFlowStates) - + handle(mux, "POST /api/notify", s.handleNotify) return mux } @@ -282,6 +278,24 @@ func (s *devServer) handleRunAction(w http.ResponseWriter, r *http.Request) erro return writeJSON(ctx, w, resp) } +// handleNotify configures the telemetry server URL from the request. +func (s *devServer) handleNotify(w http.ResponseWriter, r *http.Request) error { + var body struct { + TelemetryServerURL string `json:"telemetryServerUrl"` + } + defer r.Body.Close() + if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + return &base.HTTPError{Code: http.StatusBadRequest, Err: err} + } + if body.TelemetryServerURL != "" { + s.reg.TracingState().WriteTelemetryImmediate(tracing.NewHTTPTelemetryClient(body.TelemetryServerURL)) + slog.Debug("connected to telemetry server", "url", body.TelemetryServerURL) + } + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte("OK")) + return err +} + type runActionResponse struct { Result json.RawMessage `json:"result"` Telemetry telemetry `json:"telemetry"` @@ -321,67 +335,6 @@ func (s *devServer) handleListActions(w http.ResponseWriter, r *http.Request) er return writeJSON(r.Context(), w, descMap) } -// handleGetTrace returns a single trace from a TraceStore. -func (s *devServer) handleGetTrace(w http.ResponseWriter, r *http.Request) error { - env := r.PathValue("env") - ts := s.reg.LookupTraceStore(registry.Environment(env)) - if ts == nil { - return &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no TraceStore for environment %q", env)} - } - tid := r.PathValue("traceID") - td, err := ts.Load(r.Context(), tid) - if errors.Is(err, fs.ErrNotExist) { - return &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no %s trace with ID %q", env, tid)} - } - if err != nil { - return err - } - return writeJSON(r.Context(), w, td) -} - -// handleListTraces returns a list of traces from a TraceStore. -func (s *devServer) handleListTraces(w http.ResponseWriter, r *http.Request) error { - env := r.PathValue("env") - ts := s.reg.LookupTraceStore(registry.Environment(env)) - if ts == nil { - return &base.HTTPError{Code: http.StatusNotFound, Err: fmt.Errorf("no TraceStore for environment %q", env)} - } - limit := 0 - if lim := r.FormValue("limit"); lim != "" { - var err error - limit, err = strconv.Atoi(lim) - if err != nil { - return &base.HTTPError{Code: http.StatusBadRequest, Err: err} - } - } - ctoken := r.FormValue("continuationToken") - tds, ctoken, err := ts.List(r.Context(), &tracing.Query{Limit: limit, ContinuationToken: ctoken}) - if errors.Is(err, tracing.ErrBadQuery) { - return &base.HTTPError{Code: http.StatusBadRequest, Err: err} - } - if err != nil { - return err - } - if tds == nil { - tds = []*tracing.Data{} - } - return writeJSON(r.Context(), w, listTracesResult{tds, ctoken}) -} - -type listTracesResult struct { - Traces []*tracing.Data `json:"traces"` - ContinuationToken string `json:"continuationToken"` -} - -func (s *devServer) handleListFlowStates(w http.ResponseWriter, r *http.Request) error { - return writeJSON(r.Context(), w, listFlowStatesResult{[]base.FlowStater{}, ""}) -} - -type listFlowStatesResult struct { - FlowStates []base.FlowStater `json:"flowStates"` - ContinuationToken string `json:"continuationToken"` -} - // NewFlowServeMux constructs a [net/http.ServeMux]. // If flows is non-empty, the each of the named flows is registered as a route. // Otherwise, all defined flows are registered. diff --git a/go/genkit/servers_test.go b/go/genkit/servers_test.go index f590e2986..a5aee537e 100644 --- a/go/genkit/servers_test.go +++ b/go/genkit/servers_test.go @@ -55,6 +55,8 @@ func TestDevServer(t *testing.T) { }, nil, dec) srv := httptest.NewServer(newDevServeMux(&devServer{reg: r})) defer srv.Close() + tc := tracing.NewTestOnlyTelemetryClient() + registry.Global.TracingState().WriteTelemetryImmediate(tc) t.Run("runAction", func(t *testing.T) { body := `{"key": "/custom/devServer/inc", "input": 3}` @@ -77,7 +79,7 @@ func TestDevServer(t *testing.T) { if len(tid) != 32 { t.Errorf("trace ID is %q, wanted 32-byte string", tid) } - checkActionTrace(t, r, tid, "inc") + checkActionTrace(t, tc, tid, "inc") }) t.Run("list actions", func(t *testing.T) { res, err := http.Get(srv.URL + "/api/actions") @@ -113,20 +115,6 @@ func TestDevServer(t *testing.T) { t.Errorf("mismatch (-want, +got):\n%s", diff) } }) - t.Run("list traces", func(t *testing.T) { - res, err := http.Get(srv.URL + "/api/envs/dev/traces") - if err != nil { - t.Fatal(err) - } - if res.StatusCode != 200 { - t.Fatalf("got status %d, wanted 200", res.StatusCode) - } - _, err = readJSON[listTracesResult](res.Body) - if err != nil { - t.Fatal(err) - } - // We may have any result, including internal.Zero traces, so don't check anything else. - }) } func TestProdServer(t *testing.T) { @@ -178,11 +166,10 @@ func TestProdServer(t *testing.T) { t.Run("bad", func(t *testing.T) { check(t, "true", 400, 0) }) } -func checkActionTrace(t *testing.T, reg *registry.Registry, tid, name string) { - ts := reg.LookupTraceStore(registry.EnvironmentDev) - td, err := ts.Load(context.Background(), tid) - if err != nil { - t.Fatal(err) +func checkActionTrace(t *testing.T, tc *tracing.TestOnlyTelemetryClient, tid, name string) { + td := tc.Traces[tid] + if td == nil { + t.Fatalf("trace %q not found", tid) } rootSpan := findRootSpan(t, td.Spans) want := &tracing.SpanData{ diff --git a/go/internal/registry/registry.go b/go/internal/registry/registry.go index c57fb7331..4edfed350 100644 --- a/go/internal/registry/registry.go +++ b/go/internal/registry/registry.go @@ -15,12 +15,10 @@ package registry import ( - "crypto/md5" "fmt" "log" "log/slog" "os" - "path/filepath" "slices" "sync" @@ -52,40 +50,18 @@ type Registry struct { frozen bool // when true, no more additions actions map[string]action.Action flows []Flow - // TraceStores, at most one for each [Environment]. - // Only the prod trace store is actually registered; the dev one is - // always created automatically. But it's simpler if we keep them together here. - traceStores map[Environment]tracing.Store } func New() (*Registry, error) { r := &Registry{ - actions: map[string]action.Action{}, - traceStores: map[Environment]tracing.Store{}, + actions: map[string]action.Action{}, } - tstore, err := newDevStore() - if err != nil { - return nil, err - } - r.RegisterTraceStore(EnvironmentDev, tstore) r.tstate = tracing.NewState() - r.tstate.AddTraceStoreImmediate(tstore) return r, nil } func (r *Registry) TracingState() *tracing.State { return r.tstate } -func newDevStore() (tracing.Store, error) { - programName := filepath.Base(os.Args[0]) - rootHash := fmt.Sprintf("%02x", md5.Sum([]byte(programName))) - dir := filepath.Join(os.TempDir(), ".genkit", rootHash, "traces") - if err := os.MkdirAll(dir, 0o755); err != nil { - return nil, err - } - // Don't remove the temp directory, for post-mortem debugging. - return tracing.NewFileStore(dir) -} - // RegisterAction records the action in the registry. // It panics if an action with the same type, provider and name is already // registered. @@ -155,21 +131,6 @@ func (r *Registry) ListFlows() []Flow { return r.flows } -func (r *Registry) RegisterTraceStore(env Environment, ts tracing.Store) { - r.mu.Lock() - defer r.mu.Unlock() - if _, ok := r.traceStores[env]; ok { - panic(fmt.Sprintf("RegisterTraceStore called twice for environment %q", env)) - } - r.traceStores[env] = ts -} - -func (r *Registry) LookupTraceStore(env Environment) tracing.Store { - r.mu.Lock() - defer r.mu.Unlock() - return r.traceStores[env] -} - func (r *Registry) RegisterSpanProcessor(sp sdktrace.SpanProcessor) { r.tstate.RegisterSpanProcessor(sp) } From 61afeb45b6bbc0e3d3d601515385776da3e5b87e Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Mon, 11 Nov 2024 11:42:35 -0800 Subject: [PATCH 2/2] Fixed tests. --- go/core/tracing/telemetry.go | 11 ++++++++++- go/genkit/conformance_test.go | 16 ++++++++++++---- go/genkit/servers_test.go | 8 ++++++-- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/go/core/tracing/telemetry.go b/go/core/tracing/telemetry.go index b99ef8063..6025a54a2 100644 --- a/go/core/tracing/telemetry.go +++ b/go/core/tracing/telemetry.go @@ -46,7 +46,16 @@ func (c *TestOnlyTelemetryClient) Save(ctx context.Context, trace *Data) error { if trace.TraceID == "" { return fmt.Errorf("trace ID cannot be empty") } - c.Traces[trace.TraceID] = trace + if existing, ok := c.Traces[trace.TraceID]; ok { + for _, span := range trace.Spans { + existing.Spans[span.SpanID] = span + } + if existing.DisplayName == "" { + existing.DisplayName = trace.DisplayName + } + } else { + c.Traces[trace.TraceID] = trace + } return nil } diff --git a/go/genkit/conformance_test.go b/go/genkit/conformance_test.go index eb9ffcadd..fc10c02b3 100644 --- a/go/genkit/conformance_test.go +++ b/go/genkit/conformance_test.go @@ -81,8 +81,6 @@ func (c *command) run(ctx context.Context, input string) (string, error) { } func TestFlowConformance(t *testing.T) { - tc := tracing.NewTestOnlyTelemetryClient() - registry.Global.TracingState().WriteTelemetryImmediate(tc) testFiles, err := filepath.Glob(filepath.FromSlash("testdata/conformance/*.json")) if err != nil { t.Fatal(err) @@ -101,6 +99,8 @@ func TestFlowConformance(t *testing.T) { if err != nil { t.Fatal(err) } + tc := tracing.NewTestOnlyTelemetryClient() + r.TracingState().WriteTelemetryImmediate(tc) _ = defineFlow(r, test.Name, flowFunction(test.Commands)) key := fmt.Sprintf("/flow/%s", test.Name) resp, err := runAction(context.Background(), r, key, test.Input, nil) @@ -119,9 +119,17 @@ func TestFlowConformance(t *testing.T) { return } gotTrace := tc.Traces[resp.Telemetry.TraceID] - renameSpans(t, gotTrace) + var gotTraceAny map[string]any + gotTraceBytes, err := json.Marshal(gotTrace) + if err != nil { + t.Fatal(err) + } + if err := json.Unmarshal(gotTraceBytes, &gotTraceAny); err != nil { + t.Fatal(err) + } + renameSpans(t, gotTraceAny) renameSpans(t, test.Trace) - if diff := compareJSON(gotTrace, test.Trace); diff != "" { + if diff := compareJSON(gotTraceAny, test.Trace); diff != "" { t.Errorf("trace:\n%s", diff) } }) diff --git a/go/genkit/servers_test.go b/go/genkit/servers_test.go index a5aee537e..8017a94b5 100644 --- a/go/genkit/servers_test.go +++ b/go/genkit/servers_test.go @@ -47,6 +47,9 @@ func TestDevServer(t *testing.T) { if err != nil { t.Fatal(err) } + tc := tracing.NewTestOnlyTelemetryClient() + r.TracingState().WriteTelemetryImmediate(tc) + core.DefineActionInRegistry(r, "devServer", "inc", atype.Custom, map[string]any{ "foo": "bar", }, nil, inc) @@ -55,8 +58,6 @@ func TestDevServer(t *testing.T) { }, nil, dec) srv := httptest.NewServer(newDevServeMux(&devServer{reg: r})) defer srv.Close() - tc := tracing.NewTestOnlyTelemetryClient() - registry.Global.TracingState().WriteTelemetryImmediate(tc) t.Run("runAction", func(t *testing.T) { body := `{"key": "/custom/devServer/inc", "input": 3}` @@ -122,6 +123,9 @@ func TestProdServer(t *testing.T) { if err != nil { t.Fatal(err) } + tc := tracing.NewTestOnlyTelemetryClient() + r.TracingState().WriteTelemetryImmediate(tc) + defineFlow(r, "inc", func(_ context.Context, i int, _ noStream) (int, error) { return i + 1, nil })