Skip to content

Commit

Permalink
Fix query filtering (#5951)
Browse files Browse the repository at this point in the history
- milli -> nano conversion in the querier
- add deletes to store requests from the ingester
  • Loading branch information
MasslessParticle committed Apr 20, 2022
1 parent f62b4ae commit 7299e3b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 13 deletions.
2 changes: 2 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
End: end,
Limit: req.Limit,
Shards: req.Shards,
Deletes: req.Deletes,
}}
storeItr, err := i.store.SelectLogs(ctx, storeReq)
if err != nil {
Expand Down Expand Up @@ -628,6 +629,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
End: end,
Selector: req.Selector,
Shards: req.Shards,
Deletes: req.Deletes,
}}
storeItr, err := i.store.SelectSamples(ctx, storeReq)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,11 @@ func (q *SingleTenantQuerier) deletesForUser(ctx context.Context, startT, endT t

var deletes []*logproto.Delete
for _, del := range d {
if int64(del.StartTime) <= end && int64(del.EndTime) >= start {
if del.StartTime.UnixNano() <= end && del.EndTime.UnixNano() >= start {
deletes = append(deletes, &logproto.Delete{
Selector: del.Query,
Start: int64(del.StartTime),
End: int64(del.EndTime),
Start: del.StartTime.UnixNano(),
End: del.EndTime.UnixNano(),
})
}
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,8 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {
request := logproto.QueryRequest{
Selector: `{type="test"} |= "foo"`,
Limit: 10,
Start: time.Unix(0, 300),
End: time.Unix(0, 600),
Start: time.Unix(0, 300000000),
End: time.Unix(0, 600000000),
Direction: logproto.FORWARD,
}

Expand All @@ -776,9 +776,9 @@ func TestQuerier_SelectLogWithDeletes(t *testing.T) {
End: request.End,
Direction: request.Direction,
Deletes: []*logproto.Delete{
{Selector: "1", Start: 200, End: 400},
{Selector: "2", Start: 400, End: 500},
{Selector: "3", Start: 500, End: 700},
{Selector: "1", Start: 200000000, End: 400000000},
{Selector: "2", Start: 400000000, End: 500000000},
{Selector: "3", Start: 500000000, End: 700000000},
},
}

Expand Down Expand Up @@ -822,8 +822,8 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {

request := logproto.SampleQueryRequest{
Selector: `count_over_time({foo="bar"}[5m])`,
Start: time.Unix(0, 300),
End: time.Unix(0, 600),
Start: time.Unix(0, 300000000),
End: time.Unix(0, 600000000),
}

_, err = q.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: &request})
Expand All @@ -835,9 +835,9 @@ func TestQuerier_SelectSamplesWithDeletes(t *testing.T) {
Start: request.Start,
End: request.End,
Deletes: []*logproto.Delete{
{Selector: "1", Start: 200, End: 400},
{Selector: "2", Start: 400, End: 500},
{Selector: "3", Start: 500, End: 700},
{Selector: "1", Start: 200000000, End: 400000000},
{Selector: "2", Start: 400000000, End: 500000000},
{Selector: "3", Start: 500000000, End: 700000000},
},
},
}
Expand Down

0 comments on commit 7299e3b

Please sign in to comment.