Skip to content

Commit

Permalink
feat: Introduce wal segment read path. (#13695)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben Clive <ben.clive@grafana.com>
  • Loading branch information
cyriltovena and benclive authored Aug 2, 2024
1 parent 7c50b43 commit 917053a
Show file tree
Hide file tree
Showing 17 changed files with 2,629 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
wal.ReportSegmentStats(stats, i.metrics.segmentMetrics)

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
if err := i.store.PutObject(ctx, fmt.Sprintf(wal.Dir+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("failed to put object: %w", err)
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/loki/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
metastoreclient "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/client"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/health"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
Expand Down Expand Up @@ -415,7 +416,11 @@ func (t *Loki) initQuerier() (services.Service, error) {

if t.Cfg.QuerierRF1.Enabled {
logger.Log("Using RF-1 querier implementation")
t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, logger)
store, err := objstore.New(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, err
}
t.Querier, err = querierrf1.New(t.Cfg.QuerierRF1, t.Store, t.Overrides, deleteStore, t.MetastoreClient, store, logger)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1818,7 +1823,7 @@ func (t *Loki) initMetastore() (services.Service, error) {
return nil, nil
}
if t.Cfg.isTarget(All) {
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%s", t.Cfg.Server.GRPCListenAddress)
t.Cfg.MetastoreClient.MetastoreAddress = fmt.Sprintf("localhost:%d", t.Cfg.Server.GRPCListenPort)
}
m, err := metastore.New(t.Cfg.Metastore, log.With(util_log.Logger, "component", "metastore"), prometheus.DefaultRegisterer, t.health)
if err != nil {
Expand Down
13 changes: 10 additions & 3 deletions pkg/querier-rf1/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/logqlmodel"
"github.com/grafana/loki/v3/pkg/querier"
"github.com/grafana/loki/v3/pkg/querier-rf1/wal"
querier_limits "github.com/grafana/loki/v3/pkg/querier/limits"
"github.com/grafana/loki/v3/pkg/querier/plan"
"github.com/grafana/loki/v3/pkg/storage"
Expand Down Expand Up @@ -97,19 +98,25 @@ type Rf1Querier struct {
deleteGetter deleteGetter
logger log.Logger
patternQuerier PatterQuerier
walQuerier logql.Querier
}

type deleteGetter interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]deletion.DeleteRequest, error)
}

// New makes a new Querier for RF1 work.
func New(cfg Config, store Store, limits Limits, d deleteGetter, logger log.Logger) (*Rf1Querier, error) {
func New(cfg Config, store Store, limits Limits, d deleteGetter, metastore wal.Metastore, b wal.BlockStorage, logger log.Logger) (*Rf1Querier, error) {
querier, err := wal.New(metastore, b)
if err != nil {
return nil, err
}
return &Rf1Querier{
cfg: cfg,
store: store,
limits: limits,
deleteGetter: d,
walQuerier: querier,
logger: logger,
}, nil
}
Expand All @@ -134,7 +141,7 @@ func (q *Rf1Querier) SelectLogs(ctx context.Context, params logql.SelectLogParam
"msg", "querying rf1 store",
"params", params)
}
storeIter, err := q.store.SelectLogs(ctx, params)
storeIter, err := q.walQuerier.SelectLogs(ctx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,7 +171,7 @@ func (q *Rf1Querier) SelectSamples(ctx context.Context, params logql.SelectSampl
"msg", "querying rf1 store for samples",
"params", params)
}
storeIter, err := q.store.SelectSamples(ctx, params)
storeIter, err := q.walQuerier.SelectSamples(ctx, params)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 917053a

Please sign in to comment.