Skip to content
This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Commit

Permalink
Do not check for ingestor in ingestAdapter.
Browse files Browse the repository at this point in the history
  • Loading branch information
Harkishen-Singh committed Jul 13, 2022
1 parent de42717 commit 3aa15d1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
36 changes: 24 additions & 12 deletions pkg/rules/adapters/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ import (
var samplesIngested = metrics.IngestorItems.With(map[string]string{"type": "metric", "kind": "sample", "subsystem": "rules"})

type ingestAdapter struct {
ingestor *ingestor.DBIngestor
inserter ingestor.DBInserter
}

// NewIngestAdapter acts as an adapter to make Promscale's DBIngestor compatible with storage.Appendable
func NewIngestAdapter(inserter ingestor.DBInserter) (*ingestAdapter, error) {
dbIngestor, ok := inserter.(*ingestor.DBIngestor)
if !ok {
return nil, fmt.Errorf("unable to ingest: DBIngestor not found. Received %T", inserter)
}
return &ingestAdapter{dbIngestor}, nil
func NewIngestAdapter(inserter ingestor.DBInserter) *ingestAdapter {
return &ingestAdapter{inserter}
}

type appenderAdapter struct {
data map[string][]model.Insertable
ingestor *ingestor.DBIngestor
inserter ingestor.DBInserter
closed bool
}

Expand All @@ -52,15 +48,19 @@ type appenderAdapter struct {
func (a ingestAdapter) Appender(_ context.Context) storage.Appender {
return &appenderAdapter{
data: make(map[string][]model.Insertable),
ingestor: a.ingestor,
inserter: a.inserter,
}
}

func (app *appenderAdapter) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if err := app.shouldAppend(); err != nil {
return 0, err
}
series, metricName, err := app.ingestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l))
dbIngestor, err := getIngestor(app.inserter)
if err != nil {
return 0, fmt.Errorf("get ingestor: %w", err)
}
series, metricName, err := dbIngestor.SeriesCache().GetSeriesFromProtos(util.LabelToPrompbLabels(l))
if err != nil {
return 0, fmt.Errorf("get series from protos: %w", err)
}
Expand Down Expand Up @@ -92,7 +92,11 @@ func (app *appenderAdapter) Commit() error {
//
// An error might occur while ingesting samples, so Prometheus will call the app.Rollback(). Do note that we cannot
// rollback the ingested series, rather only ingested samples since they were the last step that created the error.
numInsertablesIngested, err := app.ingestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()})
dbIngestor, err := getIngestor(app.inserter)
if err != nil {
return fmt.Errorf("get ingestor: %w", err)
}
numInsertablesIngested, err := dbIngestor.Dispatcher().InsertTs(context.Background(), model.Data{Rows: app.data, ReceivedTime: time.Now()})
if err == nil {
samplesIngested.Add(float64(numInsertablesIngested))
}
Expand All @@ -109,6 +113,14 @@ func (app *appenderAdapter) shouldAppend() error {
func (app *appenderAdapter) Rollback() error {
app.closed = true
app.data = map[string][]model.Insertable{}
app.ingestor = nil
app.inserter = nil
return nil
}

func getIngestor(inserter ingestor.DBInserter) (*ingestor.DBIngestor, error) {
dbIngestor, ok := inserter.(*ingestor.DBIngestor)
if !ok {
return nil, fmt.Errorf("unable to ingest: DBIngestor not found. Received %T", inserter)
}
return dbIngestor, nil
}
7 changes: 1 addition & 6 deletions pkg/rules/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,8 @@ func NewManager(ctx context.Context, r prometheus.Registerer, client *pgclient.C
return nil, nil, fmt.Errorf("parsing UI-URL: %w", err)
}

ingestAdapter, err := adapters.NewIngestAdapter(client.Inserter())
if err != nil {
return nil, nil, fmt.Errorf("error creating ingest adapter: %w", err)
}

rulesManager := prom_rules.NewManager(&prom_rules.ManagerOptions{
Appendable: ingestAdapter,
Appendable: adapters.NewIngestAdapter(client.Inserter()),
Queryable: adapters.NewQueryAdapter(client.Queryable()),
Context: ctx,
ExternalURL: parsedUrl,
Expand Down

0 comments on commit 3aa15d1

Please sign in to comment.