From 7851ed36876b7841b0be9be3d0a20cb1d8ae32e0 Mon Sep 17 00:00:00 2001 From: Francesco Vigliaturo Date: Mon, 17 Jun 2024 10:47:56 +0200 Subject: [PATCH] feat: add endpoint to return a profile from a list of chunk IDs (#470) * add endpoint to return a profile from a list of chunk IDs --- CHANGELOG.md | 1 + cmd/vroom/chunk.go | 168 ++++++++++++++++++++++++++ cmd/vroom/config.go | 5 +- cmd/vroom/main.go | 16 ++- internal/chunk/chunk.go | 13 +- internal/chunk/chunk_utils.go | 84 +++++++++++++ internal/chunk/chunk_utils_test.go | 97 +++++++++++++++ internal/measurements/measurements.go | 12 ++ 8 files changed, 391 insertions(+), 5 deletions(-) create mode 100644 internal/chunk/chunk_utils.go create mode 100644 internal/chunk/chunk_utils_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 01a7fe9a..40e48c84 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ - Exclude unsymbolicated frames from metrics ingestion ([#441](https://github.com/getsentry/vroom/pull/441)) - Filter out system frames when ingesting functions into generic metrics ([#444](https://github.com/getsentry/vroom/pull/444)) - Store profile chunks. 
([#463](https://github.com/getsentry/vroom/pull/463)) +- Add endpoint to return a profile from a list of chunk IDs ([#470](https://github.com/getsentry/vroom/pull/470)) **Bug Fixes**: diff --git a/cmd/vroom/chunk.go b/cmd/vroom/chunk.go index ac5923fb..e3a71aec 100644 --- a/cmd/vroom/chunk.go +++ b/cmd/vroom/chunk.go @@ -4,13 +4,17 @@ import ( "context" "encoding/json" "errors" + "fmt" "io" "net/http" "strconv" "github.com/getsentry/sentry-go" + "github.com/julienschmidt/httprouter" "github.com/segmentio/kafka-go" + "gocloud.dev/blob" "gocloud.dev/gcerrors" + "google.golang.org/api/googleapi" "github.com/getsentry/vroom/internal/chunk" "github.com/getsentry/vroom/internal/storageutil" @@ -99,6 +103,136 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +type postProfileFromChunkIDsRequest struct { + ProfilerID string `json:"profiler_id"` + ChunkIDs []string `json:"chunk_ids"` +} + +// Instead of returning Chunk directly, we'll return this struct +// that wraps a chunk. +// This way, if we decide to later add a few more utility fields +// (for pagination, etc.) we won't have to change the Chunk struct. +type postProfileFromChunkIDsResponse struct { + Chunk chunk.Chunk `json:"chunk"` +} + +// This is more of a GET method, but since we're receiving a list of chunk IDs as part of a +// body request, we use a POST method instead (similarly to the flamegraph endpoint). 
+func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + hub := sentry.GetHubFromContext(ctx) + ps := httprouter.ParamsFromContext(ctx) + rawOrganizationID := ps.ByName("organization_id") + organizationID, err := strconv.ParseUint(rawOrganizationID, 10, 64) + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + hub.Scope().SetTag("organization_id", rawOrganizationID) + + rawProjectID := ps.ByName("project_id") + projectID, err := strconv.ParseUint(rawProjectID, 10, 64) + if err != nil { + sentry.CaptureException(err) + w.WriteHeader(http.StatusBadRequest) + return + } + hub.Scope().SetTag("project_id", rawProjectID) + + var requestBody postProfileFromChunkIDsRequest + s := sentry.StartSpan(ctx, "processing") + s.Description = "Decoding data" + err = json.NewDecoder(r.Body).Decode(&requestBody) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusBadRequest) + return + } + + hub.Scope().SetTag("num_chunks", fmt.Sprintf("%d", len(requestBody.ChunkIDs))) + s = sentry.StartSpan(ctx, "chunks.read") + s.Description = "Read profile chunks from GCS" + + results := make(chan TaskOutput, len(requestBody.ChunkIDs)) + // send a task to the workers pool for each chunk + for _, ID := range requestBody.ChunkIDs { + jobs <- TaskInput{ + Ctx: ctx, + ProfilerID: requestBody.ProfilerID, + ChunkID: ID, + OrganizationID: organizationID, + ProjectID: projectID, + Storage: env.storage, + Result: results, + } + } + + chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs)) + // read the output of each task + for i := 0; i < len(requestBody.ChunkIDs); i++ { + res := <-results + // if there was an error we assign it to the global error + // so that we can later handle the response appropriately + // and then we skip + if res.Err != nil { + err = res.Err + continue + } else if err != nil { + // if this specific chunk download did not produce an error, + 
// but a previous one did, we also skip since it doesn't make + // sense to have a final profile with missing chunks + continue + } + chunks = append(chunks, res.Chunk) + } + s.Finish() + close(results) + if err != nil { + if errors.Is(err, storageutil.ErrObjectNotFound) { + w.WriteHeader(http.StatusNotFound) + return + } + var e *googleapi.Error + if ok := errors.As(err, &e); ok { + hub.Scope().SetContext("Google Cloud Storage Error", map[string]interface{}{ + "body": e.Body, + "code": e.Code, + "details": e.Details, + "message": e.Message, + }) + } + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + s = sentry.StartSpan(ctx, "chunks.merge") + s.Description = "Merge profile chunks into a single one" + chunk, err := chunk.MergeChunks(chunks) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + s = sentry.StartSpan(ctx, "json.marshal") + defer s.Finish() + b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk}) + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write(b) +} + type ( ChunkKafkaMessage struct { ProjectID uint64 `json:"project_id"` @@ -125,3 +259,37 @@ func buildChunkKafkaMessage(c *chunk.Chunk) *ChunkKafkaMessage { RetentionDays: c.RetentionDays, } } + +// A worker downloads a chunk and sends it back through +// the Result channel. +func worker(jobs <-chan TaskInput) { + for task := range jobs { + var c chunk.Chunk + err := storageutil.UnmarshalCompressed( + task.Ctx, + task.Storage, + chunk.StoragePath(task.OrganizationID, task.ProjectID, task.ProfilerID, task.ChunkID), + &c, + ) + task.Result <- TaskOutput{Chunk: c, Err: err} + } +} + +// The task the workers expect as input. +// +// Result: the channel used to send back the output. 
+type TaskInput struct { + Ctx context.Context + ProfilerID string + ChunkID string + OrganizationID uint64 + ProjectID uint64 + Storage *blob.Bucket + Result chan<- TaskOutput +} + +// The output sent back by the worker. +type TaskOutput struct { + Err error + Chunk chunk.Chunk +} diff --git a/cmd/vroom/config.go b/cmd/vroom/config.go index 8920f3a6..f11f7646 100644 --- a/cmd/vroom/config.go +++ b/cmd/vroom/config.go @@ -2,8 +2,9 @@ package main type ( ServiceConfig struct { - Environment string `env:"SENTRY_ENVIRONMENT" env-default:"development"` - Port int `env:"PORT" env-default:"8085"` + Environment string `env:"SENTRY_ENVIRONMENT" env-default:"development"` + Port int `env:"PORT" env-default:"8085"` + WorkerPoolSize int `env:"WORKER_POOL_SIZE" env-default:"100"` SentryDSN string `env:"SENTRY_DSN"` diff --git a/cmd/vroom/main.go b/cmd/vroom/main.go index 93a67446..c2421f5f 100644 --- a/cmd/vroom/main.go +++ b/cmd/vroom/main.go @@ -45,7 +45,10 @@ type environment struct { metricsClient *http.Client } -var release string +var ( + release string + jobs chan TaskInput +) const ( KiB int64 = 1024 @@ -162,6 +165,11 @@ func (e *environment) newRouter() (*httprouter.Router, error) { "/organizations/:organization_id/projects/:project_id/flamegraph", e.postFlamegraphFromProfileIDs, }, + { + http.MethodPost, + "/organizations/:organization_id/projects/:project_id/chunks", + e.postProfileFromChunkIDs, + }, {http.MethodGet, "/health", e.getHealth}, {http.MethodPost, "/chunk", e.postChunk}, {http.MethodPost, "/profile", e.postProfile}, @@ -245,6 +253,11 @@ func main() { slog.Info("vroom started") + jobs = make(chan TaskInput, env.config.WorkerPoolSize) + for i := 0; i < env.config.WorkerPoolSize; i++ { + go worker(jobs) + } + err = server.ListenAndServe() if err != nil && err != http.ErrServerClosed { sentry.CaptureException(err) @@ -254,6 +267,7 @@ func main() { <-waitForShutdown // Shutdown the rest of the environment after the HTTP connections are closed + close(jobs) 
env.shutdown() slog.Info("vroom graceful shutdown") } diff --git a/internal/chunk/chunk.go b/internal/chunk/chunk.go index aa912bb0..b4cd3370 100644 --- a/internal/chunk/chunk.go +++ b/internal/chunk/chunk.go @@ -49,8 +49,7 @@ type ( ) func (c *Chunk) StoragePath() string { - return fmt.Sprintf( - "%d/%d/%s/%s", + return StoragePath( c.OrganizationID, c.ProjectID, c.ProfilerID, @@ -65,3 +64,13 @@ func (c *Chunk) StartEndTimestamps() (float64, float64) { } return c.Profile.Samples[0].Timestamp, c.Profile.Samples[count-1].Timestamp } + +func StoragePath(OrganizationID uint64, ProjectID uint64, ProfilerID string, ID string) string { + return fmt.Sprintf( + "%d/%d/%s/%s", + OrganizationID, + ProjectID, + ProfilerID, + ID, + ) +} diff --git a/internal/chunk/chunk_utils.go b/internal/chunk/chunk_utils.go new file mode 100644 index 00000000..8287307f --- /dev/null +++ b/internal/chunk/chunk_utils.go @@ -0,0 +1,84 @@ +package chunk + +import ( + "encoding/json" + "sort" + + "github.com/getsentry/vroom/internal/measurements" +) + +func MergeChunks(chunks []Chunk) (Chunk, error) { + if len(chunks) == 0 { + return Chunk{}, nil + } + sort.Slice(chunks, func(i, j int) bool { + _, endFirstChunk := chunks[i].StartEndTimestamps() + startSecondChunk, _ := chunks[j].StartEndTimestamps() + return endFirstChunk <= startSecondChunk + }) + + var mergedMeasurement = make(map[string]measurements.MeasurementV2) + + chunk := chunks[0] + if len(chunk.Measurements) > 0 { + err := json.Unmarshal(chunk.Measurements, &mergedMeasurement) + if err != nil { + return Chunk{}, err + } + } + + for i := 1; i < len(chunks); i++ { + c := chunks[i] + // Update all the frame indices of the chunk we're going to add/merge + // to the first one. + // If the first chunk had a couple of frames, and the second chunk too, + // then all the stacks in the second chunk that refers to frames at index + // fr[0] and fr[1], once merged should refer to frames at index fr[2], fr[3]. 
+ for j, stack := range c.Profile.Stacks { + for z, frameID := range stack { + c.Profile.Stacks[j][z] = frameID + len(chunk.Profile.Frames) + } + } + chunk.Profile.Frames = append(chunk.Profile.Frames, c.Profile.Frames...) + // The same goes for chunk samples stack IDs + for j, sample := range c.Profile.Samples { + c.Profile.Samples[j].StackID = sample.StackID + len(chunk.Profile.Stacks) + } + chunk.Profile.Stacks = append(chunk.Profile.Stacks, c.Profile.Stacks...) + chunk.Profile.Samples = append(chunk.Profile.Samples, c.Profile.Samples...) + + // Update threadMetadata + for k, threadMetadata := range c.Profile.ThreadMetadata { + if _, ok := chunk.Profile.ThreadMetadata[k]; !ok { + chunk.Profile.ThreadMetadata[k] = threadMetadata + } + } + + // In case we have measurements, merge them too + if len(c.Measurements) > 0 { + var chunkMeasurements map[string]measurements.MeasurementV2 + err := json.Unmarshal(c.Measurements, &chunkMeasurements) + if err != nil { + return Chunk{}, err + } + for k, measurement := range chunkMeasurements { + if el, ok := mergedMeasurement[k]; ok { + el.Values = append(el.Values, measurement.Values...) 
+ mergedMeasurement[k] = el + } else { + mergedMeasurement[k] = measurement + } + } + } + } + + if len(mergedMeasurement) > 0 { + jsonRawMesaurement, err := json.Marshal(mergedMeasurement) + if err != nil { + return Chunk{}, err + } + chunk.Measurements = jsonRawMesaurement + } + + return chunk, nil +} diff --git a/internal/chunk/chunk_utils_test.go b/internal/chunk/chunk_utils_test.go new file mode 100644 index 00000000..133bdef6 --- /dev/null +++ b/internal/chunk/chunk_utils_test.go @@ -0,0 +1,97 @@ +package chunk + +import ( + "encoding/json" + "testing" + + "github.com/getsentry/vroom/internal/frame" + "github.com/getsentry/vroom/internal/sample" + "github.com/getsentry/vroom/internal/testutil" +) + +func TestMergeChunks(t *testing.T) { + tests := []struct { + name string + have []Chunk + want Chunk + }{ + { + name: "contiguous chunks", + have: []Chunk{ + { + Profile: Data{ + Frames: []frame.Frame{ + {Function: "c"}, + {Function: "d"}, + }, + Samples: []Sample{ + {StackID: 0, Timestamp: 2.0}, + {StackID: 1, Timestamp: 3.0}, + }, + Stacks: [][]int{ + {0, 1}, + {0, 1}, + }, + ThreadMetadata: map[string]sample.ThreadMetadata{"0x000000016d8fb180": {Name: "com.apple.network.connections"}}, + }, + Measurements: json.RawMessage(`{"first_metric":{"unit":"ms","values":[{"timestamp":2.0,"value":1.2}]}}`), + }, + // other chunk + { + Profile: Data{ + Frames: []frame.Frame{ + {Function: "a"}, + {Function: "b"}, + }, + Samples: []Sample{ + {StackID: 0, Timestamp: 0.0}, + {StackID: 1, Timestamp: 1.0}, + }, + Stacks: [][]int{ + {0, 1}, + {0, 1}, + }, + ThreadMetadata: map[string]sample.ThreadMetadata{"0x0000000102adc700": {Name: "com.apple.main-thread"}}, + }, + Measurements: json.RawMessage(`{"first_metric":{"unit":"ms","values":[{"timestamp":1.0,"value":1}]}}`), + }, + }, + want: Chunk{ + Profile: Data{ + Frames: []frame.Frame{ + {Function: "a"}, + {Function: "b"}, + {Function: "c"}, + {Function: "d"}, + }, + Samples: []Sample{ + {StackID: 0, Timestamp: 0.0}, + {StackID: 1, 
Timestamp: 1.0}, + {StackID: 2, Timestamp: 2.0}, + {StackID: 3, Timestamp: 3.0}, + }, + Stacks: [][]int{ + {0, 1}, + {0, 1}, + {2, 3}, + {2, 3}, + }, + ThreadMetadata: map[string]sample.ThreadMetadata{"0x0000000102adc700": {Name: "com.apple.main-thread"}, "0x000000016d8fb180": {Name: "com.apple.network.connections"}}, + }, + Measurements: json.RawMessage(`{"first_metric":{"unit":"ms","values":[{"timestamp":1,"value":1},{"timestamp":2,"value":1.2}]}}`), + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + have, err := MergeChunks(test.have) + if err != nil { + t.Fatal(err) + } + if diff := testutil.Diff(have, test.want); diff != "" { + t.Fatalf("Result mismatch: got - want +\n%s", diff) + } + }) + } +} diff --git a/internal/measurements/measurements.go b/internal/measurements/measurements.go index 34ae3292..1595e2a5 100644 --- a/internal/measurements/measurements.go +++ b/internal/measurements/measurements.go @@ -9,3 +9,15 @@ type MeasurementValue struct { ElapsedSinceStartNs uint64 `json:"elapsed_since_start_ns"` Value float64 `json:"value"` } + +type MeasurementV2 struct { + Unit string `json:"unit"` + Values []MeasurementValueV2 `json:"values"` +} + +// https://github.com/getsentry/relay/blob/master/relay-profiling/src/measurements.rs#L23-L29 +type MeasurementValueV2 struct { + // UNIX timestamp in seconds as a float + Timestamp float64 `json:"timestamp"` + Value float64 `json:"value"` +}