Skip to content

Commit

Permalink
feat: improve trace id lookup by defining a time range (#3874)
Browse files Browse the repository at this point in the history
* implement new method, added tests

* client tests

* use QueryTraceWithRange in Tempo Vulture

* fix linting

* adding PR link to changelog
  • Loading branch information
javiermolinar authored Jul 17, 2024
1 parent dc638ed commit cb4b1fc
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* [ENHANCEMENT] Added a Tempo CLI command to drop traces by id by rewriting blocks. [#3856](https://github.com/grafana/tempo/pull/3856) (@joe-elliott)
* [ENHANCEMENT] Mixin, make recording rule range interval configurable and increase range interval in alert to support scrape interval of 1 minute [#3851](https://github.com/grafana/tempo/pull/3851) (@jmichalek132)
* [ENHANCEMENT] Add vParquet4 support to the tempo-cli analyse blocks command [#3868](https://github.com/grafana/tempo/pull/3868) (@stoewer)
* [ENHANCEMENT] Improve trace id lookup from Tempo Vulture by selecting a date range [#3874](https://github.com/grafana/tempo/pull/3874) (@javiermolinar)
* [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer)
* [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio)
* [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio)
Expand Down
17 changes: 10 additions & 7 deletions cmd/tempo-vulture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func queueFutureBatches(client util.JaegerClient, info *util.TraceInfo, config v
}()
}

func doRead(httpClient httpclient.HTTPClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) {
func doRead(httpClient httpclient.TempoHTTPClient, tickerRead *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) {
if tickerRead != nil {
go func() {
for now := range tickerRead.C {
Expand Down Expand Up @@ -269,7 +269,7 @@ func doRead(httpClient httpclient.HTTPClient, tickerRead *time.Ticker, startTime
}
}

func doSearch(httpClient httpclient.HTTPClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) {
func doSearch(httpClient httpclient.TempoHTTPClient, tickerSearch *time.Ticker, startTime time.Time, interval time.Duration, r *rand.Rand, config vultureConfiguration, l *zap.Logger) {
if tickerSearch != nil {
go func() {
for now := range tickerSearch.C {
Expand Down Expand Up @@ -394,7 +394,7 @@ func traceInTraces(traceID string, traces []*tempopb.TraceSearchMetadata) bool {
return false
}

func searchTag(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) {
func searchTag(client httpclient.TempoHTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) {
tm := traceMetrics{
requested: 1,
}
Expand Down Expand Up @@ -443,7 +443,7 @@ func searchTag(client httpclient.HTTPClient, seed time.Time, config vultureConfi
return tm, nil
}

func searchTraceql(client httpclient.HTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) {
func searchTraceql(client httpclient.TempoHTTPClient, seed time.Time, config vultureConfiguration, l *zap.Logger) (traceMetrics, error) {
tm := traceMetrics{
requested: 1,
}
Expand Down Expand Up @@ -490,21 +490,24 @@ func searchTraceql(client httpclient.HTTPClient, seed time.Time, config vultureC
return tm, nil
}

func queryTrace(client httpclient.HTTPClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) {
func queryTrace(client httpclient.TempoHTTPClient, info *util.TraceInfo, l *zap.Logger) (traceMetrics, error) {
tm := traceMetrics{
requested: 1,
}

hexID := info.HexID()
start := info.Timestamp().Add(-30 * time.Minute).Unix()
end := info.Timestamp().Add(30 * time.Minute).Unix()

logger := l.With(
zap.Int64("seed", info.Timestamp().Unix()),
zap.String("hexID", hexID),
zap.Duration("ago", time.Since(info.Timestamp())),
)
logger.Info("querying Tempo")
logger.Info("querying Tempo trace")

trace, err := client.QueryTrace(hexID)
// We want to define a time range to reduce the number of lookups
trace, err := client.QueryTraceWithRange(hexID, start, end)
if err != nil {
if errors.Is(err, util.ErrTraceNotFound) {
tm.notFoundByID++
Expand Down
11 changes: 11 additions & 0 deletions cmd/tempo-vulture/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ func (m *MockHTTPClient) QueryTrace(id string) (*tempopb.Trace, error) {
return m.traceResp, m.err
}

//nolint:all
func (m *MockHTTPClient) QueryTraceWithRange(id string, start int64, end int64) (*tempopb.Trace, error) {
if m.err != nil {
return nil, m.err
}
m.m.Lock()
defer m.m.Unlock()
m.requestsCount++
return m.traceResp, m.err
}

func (m *MockHTTPClient) GetRequestsCount() int {
m.m.Lock()
defer m.m.Unlock()
Expand Down
38 changes: 35 additions & 3 deletions pkg/httpclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
applicationJSON = "application/json"
)

type HTTPClient interface {
type TempoHTTPClient interface {
WithTransport(t http.RoundTripper)
Do(req *http.Request) (*http.Response, error)
SearchTags() (*tempopb.SearchTagsResponse, error)
Expand All @@ -46,6 +46,7 @@ type HTTPClient interface {
Search(tags string) (*tempopb.SearchResponse, error)
SearchWithRange(tags string, start int64, end int64) (*tempopb.SearchResponse, error)
QueryTrace(id string) (*tempopb.Trace, error)
QueryTraceWithRange(id string, start int64, end int64) (*tempopb.Trace, error)
SearchTraceQL(query string) (*tempopb.SearchResponse, error)
SearchTraceQLWithRange(query string, start int64, end int64) (*tempopb.SearchResponse, error)
MetricsSummary(query string, groupBy string, start int64, end int64) (*tempopb.SpanMetricsSummaryResponse, error)
Expand Down Expand Up @@ -112,8 +113,7 @@ func (c *Client) getFor(url string, m proto.Message) (*http.Response, error) {
if err = jsonpb.UnmarshalString(string(body), m); err != nil {
return resp, fmt.Errorf("error decoding %T json, err: %v body: %s", m, err, string(body))
}
case applicationProtobuf:

default:
if err = proto.Unmarshal(body, m); err != nil {
return nil, fmt.Errorf("error decoding %T proto, err: %w body: %s", m, err, string(body))
}
Expand Down Expand Up @@ -257,6 +257,26 @@ func (c *Client) QueryTrace(id string) (*tempopb.Trace, error) {
return m, nil
}

func (c *Client) QueryTraceWithRange(id string, start int64, end int64) (*tempopb.Trace, error) {
m := &tempopb.Trace{}
if start > end {
return nil, errors.New("start time can not be greater than end time")
}
url := c.getURLWithQueryParams(QueryTraceEndpoint+"/"+id, map[string]string{
"start": strconv.FormatInt(start, 10),
"end": strconv.FormatInt(end, 10),
})
resp, err := c.getFor(url, m)
if err != nil {
if resp != nil && resp.StatusCode == http.StatusNotFound {
return nil, util.ErrTraceNotFound
}
return nil, err
}

return m, nil
}

func (c *Client) SearchTraceQL(query string) (*tempopb.SearchResponse, error) {
m := &tempopb.SearchResponse{}
_, err := c.getFor(c.buildSearchQueryURL("q", query, 0, 0), m)
Expand Down Expand Up @@ -418,3 +438,15 @@ func (c *Client) DeleteOverrides(version string) error {
_, _, err = c.doRequest(req)
return err
}

func (c *Client) getURLWithQueryParams(endpoint string, queryParams map[string]string) string {
joinURL, _ := url.Parse(c.BaseURL + endpoint + "?")
q := joinURL.Query()

for k, v := range queryParams {
q.Set(k, v)
}
joinURL.RawQuery = q.Encode()

return fmt.Sprint(joinURL)
}
103 changes: 103 additions & 0 deletions pkg/httpclient/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package httpclient

import (
"bytes"
"fmt"
"io"
"net/http"
"testing"

"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/assert"
)

type MockRoundTripper func(r *http.Request) *http.Response

func (f MockRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
return f(r), nil
}

func TestQueryTrace(t *testing.T) {
trace := &tempopb.Trace{}
t.Run("returns a trace when is found", func(t *testing.T) {
mockTransport := MockRoundTripper(func(req *http.Request) *http.Response {
assert.Equal(t, "www.tempo.com/api/traces/100", req.URL.Path)
assert.Equal(t, "application/protobuf", req.Header.Get("Accept"))
response, _ := proto.Marshal(trace)
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader(response)),
}
})

client := New("www.tempo.com", "1000")
client.WithTransport(mockTransport)
response, err := client.QueryTrace("100")

assert.NoError(t, err)
assert.True(t, proto.Equal(trace, response))
})

t.Run("returns a trace not found error on 404", func(t *testing.T) {
mockTransport := MockRoundTripper(func(_ *http.Request) *http.Response {
return &http.Response{
StatusCode: 404,
Body: nil,
}
})

client := New("www.tempo.com", "1000")
client.WithTransport(mockTransport)
response, err := client.QueryTrace("notfound")

assert.Error(t, err)
assert.Nil(t, response)
})
}

func TestQueryTraceWithRance(t *testing.T) {
trace := &tempopb.Trace{}
t.Run("returns an error if start time is greater than end time", func(t *testing.T) {
client := New("www.tempo.com", "1000")
response, err := client.QueryTraceWithRange("notfound", 3000, 2000)

assert.Error(t, err)
assert.Nil(t, response)
})

t.Run("returns a trace with range when is found", func(t *testing.T) {
mockTransport := MockRoundTripper(func(req *http.Request) *http.Response {
assert.Equal(t, "www.tempo.com/api/traces/100?end=2000&start=1000", fmt.Sprint(req.URL))
assert.Equal(t, "application/protobuf", req.Header.Get("Accept"))
response, _ := proto.Marshal(trace)
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(bytes.NewReader(response)),
}
})

client := New("www.tempo.com", "1000")
client.WithTransport(mockTransport)
response, err := client.QueryTraceWithRange("100", 1000, 2000)

assert.NoError(t, err)
assert.True(t, proto.Equal(trace, response))
})

t.Run("returns a trace with range not found error on 404", func(t *testing.T) {
mockTransport := MockRoundTripper(func(_ *http.Request) *http.Response {
return &http.Response{
StatusCode: 404,
Body: nil,
}
})

client := New("www.tempo.com", "1000")
client.WithTransport(mockTransport)
response, err := client.QueryTraceWithRange("notfound", 1000, 2000)

assert.Error(t, err)
assert.Nil(t, response)
})
}

0 comments on commit cb4b1fc

Please sign in to comment.