Skip to content

Commit

Permalink
Sidecar: use prometheus metrics for min timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaHoffmann committed Oct 13, 2024
1 parent 832d17a commit 51c58f7
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 15 deletions.
43 changes: 28 additions & 15 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ func runSidecar(
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()

if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch timestamps. Is Prometheus running? Retrying",
"err", err,
)
return err
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log(
"msg", "failed to fetch initial external labels. Is Prometheus running? Retrying",
Expand Down Expand Up @@ -266,16 +274,21 @@ func runSidecar(
return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error {
iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout)
defer iterCancel()
if err := m.UpdateTimestamps(iterCtx); err != nil {
level.Warn(logger).Log("msg", "updating timestamps failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
return nil
}

if err := m.UpdateLabels(iterCtx); err != nil {
level.Warn(logger).Log("msg", "heartbeat failed", "err", err)
level.Warn(logger).Log("msg", "updating labels failed", "err", err)
promUp.Set(0)
statusProber.NotReady(err)
} else {
promUp.Set(1)
statusProber.Ready()
return nil
}

promUp.Set(1)
statusProber.Ready()
return nil
})
}, func(error) {
Expand Down Expand Up @@ -317,7 +330,7 @@ func runSidecar(
}),
info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
mint, maxt := m.Timestamps()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
Expand Down Expand Up @@ -409,13 +422,6 @@ func runSidecar(
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}

minTime, _, err := s.Timestamps()
if err != nil {
level.Warn(logger).Log("msg", "reading timestamps failed", "err", err)
return nil
}
m.UpdateTimestamps(minTime, math.MaxInt64)
return nil
})
}, func(error) {
Expand Down Expand Up @@ -490,16 +496,23 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error {
return nil
}

func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
func (s *promMetadata) UpdateTimestamps(ctx context.Context) error {
s.mtx.Lock()
defer s.mtx.Unlock()

mint, err := s.client.LowestTimestamp(ctx, s.promURL)
if err != nil {
return err
}

if mint < s.limitMinTime.PrometheusTimestamp() {
mint = s.limitMinTime.PrometheusTimestamp()
}

s.mint = mint
s.maxt = maxt
s.maxt = math.MaxInt64

return nil
}

func (s *promMetadata) Labels() labels.Labels {
Expand Down
45 changes: 45 additions & 0 deletions pkg/promclient/promclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"net/url"
"os"
Expand All @@ -24,6 +25,7 @@ import (
"github.com/go-kit/log/level"
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -687,6 +689,49 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error
return b.Data.Version, nil
}

// LowestTimestamp returns the lowest timestamp in the TSDB by parsing the /metrics endpoint
// and extracting the prometheus_tsdb_lowest_timestamp_seconds metric from it.
func (c *Client) LowestTimestamp(ctx context.Context, base *url.URL) (int64, error) {
u := *base
u.Path = path.Join(u.Path, "/metrics")

level.Debug(c.logger).Log("msg", "lowest timestamp", "url", u.String())

req, err := http.NewRequest(http.MethodGet, u.String(), nil)
if err != nil {
return 0, errors.Wrap(err, "create request")
}

span, ctx := tracing.StartSpan(ctx, "/lowest_timestamp HTTP[client]")
defer span.Finish()

resp, err := c.Do(req.WithContext(ctx))
if err != nil {
return 0, errors.Wrapf(err, "request metric against %s", u.String())
}
defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "request body")

var parser expfmt.TextParser
families, err := parser.TextToMetricFamilies(resp.Body)
if err != nil {
return 0, errors.Wrapf(err, "parsing metics families against %s", u.String())

Check failure on line 717 in pkg/promclient/promclient.go

View workflow job for this annotation

GitHub Actions / Check misspelled words

metics ==> metrics
}
mf, ok := families["prometheus_tsdb_lowest_timestamp_seconds"]
if !ok {
return 0, errors.Wrapf(err, "metics families did not contain 'prometheus_tsdb_lowest_timestamp_seconds'")

Check failure on line 721 in pkg/promclient/promclient.go

View workflow job for this annotation

GitHub Actions / Check misspelled words

metics ==> metrics
}
val := 1000 * mf.GetMetric()[0].GetGauge().GetValue()

// in the case that we dont have cut a block yet, TSDB lowest timestamp is
// math.MaxInt64 but its represented as float and truncated so we need to
// do this weird comparison.
// Since we use this for fan-out pruning we use min timestamp here to include this prometheus.
if val == float64(math.MaxInt64) {
return math.MinInt64, nil
}
return int64(val), nil
}

func formatTime(t time.Time) string {
return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/promclient/promclient_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package promclient
import (
"context"
"fmt"
"math"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -83,6 +84,20 @@ func TestConfiguredFlags_e2e(t *testing.T) {
})
}

func TestLowestTimestamp_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger()))

u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr()))
testutil.Ok(t, err)

ts, err := NewDefaultClient().LowestTimestamp(context.Background(), u)
testutil.Ok(t, err)

testutil.Equals(t, math.MinInt64, int(ts))
})
}

func TestSnapshot_e2e(t *testing.T) {
e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) {
now := time.Now()
Expand Down

0 comments on commit 51c58f7

Please sign in to comment.