From c4d1de9c99b780122c715931b679480c0debef1e Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Sun, 13 Oct 2024 18:54:11 +0200 Subject: [PATCH] Sidecar: use prometheus metrics for min timestamp Read "minT" from prometheus metrics so that we also set it for sidecars that are not uploading blocks. Signed-off-by: Michael Hoffmann --- cmd/thanos/sidecar.go | 45 ++++++++++++++++----------- pkg/promclient/promclient.go | 44 ++++++++++++++++++++++++++ pkg/promclient/promclient_e2e_test.go | 15 +++++++++ 3 files changed, 86 insertions(+), 18 deletions(-) diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 014b9fc9df..78514ed102 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -235,6 +235,14 @@ func runSidecar( iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout) defer iterCancel() + if err := m.UpdateTimestamps(iterCtx); err != nil { + level.Warn(logger).Log( + "msg", "failed to fetch timestamps. Is Prometheus running? Retrying", + "err", err, + ) + return err + } + if err := m.UpdateLabels(iterCtx); err != nil { level.Warn(logger).Log( "msg", "failed to fetch initial external labels. Is Prometheus running? Retrying", @@ -266,16 +274,21 @@ func runSidecar( return runutil.Repeat(conf.prometheus.getConfigInterval, ctx.Done(), func() error { iterCtx, iterCancel := context.WithTimeout(context.Background(), conf.prometheus.getConfigTimeout) defer iterCancel() + if err := m.UpdateTimestamps(iterCtx); err != nil { + level.Warn(logger).Log("msg", "updating timestamps failed", "err", err) + promUp.Set(0) + statusProber.NotReady(err) + return nil + } if err := m.UpdateLabels(iterCtx); err != nil { - level.Warn(logger).Log("msg", "heartbeat failed", "err", err) + level.Warn(logger).Log("msg", "updating labels failed", "err", err) promUp.Set(0) statusProber.NotReady(err) - } else { - promUp.Set(1) - statusProber.Ready() + return nil } - + promUp.Set(1) + statusProber.Ready() return nil }) }, func(error) { @@ -317,7 +330,7 @@ func runSidecar( }), info.WithStoreInfoFunc(func() (*infopb.StoreInfo, error) { if httpProbe.IsReady() { - mint, maxt := promStore.Timestamps() + mint, maxt := m.Timestamps() return &infopb.StoreInfo{ MinTime: mint, MaxTime: maxt, @@ -409,13 +422,6 @@ func runSidecar( if uploaded, err := s.Sync(ctx); err != nil { level.Warn(logger).Log("err", err, "uploaded", uploaded) } - - minTime, _, err := s.Timestamps() - if err != nil { - level.Warn(logger).Log("msg", "reading timestamps failed", "err", err) - return nil - } - m.UpdateTimestamps(minTime, math.MaxInt64) return nil }) }, func(error) { @@ -490,16 +496,19 @@ func (s *promMetadata) UpdateLabels(ctx context.Context) error { return nil } -func (s *promMetadata) UpdateTimestamps(mint, maxt int64) { +func (s *promMetadata) UpdateTimestamps(ctx context.Context) error { s.mtx.Lock() defer s.mtx.Unlock() - if mint < s.limitMinTime.PrometheusTimestamp() { - mint = s.limitMinTime.PrometheusTimestamp() + mint, err := s.client.LowestTimestamp(ctx, s.promURL) + if err != nil { + return err } - s.mint = mint - s.maxt = maxt + s.mint = min(s.limitMinTime.PrometheusTimestamp(), mint) + s.maxt = math.MaxInt64 + + return nil } func (s *promMetadata) Labels() labels.Labels { diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index 6f124136b4..6cde007cd0 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" "io" + "math" "net/http" "net/url" "os" @@ -24,6 +25,7 @@ import ( "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/pkg/errors" + "github.com/prometheus/common/expfmt" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" @@ -687,6 +689,48 @@ func (c *Client) BuildVersion(ctx context.Context, base *url.URL) (string, error return b.Data.Version, nil } +// LowestTimestamp returns the lowest timestamp in the TSDB by parsing the /metrics endpoint +// and extracting the prometheus_tsdb_lowest_timestamp_seconds metric from it. +func (c *Client) LowestTimestamp(ctx context.Context, base *url.URL) (int64, error) { + u := *base + u.Path = path.Join(u.Path, "/metrics") + + level.Debug(c.logger).Log("msg", "lowest timestamp", "url", u.String()) + + req, err := http.NewRequest(http.MethodGet, u.String(), nil) + if err != nil { + return 0, errors.Wrap(err, "create request") + } + + span, ctx := tracing.StartSpan(ctx, "/lowest_timestamp HTTP[client]") + defer span.Finish() + + resp, err := c.Do(req.WithContext(ctx)) + if err != nil { + return 0, errors.Wrapf(err, "request metric against %s", u.String()) + } + defer runutil.ExhaustCloseWithLogOnErr(c.logger, resp.Body, "request body") + + var parser expfmt.TextParser + families, err := parser.TextToMetricFamilies(resp.Body) + if err != nil { + return 0, errors.Wrapf(err, "parsing metric families against %s", u.String()) + } + mf, ok := families["prometheus_tsdb_lowest_timestamp_seconds"] + if !ok { + return 0, errors.Wrapf(err, "metric families did not contain 'prometheus_tsdb_lowest_timestamp_seconds'") + } + val := 1000 * mf.GetMetric()[0].GetGauge().GetValue() + + // in the case that we dont have cut a block yet, TSDB lowest timestamp is math.MaxInt64 + // but its represented as float and truncated so we need to do this weird comparison. + // Since we use this for fan-out pruning we use min timestamp here to include this prometheus. + if val == float64(math.MaxInt64) { + return math.MinInt64, nil + } + return int64(val), nil +} + func formatTime(t time.Time) string { return strconv.FormatFloat(float64(t.Unix())+float64(t.Nanosecond())/1e9, 'f', -1, 64) } diff --git a/pkg/promclient/promclient_e2e_test.go b/pkg/promclient/promclient_e2e_test.go index a253ce2485..409f1e5148 100644 --- a/pkg/promclient/promclient_e2e_test.go +++ b/pkg/promclient/promclient_e2e_test.go @@ -6,6 +6,7 @@ package promclient import ( "context" "fmt" + "math" "net/url" "os" "path" @@ -83,6 +84,20 @@ func TestConfiguredFlags_e2e(t *testing.T) { }) } +func TestLowestTimestamp_e2e(t *testing.T) { + e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) { + testutil.Ok(t, p.Start(context.Background(), log.NewNopLogger())) + + u, err := url.Parse(fmt.Sprintf("http://%s", p.Addr())) + testutil.Ok(t, err) + + ts, err := NewDefaultClient().LowestTimestamp(context.Background(), u) + testutil.Ok(t, err) + + testutil.Equals(t, math.MinInt64, int(ts)) + }) +} + func TestSnapshot_e2e(t *testing.T) { e2eutil.ForeachPrometheus(t, func(t testing.TB, p *e2eutil.Prometheus) { now := time.Now()