feat(kafka): Enable querier to optionally query partition ingesters #14418

Merged · 7 commits · Oct 9, 2024 · Changes from all commits
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -24,7 +24,7 @@ run:
- cgo
- promtail_journal_enabled
- integration

# output configuration options
output:
formats:
5 changes: 5 additions & 0 deletions docs/sources/shared/configuration.md
@@ -4228,6 +4228,11 @@ engine:
# When true, querier limits sent via a header are enforced.
# CLI flag: -querier.per-request-limits-enabled
[per_request_limits_enabled: <boolean> | default = false]

# When true, querier directs ingester queries to the partition-ingesters instead
# of the normal ingesters.
# CLI flag: -querier.query_partition_ingesters
[query_partition_ingesters: <boolean> | default = false]
```

### query_range
2 changes: 1 addition & 1 deletion go.mod
@@ -51,7 +51,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.3
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc
4 changes: 2 additions & 2 deletions go.sum
@@ -1044,8 +1044,8 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I=
github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw=
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf h1:ZafqZwIpdCCMifH9Ok6C98rYaCh5OZeyyHLbU0FPedg=
github.com/grafana/dskit v0.0.0-20241004175247-687ec485facf/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41 h1:a4O59OU3FJZ+EJUVnlvvNTvdAc4uRN1P6EaGwqL9CnA=
github.com/grafana/dskit v0.0.0-20241007172036-53283a0f6b41/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
2 changes: 1 addition & 1 deletion pkg/loki/loki.go
@@ -736,7 +736,7 @@ func (t *Loki) setupModuleManager() error {
PatternRingClient: {Server, MemberlistKV, Analytics},
PatternIngesterTee: {Server, MemberlistKV, Analytics, PatternRingClient},
PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee},
IngesterQuerier: {Ring},
IngesterQuerier: {Ring, PartitionRing, Overrides},
QuerySchedulerRing: {Overrides, MemberlistKV},
IndexGatewayRing: {Overrides, MemberlistKV},
PartitionRing: {MemberlistKV, Server, Ring},
7 changes: 4 additions & 3 deletions pkg/loki/modules.go
@@ -975,8 +975,9 @@ func (t *Loki) setupAsyncStore() error {
}

func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
logger := log.With(util_log.Logger, "component", "querier")
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay, t.Cfg.MetricsNamespace, logger)
logger := log.With(util_log.Logger, "component", "ingester-querier")

t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.Querier, t.Cfg.IngesterClient, t.ring, t.partitionRing, t.Overrides.IngestionPartitionsTenantShardSize, t.Cfg.MetricsNamespace, logger)
if err != nil {
return nil, err
}
@@ -1756,7 +1757,7 @@ func (t *Loki) initAnalytics() (services.Service, error) {

// The Ingest Partition Ring is responsible for watching the available ingesters and assigning partitions to incoming requests.
func (t *Loki) initPartitionRing() (services.Service, error) {
if !t.Cfg.Ingester.KafkaIngestion.Enabled {
if !t.Cfg.Ingester.KafkaIngestion.Enabled && !t.Cfg.Querier.QueryPartitionIngesters {
return nil, nil
}

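Worth noting in the wiring above: `NewIngesterQuerier` now receives the per-tenant partition shard size as a plain `func(string) int` (here `t.Overrides.IngestionPartitionsTenantShardSize`) rather than the whole limits object, which keeps `pkg/querier` decoupled from the limits type. A minimal, self-contained sketch of that pattern — the `tenantLimits` struct and its data are invented for illustration; only the method name comes from this diff:

```go
package main

import "fmt"

// tenantLimits stands in for Loki's Overrides; the struct and its field are hypothetical.
type tenantLimits struct {
	partitionShardSize map[string]int
}

// IngestionPartitionsTenantShardSize returns how many partitions a tenant's writes are
// sharded across (0 means no shuffle sharding, i.e. use all partitions).
func (l *tenantLimits) IngestionPartitionsTenantShardSize(tenantID string) int {
	return l.partitionShardSize[tenantID]
}

// newQuerierLike only needs a callback, not the limits package.
func newQuerierLike(getShardCountForTenant func(string) int) func(tenant string) int {
	return getShardCountForTenant
}

func main() {
	limits := &tenantLimits{partitionShardSize: map[string]int{"tenant-a": 2}}
	// A method value satisfies func(string) int, which is how modules.go passes it through.
	q := newQuerierLike(limits.IngestionPartitionsTenantShardSize)
	fmt.Println(q("tenant-a"), q("tenant-b")) // 2 0
}
```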
75 changes: 56 additions & 19 deletions pkg/querier/ingester_querier.go
@@ -8,6 +8,8 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/dskit/user"
"golang.org/x/exp/slices"

"github.com/grafana/loki/v3/pkg/storage/stores/index/seriesvolume"
@@ -40,28 +42,32 @@ type responseFromIngesters struct {

// IngesterQuerier helps with querying the ingesters.
type IngesterQuerier struct {
ring ring.ReadRing
pool *ring_client.Pool
extraQueryDelay time.Duration
logger log.Logger
querierConfig Config
ring ring.ReadRing
partitionRing *ring.PartitionInstanceRing
getShardCountForTenant func(string) int
pool *ring_client.Pool
logger log.Logger
}

func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func NewIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
factory := func(addr string) (ring_client.PoolClient, error) {
return client.New(clientCfg, addr)
}

return newIngesterQuerier(clientCfg, ring, extraQueryDelay, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
return newIngesterQuerier(querierConfig, clientCfg, ring, partitionRing, getShardCountForTenant, ring_client.PoolAddrFunc(factory), metricsNamespace, logger)
}

// newIngesterQuerier creates a new IngesterQuerier and allows to pass a custom ingester client factory
// used for testing purposes
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
func newIngesterQuerier(querierConfig Config, clientCfg client.Config, ring ring.ReadRing, partitionRing *ring.PartitionInstanceRing, getShardCountForTenant func(string) int, clientFactory ring_client.PoolFactory, metricsNamespace string, logger log.Logger) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
extraQueryDelay: extraQueryDelay,
logger: logger,
querierConfig: querierConfig,
ring: ring,
partitionRing: partitionRing,
getShardCountForTenant: getShardCountForTenant, // limits?
pool: clientpool.NewPool("ingester", clientCfg.PoolConfig, ring, clientFactory, util_log.Logger, metricsNamespace),
logger: logger,
}

err := services.StartAndAwaitRunning(context.Background(), iq.pool)
@@ -73,22 +79,53 @@ func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD
}

// forAllIngesters runs f, in parallel, for all ingesters
// TODO taken from Cortex, see if we can refactor out an usable interface.
func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
if q.querierConfig.QueryPartitionIngesters {
tenantID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
tenantShards := q.getShardCountForTenant(tenantID)
subring, err := q.partitionRing.ShuffleShardWithLookback(tenantID, tenantShards, q.querierConfig.QueryIngestersWithin, time.Now())
if err != nil {
return nil, err
}
replicationSets, err := subring.GetReplicationSetsForOperation(ring.Read)
if err != nil {
return nil, err
}
return q.forGivenIngesterSets(ctx, replicationSets, f)
}

replicationSet, err := q.ring.GetReplicationSetForOperation(ring.Read)
if err != nil {
return nil, err
}

return q.forGivenIngesters(ctx, replicationSet, f)
return q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), f)
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
cfg := ring.DoUntilQuorumConfig{
// forGivenIngesterSets runs f, in parallel, for given ingester sets
func (q *IngesterQuerier) forGivenIngesterSets(ctx context.Context, replicationSet []ring.ReplicationSet, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
// Enable minimize requests so we initially query a single ingester per replication set, as each replication-set is one partition.
// Ingesters must supply zone information for this to have an effect.
config := ring.DoUntilQuorumConfig{
MinimizeRequests: true,
}
return concurrency.ForEachJobMergeResults[ring.ReplicationSet, responseFromIngesters](ctx, replicationSet, 0, func(ctx context.Context, set ring.ReplicationSet) ([]responseFromIngesters, error) {
return q.forGivenIngesters(ctx, set, config, f)
})
}

func defaultQuorumConfig() ring.DoUntilQuorumConfig {
return ring.DoUntilQuorumConfig{
// Nothing here
}
results, err := ring.DoUntilQuorum(ctx, replicationSet, cfg, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
}

// forGivenIngesters runs f, in parallel, for given ingesters
func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ring.ReplicationSet, quorumConfig ring.DoUntilQuorumConfig, f func(context.Context, logproto.QuerierClient) (interface{}, error)) ([]responseFromIngesters, error) {
results, err := ring.DoUntilQuorum(ctx, replicationSet, quorumConfig, func(ctx context.Context, ingester *ring.InstanceDesc) (responseFromIngesters, error) {
client, err := q.pool.GetClientFor(ingester.Addr)
if err != nil {
return responseFromIngesters{addr: ingester.Addr}, err
@@ -212,7 +249,7 @@ func (q *IngesterQuerier) TailDisconnectedIngesters(ctx context.Context, req *lo
}

// Instance a tail client for each ingester to re(connect)
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
reconnectClients, err := q.forGivenIngesters(ctx, ring.ReplicationSet{Instances: reconnectIngesters}, defaultQuorumConfig(), func(_ context.Context, client logproto.QuerierClient) (interface{}, error) {
return client.Tail(ctx, req)
})
if err != nil {
@@ -260,7 +297,7 @@ func (q *IngesterQuerier) TailersCount(ctx context.Context) ([]uint32, error) {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "no active ingester found")
}

responses, err := q.forGivenIngesters(ctx, replicationSet, func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
responses, err := q.forGivenIngesters(ctx, replicationSet, defaultQuorumConfig(), func(ctx context.Context, querierClient logproto.QuerierClient) (interface{}, error) {
resp, err := querierClient.TailersCount(ctx, &logproto.TailersCountRequest{})
if err != nil {
return nil, err
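To summarize the new path in one place: when `query_partition_ingesters` is enabled, `forAllIngesters` resolves the tenant, shuffle-shards the partition ring by the tenant's partition shard size (with the query-ingesters-within lookback), turns each partition into its own replication set, and fans out with `MinimizeRequests` so only one ingester per partition is contacted initially. A condensed, self-contained sketch of that flow — the types below are simplified stand-ins, not the dskit API:

```go
package main

import (
	"fmt"
	"sync"
)

// replicationSet stands in for ring.ReplicationSet: the ingesters owning one partition.
type replicationSet struct {
	partition int
	ingesters []string
}

// shuffleShardPartitions stands in for PartitionInstanceRing.ShuffleShardWithLookback:
// shardSize 0 (or >= all partitions) means the tenant reads from every partition,
// otherwise only from its shard of partitions.
func shuffleShardPartitions(all []replicationSet, shardSize int) []replicationSet {
	if shardSize <= 0 || shardSize >= len(all) {
		return all
	}
	return all[:shardSize] // the real ring picks a deterministic per-tenant subset
}

// queryPartitions mirrors forGivenIngesterSets: one concurrent job per partition, and with
// request minimization only the first healthy ingester in each set is asked initially.
func queryPartitions(sets []replicationSet, query func(addr string) string) []string {
	out := make([]string, len(sets))
	var wg sync.WaitGroup
	for i, set := range sets {
		wg.Add(1)
		go func(i int, set replicationSet) {
			defer wg.Done()
			out[i] = query(set.ingesters[0]) // minimized: start with a single ingester per partition
		}(i, set)
	}
	wg.Wait()
	return out
}

func main() {
	all := []replicationSet{
		{partition: 0, ingesters: []string{"1.1.1.1", "2.2.2.2"}},
		{partition: 1, ingesters: []string{"3.3.3.3", "4.4.4.4"}},
		{partition: 2, ingesters: []string{"5.5.5.5", "6.6.6.6"}},
	}
	shard := shuffleShardPartitions(all, 2) // tenant shard size 2 -> query 2 of 3 partitions
	resp := queryPartitions(shard, func(addr string) string { return "logs from " + addr })
	fmt.Println(resp)
}
```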
141 changes: 140 additions & 1 deletion pkg/querier/ingester_querier_test.go
@@ -8,6 +8,7 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"go.uber.org/atomic"

"google.golang.org/grpc/codes"
@@ -224,6 +225,129 @@ func TestIngesterQuerier_earlyExitOnQuorum(t *testing.T) {
}
}

func TestIngesterQuerierFetchesResponsesFromPartitionIngesters(t *testing.T) {
t.Parallel()
ctx := user.InjectOrgID(context.Background(), "test-user")
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

ingesters := []ring.InstanceDesc{
mockInstanceDescWithZone("1.1.1.1", ring.ACTIVE, "A"),
mockInstanceDescWithZone("2.2.2.2", ring.ACTIVE, "B"),
mockInstanceDescWithZone("3.3.3.3", ring.ACTIVE, "A"),
mockInstanceDescWithZone("4.4.4.4", ring.ACTIVE, "B"),
mockInstanceDescWithZone("5.5.5.5", ring.ACTIVE, "A"),
mockInstanceDescWithZone("6.6.6.6", ring.ACTIVE, "B"),
}

tests := map[string]struct {
method string
testFn func(*IngesterQuerier) error
retVal interface{}
shards int
}{
"label": {
method: "Label",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Label(ctx, nil)
return err
},
retVal: new(logproto.LabelResponse),
},
"series": {
method: "Series",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.Series(ctx, nil)
return err
},
retVal: new(logproto.SeriesResponse),
},
"get_chunk_ids": {
method: "GetChunkIDs",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.GetChunkIDs(ctx, model.Time(0), model.Time(0))
return err
},
retVal: new(logproto.GetChunkIDsResponse),
},
"select_logs": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
},
"select_sample": {
method: "QuerySample",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectSample(ctx, logql.SelectSampleParams{
SampleQueryRequest: new(logproto.SampleQueryRequest),
})
return err
},
retVal: newQuerySampleClientMock(),
},
"select_logs_shuffle_sharded": {
method: "Query",
testFn: func(ingesterQuerier *IngesterQuerier) error {
_, err := ingesterQuerier.SelectLogs(ctx, logql.SelectLogParams{
QueryRequest: new(logproto.QueryRequest),
})
return err
},
retVal: newQueryClientMock(),
shards: 2, // Must be less than number of partitions
},
}

for testName, testData := range tests {
cnt := atomic.NewInt32(0)

t.Run(testName, func(t *testing.T) {
cnt.Store(0)
runFn := func(args mock.Arguments) {
ctx := args[0].(context.Context)

select {
case <-ctx.Done():
// should not be cancelled by the tracker
require.NoError(t, ctx.Err())
default:
cnt.Add(1)
}
}

instanceRing := newReadRingMock(ingesters, 0)
ingesterClient := newQuerierClientMock()
ingesterClient.On(testData.method, mock.Anything, mock.Anything, mock.Anything).Return(testData.retVal, nil).Run(runFn)

partitions := 3
ingestersPerPartition := len(ingesters) / partitions
assert.Greaterf(t, ingestersPerPartition, 1, "must have more than one ingester per partition")

ingesterQuerier, err := newTestPartitionIngesterQuerier(ingesterClient, instanceRing, newPartitionInstanceRingMock(instanceRing, ingesters, partitions, ingestersPerPartition), testData.shards)
require.NoError(t, err)

ingesterQuerier.querierConfig.QueryPartitionIngesters = true

err = testData.testFn(ingesterQuerier)
require.NoError(t, err)

if testData.shards == 0 {
testData.shards = partitions
}
expectedCalls := min(testData.shards, partitions)
// Wait for responses: We expect one request per queried partition because we have request minimization enabled & ingesters are in multiple zones.
// If shuffle sharding is enabled, we expect one query per shard as we write to a subset of partitions.
require.Eventually(t, func() bool { return cnt.Load() >= int32(expectedCalls) }, time.Millisecond*100, time.Millisecond*1, "expected all ingesters to respond")
ingesterClient.AssertNumberOfCalls(t, testData.method, expectedCalls)
})
}
}

func TestQuerier_tailDisconnectedIngesters(t *testing.T) {
t.Parallel()

@@ -400,9 +524,24 @@ func TestIngesterQuerier_DetectedLabels(t *testing.T) {

func newTestIngesterQuerier(readRingMock *readRingMock, ingesterClient *querierClientMock) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
readRingMock,
mockQuerierConfig().ExtraQueryDelay,
nil,
func(string) int { return 0 },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
)
}

func newTestPartitionIngesterQuerier(ingesterClient *querierClientMock, instanceRing *readRingMock, partitionRing *ring.PartitionInstanceRing, tenantShards int) (*IngesterQuerier, error) {
return newIngesterQuerier(
mockQuerierConfig(),
mockIngesterClientConfig(),
instanceRing,
partitionRing,
func(string) int { return tenantShards },
newIngesterClientMockFactory(ingesterClient),
constants.Loki,
log.NewNopLogger(),
2 changes: 2 additions & 0 deletions pkg/querier/querier.go
@@ -72,6 +72,7 @@ type Config struct {
QueryIngesterOnly bool `yaml:"query_ingester_only"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
PerRequestLimitsEnabled bool `yaml:"per_request_limits_enabled"`
QueryPartitionIngesters bool `yaml:"query_partition_ingesters" category:"experimental"`
}

// RegisterFlags register flags.
@@ -85,6 +86,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.QueryIngesterOnly, "querier.query-ingester-only", false, "When true, queriers only query the ingesters, and not stored data. This is useful when the object store is unavailable.")
f.BoolVar(&cfg.MultiTenantQueriesEnabled, "querier.multi-tenant-queries-enabled", false, "When true, allow queries to span multiple tenants.")
f.BoolVar(&cfg.PerRequestLimitsEnabled, "querier.per-request-limits-enabled", false, "When true, querier limits sent via a header are enforced.")
f.BoolVar(&cfg.QueryPartitionIngesters, "querier.query_partition_ingesters", false, "When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.")
}

// Validate validates the config.
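For reference, the new flag keeps the underscore spelling shown above (`-querier.query_partition_ingesters`), unlike the dash-style flags next to it, and maps to the `query_partition_ingesters` YAML key documented earlier. A small sketch exercising just that flag — the `config` struct below mirrors only the new field; everything else from `querier.Config` is omitted:

```go
package main

import (
	"flag"
	"fmt"
)

// config mirrors only the new field from querier.Config.
type config struct {
	QueryPartitionIngesters bool
}

func (c *config) registerFlags(f *flag.FlagSet) {
	// Same flag name and help text as registered in pkg/querier/querier.go in this PR.
	f.BoolVar(&c.QueryPartitionIngesters, "querier.query_partition_ingesters", false,
		"When true, querier directs ingester queries to the partition-ingesters instead of the normal ingesters.")
}

func main() {
	var cfg config
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.registerFlags(fs)
	_ = fs.Parse([]string{"-querier.query_partition_ingesters=true"})
	fmt.Println("query partition ingesters:", cfg.QueryPartitionIngesters) // true
}
```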