Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Rename ingestor.WriteSeries -> ingestor.PopulateOrCreateSeries
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesGuthrie committed Aug 29, 2022
1 parent 6412ef4 commit ac265a8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
4 changes: 2 additions & 2 deletions pkg/pgmodel/ingestor/copier.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func persistBatch(ctx context.Context, conn pgxconn.PgxConn, sw *seriesWriter, e
ctx, span := tracer.Default().Start(ctx, "persist-batch")
defer span.End()
batch := copyBatch(insertBatch)
err := sw.WriteSeries(ctx, batch)
err := sw.PopulateOrCreateSeries(ctx, batch)
if err != nil {
return fmt.Errorf("copier: writing series: %w", err)
}
Expand Down Expand Up @@ -366,7 +366,7 @@ func insertSeries(ctx context.Context, conn pgxconn.PgxConn, onConflict bool, re
req := &reqs[r]
// Since seriesId order is not guaranteed we need to sort it to avoid row deadlock when duplicates are sent (eg. Prometheus retry)
// Samples inside series should be sorted by Prometheus
// We sort after WriteSeries call because we now have guarantees that all seriesIDs have been populated
// We sort after PopulateOrCreateSeries call because we now have guarantees that all seriesIDs have been populated
sort.Sort(&req.data.batch)
numSamples, numExemplars := req.data.batch.Count()
metrics.IngestorRowsPerInsert.With(labelsCopier).Observe(float64(numSamples + numExemplars))
Expand Down
8 changes: 4 additions & 4 deletions pkg/pgmodel/ingestor/ingestor_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func TestPGXInserterInsertSeries(t *testing.T) {
lsi = append(lsi, model.NewPromExemplars(ls, nil))
}

err := sw.WriteSeries(context.Background(), sVisitor(lsi))
err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(lsi))
if err != nil {
foundErr := false
for _, q := range c.sqlQueries {
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
}

samples := makeSamples(series)
err := sw.WriteSeries(context.Background(), sVisitor(samples))
err := sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples))
if err != nil {
t.Fatal(err)
}
Expand All @@ -481,7 +481,7 @@ func TestPGXInserterCacheReset(t *testing.T) {
require.NoError(t, err)

samples = makeSamples(series)
err = sw.WriteSeries(context.Background(), sVisitor(samples))
err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples))
if err != nil {
t.Fatal(err)
}
Expand All @@ -503,7 +503,7 @@ func TestPGXInserterCacheReset(t *testing.T) {

// retrying rechecks the DB and uses the new IDs
samples = makeSamples(series)
err = sw.WriteSeries(context.Background(), sVisitor(samples))
err = sw.PopulateOrCreateSeries(context.Background(), sVisitor(samples))
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/pgmodel/ingestor/series_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ type perMetricInfo struct {
metricInfo *pgmodel.MetricInfo
}

// Set all seriesIds for a samples, fetching any missing ones from the DB,
// and repopulating the cache accordingly.
func (h *seriesWriter) WriteSeries(ctx context.Context, sv SeriesVisitor) error {
// PopulateOrCreateSeries examines all series in SeriesVisitor, checking if the labels or
// series are present in the inverted label or series caches. If not present,
// it creates them in the database (if they have not been created), or fetches
// their IDs if already created populating the respective caches.
func (h *seriesWriter) PopulateOrCreateSeries(ctx context.Context, sv SeriesVisitor) error {
ctx, span := tracer.Default().Start(ctx, "write-series")
defer span.End()
infos := make(map[string]*perMetricInfo)
Expand Down

0 comments on commit ac265a8

Please sign in to comment.