Skip to content

Commit

Permalink
refactor: Chunkref as alias to logproto.ShortRef (#12159)
Browse files Browse the repository at this point in the history
  • Loading branch information
salvacorts authored and pull[bot] committed Sep 23, 2024
1 parent 4b1b766 commit 1000000
Show file tree
Hide file tree
Showing 14 changed files with 72 additions and 81 deletions.
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func newBatchedChunkLoader(

return v1.ChunkRefWithIter{
Ref: v1.ChunkRef{
Start: c.From,
End: c.Through,
From: c.From,
Through: c.Through,
Checksum: c.Checksum,
},
Itr: itr,
Expand Down
6 changes: 3 additions & 3 deletions pkg/bloomcompactor/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,13 +265,13 @@ func (s *StoreChunkLoader) Load(ctx context.Context, userID string, series *v1.S
// us in the case of refactoring/changing this and likely isn't a perf bottleneck.
chksByFetcher := make(map[*fetcher.Fetcher][]chunk.Chunk)
for _, chk := range series.Chunks {
fetcher := s.fetcherProvider.GetChunkFetcher(chk.Start)
fetcher := s.fetcherProvider.GetChunkFetcher(chk.From)
chksByFetcher[fetcher] = append(chksByFetcher[fetcher], chunk.Chunk{
ChunkRef: logproto.ChunkRef{
Fingerprint: uint64(series.Fingerprint),
UserID: userID,
From: chk.Start,
Through: chk.End,
From: chk.From,
Through: chk.Through,
Checksum: chk.Checksum,
},
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomcompactor/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (t *TSDBSeriesIter) background() {
}
for _, chk := range chks {
res.Chunks = append(res.Chunks, v1.ChunkRef{
Start: model.Time(chk.MinTime),
End: model.Time(chk.MaxTime),
From: model.Time(chk.MinTime),
Through: model.Time(chk.MaxTime),
Checksum: chk.Checksum,
})
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/bloomcompactor/tsdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (f forSeriesTestImpl) ForSeries(
unmapped := make([]index.ChunkMeta, 0, len(f[i].Chunks))
for _, c := range f[i].Chunks {
unmapped = append(unmapped, index.ChunkMeta{
MinTime: int64(c.Start),
MaxTime: int64(c.End),
MinTime: int64(c.From),
MaxTime: int64(c.Through),
Checksum: c.Checksum,
})
}
Expand All @@ -48,13 +48,13 @@ func TestTSDBSeriesIter(t *testing.T) {
Fingerprint: 1,
Chunks: []v1.ChunkRef{
{
Start: 0,
End: 1,
From: 0,
Through: 1,
Checksum: 2,
},
{
Start: 3,
End: 4,
From: 3,
Through: 4,
Checksum: 5,
},
},
Expand Down
7 changes: 5 additions & 2 deletions pkg/bloomgateway/bloomgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,12 @@ func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, r
for i := range res.Removals {
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if req.Refs[idx].Refs[j] == nil {
continue
}

// TODO(owen-d): These should check start/end/checksum, not just checksum.
// TODO(owen-d): These structs have equivalent data -- can we combine them?
if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
if logproto.ShortRef(toRemove) == *req.Refs[idx].Refs[j] {
filtered += 1

// TODO(owen-d): usually not a problem (n is small), but I've seen some series have
Expand Down
12 changes: 1 addition & 11 deletions pkg/bloomgateway/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,12 @@ func getFromThrough(refs []*logproto.ShortRef) (model.Time, model.Time) {
return refs[0].From, maxItem.Through
}

// convertToShortRefs converts a v1.ChunkRefs into []*logproto.ShortRef
// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request.
func convertToShortRefs(refs v1.ChunkRefs) []*logproto.ShortRef {
result := make([]*logproto.ShortRef, 0, len(refs))
for _, ref := range refs {
result = append(result, &logproto.ShortRef{From: ref.Start, Through: ref.End, Checksum: ref.Checksum})
}
return result
}

// convertToChunkRefs converts a []*logproto.ShortRef into v1.ChunkRefs
// TODO(chaudum): Avoid conversion by transferring v1.ChunkRefs in gRPC request.
func convertToChunkRefs(refs []*logproto.ShortRef) v1.ChunkRefs {
result := make(v1.ChunkRefs, 0, len(refs))
for _, ref := range refs {
result = append(result, v1.ChunkRef{Start: ref.From, End: ref.Through, Checksum: ref.Checksum})
result = append(result, v1.ChunkRef{From: ref.From, Through: ref.Through, Checksum: ref.Checksum})
}
return result
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/bloomgateway/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ func createQueryInputFromBlockData(t *testing.T, tenant string, data [][]v1.Seri
res = append(res, &logproto.ChunkRef{
Fingerprint: uint64(data[i][j].Series.Fingerprint),
UserID: tenant,
From: chk.Start,
Through: chk.End,
From: chk.From,
Through: chk.Through,
Checksum: chk.Checksum,
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/bloom_tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ func clearCache(cache map[string]interface{}) {
func prefixedToken(ngram int, chk ChunkRef, buf []byte) ([]byte, int) {
enc := encoding.EncWith(buf)
enc.Reset()
enc.PutBE64(uint64(chk.Start))
enc.PutBE64(uint64(chk.End))
enc.PutBE64(uint64(chk.From))
enc.PutBE64(uint64(chk.Through))
enc.PutBE32(chk.Checksum)
prefixLn := enc.Len() // record the length of the prefix

Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/bloom_tokenizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ func TestPrefixedKeyCreation(t *testing.T) {
var ones uint64 = 0xffffffffffffffff

ref := ChunkRef{
Start: 0,
End: model.Time(int64(ones)),
From: 0,
Through: model.Time(int64(ones)),
Checksum: 0xffffffff,
}
for _, tc := range []struct {
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/bloom/v1/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,14 @@ func (b *IndexBuilder) Append(series SeriesWithOffset) error {

// must be > 1
func chkBounds(chks []ChunkRef) (from, through model.Time) {
from, through = chks[0].Start, chks[0].End
from, through = chks[0].From, chks[0].Through
for _, chk := range chks[1:] {
if chk.Start.Before(from) {
from = chk.Start
if chk.From.Before(from) {
from = chk.From
}

if chk.End.After(through) {
through = chk.End
if chk.Through.After(through) {
through = chk.Through
}
}
return
Expand Down
26 changes: 12 additions & 14 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/common/model"

"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/encoding"
)

Expand Down Expand Up @@ -399,36 +400,33 @@ func (s *SeriesWithOffset) Decode(dec *encoding.Decbuf, previousFp model.Fingerp
return s.Fingerprint, s.Offset, dec.Err()
}

type ChunkRef struct {
Start, End model.Time
Checksum uint32
}
type ChunkRef logproto.ShortRef

func (r *ChunkRef) Less(other ChunkRef) bool {
if r.Start != other.Start {
return r.Start < other.Start
if r.From != other.From {
return r.From < other.From
}

if r.End != other.End {
return r.End < other.End
if r.Through != other.Through {
return r.Through < other.Through
}

return r.Checksum < other.Checksum
}

func (r *ChunkRef) Encode(enc *encoding.Encbuf, previousEnd model.Time) model.Time {
// delta encode start time
enc.PutVarint64(int64(r.Start - previousEnd))
enc.PutVarint64(int64(r.End - r.Start))
enc.PutVarint64(int64(r.From - previousEnd))
enc.PutVarint64(int64(r.Through - r.From))
enc.PutBE32(r.Checksum)
return r.End
return r.Through
}

func (r *ChunkRef) Decode(dec *encoding.Decbuf, previousEnd model.Time) (model.Time, error) {
r.Start = previousEnd + model.Time(dec.Varint64())
r.End = r.Start + model.Time(dec.Varint64())
r.From = previousEnd + model.Time(dec.Varint64())
r.Through = r.From + model.Time(dec.Varint64())
r.Checksum = dec.Be32()
return r.End, dec.Err()
return r.Through, dec.Err()
}

type BloomOffset struct {
Expand Down
52 changes: 26 additions & 26 deletions pkg/storage/bloom/v1/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func TestSeriesEncoding(t *testing.T) {
Fingerprint: model.Fingerprint(1),
Chunks: []ChunkRef{
{
Start: 1,
End: 2,
From: 1,
Through: 2,
Checksum: 3,
},
{
Start: 4,
End: 5,
From: 4,
Through: 5,
Checksum: 6,
},
},
Expand Down Expand Up @@ -71,53 +71,53 @@ func TestChunkRefCompare(t *testing.T) {
{
desc: "left empty",
left: nil,
right: ChunkRefs{{Start: 1, End: 2}},
right: ChunkRefs{{From: 1, Through: 2}},
exclusive: nil,
inclusive: nil,
},
{
desc: "right empty",
left: ChunkRefs{{Start: 1, End: 2}},
left: ChunkRefs{{From: 1, Through: 2}},
right: nil,
exclusive: ChunkRefs{{Start: 1, End: 2}},
exclusive: ChunkRefs{{From: 1, Through: 2}},
inclusive: nil,
},
{
desc: "left before right",
left: ChunkRefs{{Start: 1, End: 2}},
right: ChunkRefs{{Start: 3, End: 4}},
exclusive: ChunkRefs{{Start: 1, End: 2}},
left: ChunkRefs{{From: 1, Through: 2}},
right: ChunkRefs{{From: 3, Through: 4}},
exclusive: ChunkRefs{{From: 1, Through: 2}},
inclusive: nil,
},
{
desc: "left after right",
left: ChunkRefs{{Start: 3, End: 4}},
right: ChunkRefs{{Start: 1, End: 2}},
exclusive: ChunkRefs{{Start: 3, End: 4}},
left: ChunkRefs{{From: 3, Through: 4}},
right: ChunkRefs{{From: 1, Through: 2}},
exclusive: ChunkRefs{{From: 3, Through: 4}},
inclusive: nil,
},
{
desc: "left overlaps right",
left: ChunkRefs{
{Start: 1, End: 3},
{Start: 2, End: 4},
{Start: 3, End: 5},
{Start: 4, End: 6},
{Start: 5, End: 7},
{From: 1, Through: 3},
{From: 2, Through: 4},
{From: 3, Through: 5},
{From: 4, Through: 6},
{From: 5, Through: 7},
},
right: ChunkRefs{
{Start: 2, End: 4},
{Start: 4, End: 6},
{Start: 5, End: 6}, // not in left
{From: 2, Through: 4},
{From: 4, Through: 6},
{From: 5, Through: 6}, // not in left
},
exclusive: ChunkRefs{
{Start: 1, End: 3},
{Start: 3, End: 5},
{Start: 5, End: 7},
{From: 1, Through: 3},
{From: 3, Through: 5},
{From: 5, Through: 7},
},
inclusive: ChunkRefs{
{Start: 2, End: 4},
{Start: 4, End: 6},
{From: 2, Through: 4},
{From: 4, Through: 6},
},
},
} {
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/bloom/v1/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func MkBasicSeriesWithBlooms(nSeries, _ int, fromFp, throughFp model.Fingerprint
from := fromTs.Add(timeDelta * time.Duration(i))
series.Chunks = []ChunkRef{
{
Start: from,
End: from.Add(timeDelta),
From: from,
Through: from.Add(timeDelta),
Checksum: uint32(i),
},
}
Expand Down
4 changes: 2 additions & 2 deletions tools/tsdb/bloom-tester/tokenizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ func (bt *BloomTokenizer) PopulateSeriesWithBloom(seriesWithBloom *v1.SeriesWith

}
seriesWithBloom.Series.Chunks = append(seriesWithBloom.Series.Chunks, v1.ChunkRef{
Start: chunks[idx].From,
End: chunks[idx].Through,
From: chunks[idx].From,
Through: chunks[idx].Through,
Checksum: chunks[idx].Checksum,
})
} // for each chunk
Expand Down

0 comments on commit 1000000

Please sign in to comment.