Skip to content

Commit

Permalink
feat: add endpoint to return a profile from a list of chunk IDs (#470)
Browse files Browse the repository at this point in the history
* add endpoint to return a profile from a list of chunk IDs
  • Loading branch information
viglia authored Jun 17, 2024
1 parent 238f963 commit 7851ed3
Show file tree
Hide file tree
Showing 8 changed files with 391 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- Exclude unsymbolicated frames from metrics ingestion ([#441](https://github.com/getsentry/vroom/pull/441))
- Filter out system frames when ingesting functions into generic metrics ([#444](https://github.com/getsentry/vroom/pull/444))
- Store profile chunks. ([#463](https://github.com/getsentry/vroom/pull/463))
- Add endpoint to return a profile from a list of chunk IDs ([#470](https://github.com/getsentry/vroom/pull/470))

**Bug Fixes**:

Expand Down
168 changes: 168 additions & 0 deletions cmd/vroom/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"

"github.com/getsentry/sentry-go"
"github.com/julienschmidt/httprouter"
"github.com/segmentio/kafka-go"
"gocloud.dev/blob"
"gocloud.dev/gcerrors"
"google.golang.org/api/googleapi"

"github.com/getsentry/vroom/internal/chunk"
"github.com/getsentry/vroom/internal/storageutil"
Expand Down Expand Up @@ -99,6 +103,136 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}

// postProfileFromChunkIDsRequest is the JSON body accepted by
// postProfileFromChunkIDs.
type postProfileFromChunkIDsRequest struct {
	// ProfilerID identifies the profiler that produced the chunks.
	ProfilerID string `json:"profiler_id"`
	// ChunkIDs is the list of chunk IDs to download and merge into a
	// single profile.
	ChunkIDs []string `json:"chunk_ids"`
}

// Instead of returning Chunk directly, we'll return this struct
// that wraps a chunk.
// This way, if we decide to later add a few more utility fields
// (for pagination, etc.) we won't have to change the Chunk struct.
type postProfileFromChunkIDsResponse struct {
	Chunk chunk.Chunk `json:"chunk"`
}

// postProfileFromChunkIDs assembles a single profile out of a list of
// chunk IDs belonging to one profiler, downloading the chunks
// concurrently through the shared worker pool (see worker).
//
// This is more of a GET method, but since we're receiving a list of chunk IDs as part of a
// body request, we use a POST method instead (similarly to the flamegraph endpoint).
func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	hub := sentry.GetHubFromContext(ctx)
	ps := httprouter.ParamsFromContext(ctx)
	rawOrganizationID := ps.ByName("organization_id")
	organizationID, err := strconv.ParseUint(rawOrganizationID, 10, 64)
	if err != nil {
		hub.CaptureException(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	hub.Scope().SetTag("organization_id", rawOrganizationID)

	rawProjectID := ps.ByName("project_id")
	projectID, err := strconv.ParseUint(rawProjectID, 10, 64)
	if err != nil {
		// Report through the request-scoped hub (previously the global
		// sentry.CaptureException), so the event carries the tags set above
		// and stays consistent with the rest of this handler.
		hub.CaptureException(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	hub.Scope().SetTag("project_id", rawProjectID)

	var requestBody postProfileFromChunkIDsRequest
	s := sentry.StartSpan(ctx, "processing")
	s.Description = "Decoding data"
	err = json.NewDecoder(r.Body).Decode(&requestBody)
	s.Finish()
	if err != nil {
		hub.CaptureException(err)
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	hub.Scope().SetTag("num_chunks", fmt.Sprintf("%d", len(requestBody.ChunkIDs)))
	s = sentry.StartSpan(ctx, "chunks.read")
	s.Description = "Read profile chunks from GCS"

	// Buffered to the number of tasks so workers never block on send,
	// even if this handler stops reading early.
	results := make(chan TaskOutput, len(requestBody.ChunkIDs))
	// send a task to the workers pool for each chunk
	for _, ID := range requestBody.ChunkIDs {
		jobs <- TaskInput{
			Ctx:            ctx,
			ProfilerID:     requestBody.ProfilerID,
			ChunkID:        ID,
			OrganizationID: organizationID,
			ProjectID:      projectID,
			Storage:        env.storage,
			Result:         results,
		}
	}

	chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs))
	// read the output of each task
	for i := 0; i < len(requestBody.ChunkIDs); i++ {
		res := <-results
		// if there was an error we assign it to the global error
		// so that we can later handle the response appropriately
		// and then we skip
		if res.Err != nil {
			err = res.Err
			continue
		} else if err != nil {
			// if this specific chunk download did not produce an error,
			// but a previous one did, we also skip since it doesn't make
			// sense to have a final profile with missing chunks
			continue
		}
		chunks = append(chunks, res.Chunk)
	}
	s.Finish()
	close(results)
	if err != nil {
		if errors.Is(err, storageutil.ErrObjectNotFound) {
			w.WriteHeader(http.StatusNotFound)
			return
		}
		var e *googleapi.Error
		if ok := errors.As(err, &e); ok {
			hub.Scope().SetContext("Google Cloud Storage Error", map[string]interface{}{
				"body":    e.Body,
				"code":    e.Code,
				"details": e.Details,
				"message": e.Message,
			})
		}
		hub.CaptureException(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	s = sentry.StartSpan(ctx, "chunks.merge")
	s.Description = "Merge profile chunks into a single one"
	// Named mergedChunk (not "chunk") to avoid shadowing the chunk package.
	mergedChunk, err := chunk.MergeChunks(chunks)
	s.Finish()
	if err != nil {
		hub.CaptureException(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	s = sentry.StartSpan(ctx, "json.marshal")
	defer s.Finish()
	b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk})
	if err != nil {
		hub.CaptureException(err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(b)
}

type (
ChunkKafkaMessage struct {
ProjectID uint64 `json:"project_id"`
Expand All @@ -125,3 +259,37 @@ func buildChunkKafkaMessage(c *chunk.Chunk) *ChunkKafkaMessage {
RetentionDays: c.RetentionDays,
}
}

// worker consumes tasks from the jobs channel, downloads the requested
// chunk from blob storage, and sends the outcome back through the
// task's Result channel. It returns once the jobs channel is closed.
func worker(jobs <-chan TaskInput) {
	for t := range jobs {
		path := chunk.StoragePath(t.OrganizationID, t.ProjectID, t.ProfilerID, t.ChunkID)
		var downloaded chunk.Chunk
		err := storageutil.UnmarshalCompressed(t.Ctx, t.Storage, path, &downloaded)
		t.Result <- TaskOutput{Chunk: downloaded, Err: err}
	}
}

// The task the workers expect as input.
//
// Result: the channel used to send back the output.
type TaskInput struct {
	// Ctx is the request context; cancelling it aborts the download.
	Ctx context.Context
	// ProfilerID, ChunkID, OrganizationID and ProjectID together identify
	// the storage path of the chunk to fetch (see chunk.StoragePath).
	ProfilerID     string
	ChunkID        string
	OrganizationID uint64
	ProjectID      uint64
	// Storage is the blob bucket the chunk is read from.
	Storage *blob.Bucket
	Result  chan<- TaskOutput
}

// The output sent back by the worker.
type TaskOutput struct {
	// Err is non-nil if the download or decompression failed; in that
	// case Chunk is the zero value and must not be used.
	Err   error
	Chunk chunk.Chunk
}
5 changes: 3 additions & 2 deletions cmd/vroom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package main

type (
ServiceConfig struct {
Environment string `env:"SENTRY_ENVIRONMENT" env-default:"development"`
Port int `env:"PORT" env-default:"8085"`
Environment string `env:"SENTRY_ENVIRONMENT" env-default:"development"`
Port int `env:"PORT" env-default:"8085"`
WorkerPoolSize int `env:"WORKER_POOL_SIZE" env-default:"100"`

SentryDSN string `env:"SENTRY_DSN"`

Expand Down
16 changes: 15 additions & 1 deletion cmd/vroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ type environment struct {
metricsClient *http.Client
}

var release string
var (
	// release is the service release identifier reported to Sentry.
	// NOTE(review): no assignment is visible here — presumably injected
	// at build time via -ldflags; confirm against the build config.
	release string
	// jobs is the shared worker-pool queue used to download profile
	// chunks concurrently (consumed by worker in chunk.go).
	jobs chan TaskInput
)

const (
KiB int64 = 1024
Expand Down Expand Up @@ -162,6 +165,11 @@ func (e *environment) newRouter() (*httprouter.Router, error) {
"/organizations/:organization_id/projects/:project_id/flamegraph",
e.postFlamegraphFromProfileIDs,
},
{
http.MethodPost,
"/organizations/:organization_id/projects/:project_id/chunks",
e.postProfileFromChunkIDs,
},
{http.MethodGet, "/health", e.getHealth},
{http.MethodPost, "/chunk", e.postChunk},
{http.MethodPost, "/profile", e.postProfile},
Expand Down Expand Up @@ -245,6 +253,11 @@ func main() {

slog.Info("vroom started")

jobs = make(chan TaskInput, env.config.WorkerPoolSize)
for i := 0; i < env.config.WorkerPoolSize; i++ {
go worker(jobs)
}

err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
sentry.CaptureException(err)
Expand All @@ -254,6 +267,7 @@ func main() {
<-waitForShutdown

// Shutdown the rest of the environment after the HTTP connections are closed
close(jobs)
env.shutdown()
slog.Info("vroom graceful shutdown")
}
Expand Down
13 changes: 11 additions & 2 deletions internal/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ type (
)

func (c *Chunk) StoragePath() string {
return fmt.Sprintf(
"%d/%d/%s/%s",
return StoragePath(
c.OrganizationID,
c.ProjectID,
c.ProfilerID,
Expand All @@ -65,3 +64,13 @@ func (c *Chunk) StartEndTimestamps() (float64, float64) {
}
return c.Profile.Samples[0].Timestamp, c.Profile.Samples[count-1].Timestamp
}

// StoragePath returns the blob-storage object path for a chunk, in the
// form "<organizationID>/<projectID>/<profilerID>/<chunkID>".
//
// Parameter names are lowerCamelCase per Go convention (they are not
// part of the call interface, so callers are unaffected).
func StoragePath(organizationID uint64, projectID uint64, profilerID string, chunkID string) string {
	return fmt.Sprintf(
		"%d/%d/%s/%s",
		organizationID,
		projectID,
		profilerID,
		chunkID,
	)
}
84 changes: 84 additions & 0 deletions internal/chunk/chunk_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package chunk

import (
"encoding/json"
"sort"

"github.com/getsentry/vroom/internal/measurements"
)

// MergeChunks merges a list of profile chunks into a single chunk.
//
// Chunks are sorted chronologically, then appended one after another:
// frame indices and stack IDs of each subsequent chunk are re-based
// onto the accumulated profile, thread metadata is unioned (the first
// chunk providing a key wins), and measurements are concatenated per
// key. An empty input yields a zero-value Chunk and no error.
//
// NOTE: the input slice and the chunks it contains are mutated in place.
func MergeChunks(chunks []Chunk) (Chunk, error) {
	if len(chunks) == 0 {
		return Chunk{}, nil
	}
	// Sort chunks by start timestamp. The previous comparator returned
	// end(i) <= start(j), which is true for "equal" elements and therefore
	// not the strict less function sort.Slice requires; ordering by start
	// time produces the same sequence for non-overlapping chunks while
	// respecting the contract.
	sort.Slice(chunks, func(i, j int) bool {
		startI, _ := chunks[i].StartEndTimestamps()
		startJ, _ := chunks[j].StartEndTimestamps()
		return startI < startJ
	})

	var mergedMeasurement = make(map[string]measurements.MeasurementV2)

	chunk := chunks[0]
	if len(chunk.Measurements) > 0 {
		err := json.Unmarshal(chunk.Measurements, &mergedMeasurement)
		if err != nil {
			return Chunk{}, err
		}
	}

	for i := 1; i < len(chunks); i++ {
		c := chunks[i]
		// Update all the frame indices of the chunk we're going to add/merge
		// to the first one.
		// If the first chunk had a couple of frames, and the second chunk too,
		// then all the stacks in the second chunk that refers to frames at index
		// fr[0] and fr[1], once merged should refer to frames at index fr[2], fr[3].
		for j, stack := range c.Profile.Stacks {
			for z, frameID := range stack {
				c.Profile.Stacks[j][z] = frameID + len(chunk.Profile.Frames)
			}
		}
		chunk.Profile.Frames = append(chunk.Profile.Frames, c.Profile.Frames...)
		// The same goes for chunk samples stack IDs.
		for j, sample := range c.Profile.Samples {
			c.Profile.Samples[j].StackID = sample.StackID + len(chunk.Profile.Stacks)
		}
		chunk.Profile.Stacks = append(chunk.Profile.Stacks, c.Profile.Stacks...)
		chunk.Profile.Samples = append(chunk.Profile.Samples, c.Profile.Samples...)

		// Update threadMetadata, keeping the first occurrence of each key.
		// NOTE(review): assumes the first chunk's ThreadMetadata map is
		// non-nil whenever a later chunk carries entries — writing to a nil
		// map would panic; confirm upstream initialization.
		for k, threadMetadata := range c.Profile.ThreadMetadata {
			if _, ok := chunk.Profile.ThreadMetadata[k]; !ok {
				chunk.Profile.ThreadMetadata[k] = threadMetadata
			}
		}

		// In case we have measurements, merge them too.
		if len(c.Measurements) > 0 {
			var chunkMeasurements map[string]measurements.MeasurementV2
			err := json.Unmarshal(c.Measurements, &chunkMeasurements)
			if err != nil {
				return Chunk{}, err
			}
			for k, measurement := range chunkMeasurements {
				if el, ok := mergedMeasurement[k]; ok {
					el.Values = append(el.Values, measurement.Values...)
					mergedMeasurement[k] = el
				} else {
					mergedMeasurement[k] = measurement
				}
			}
		}
	}

	if len(mergedMeasurement) > 0 {
		rawMeasurements, err := json.Marshal(mergedMeasurement)
		if err != nil {
			return Chunk{}, err
		}
		chunk.Measurements = rawMeasurements
	}

	return chunk, nil
}
Loading

0 comments on commit 7851ed3

Please sign in to comment.