Unordered chunks #4001

Merged
merged 22 commits into from Jul 27, 2021

Commits (22):
e15f53f  merge feature/unordered-replay (owen-d, Jul 9, 2021)
1147aa3  interoperable head chunks (owen-d, Jul 13, 2021)
e2cc745  Merge remote-tracking branch 'upstream/main' into headblock-interop (owen-d, Jul 13, 2021)
e92b4da  memchunk block interop (owen-d, Jul 14, 2021)
ad4acae  retain ordered memchunk optimizations when possible (owen-d, Jul 14, 2021)
5efceb3  tests+bench for unordered chunk reads (owen-d, Jul 14, 2021)
e3be2c4  reorder on chunk close (owen-d, Jul 15, 2021)
35ba4fa  [wip] ingester stream unorderd (owen-d, Jul 16, 2021)
500631c  unordered writes default in testware config, fixes OOO bug & removes … (owen-d, Jul 16, 2021)
96661ac  validity window is 1/2 of max age & fixes old transfer test (owen-d, Jul 16, 2021)
fea3831  more consistent headblock checking/creation (owen-d, Jul 20, 2021)
9a5e43c  more cohesive encoding tests (owen-d, Jul 20, 2021)
b9d9718  unordered stream test with validity bounds (owen-d, Jul 20, 2021)
d2c8a02  Merge remote-tracking branch 'upstream/main' into feature-unordered-i… (owen-d, Jul 20, 2021)
3e8b165  compat - unordered (owen-d, Jul 20, 2021)
9ba70c8  reinstates memchunk defaults when rebounding & updates storage test c… (owen-d, Jul 20, 2021)
e4b77a3  lint (owen-d, Jul 20, 2021)
4e89368  Merge remote-tracking branch 'upstream/main' into feature-unordered-i… (owen-d, Jul 21, 2021)
a1e82f9  reorder across blocks doesnt overflow (owen-d, Jul 21, 2021)
14b495e  respect chunk configs during rebounding when possible (owen-d, Jul 21, 2021)
10a2008  only sync checks on ordered writes (owen-d, Jul 21, 2021)
038cce6  Merge remote-tracking branch 'upstream/main' into feature-unordered-i… (owen-d, Jul 27, 2021)
146 changes: 130 additions & 16 deletions pkg/chunkenc/memchunk.go
@@ -44,10 +44,34 @@ const (
defaultBlockSize = 256 * 1024
)

var (
HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt}
)

type HeadBlockFmt byte

func (f HeadBlockFmt) Byte() byte { return byte(f) }

func (f HeadBlockFmt) String() string {
switch {
case f < UnorderedHeadBlockFmt:
return "ordered"
case f == UnorderedHeadBlockFmt:
return "unordered"
default:
return fmt.Sprintf("unknown: %v", byte(f))
}
}

func (f HeadBlockFmt) NewBlock() HeadBlock {
switch {
case f < UnorderedHeadBlockFmt:
return &headBlock{}
default:
return newUnorderedHeadBlock()
}
}

const (
_ HeadBlockFmt = iota
// placeholders to start splitting chunk formats vs head block
@@ -93,6 +117,7 @@ type MemChunk struct {
// the chunk format defaults to v2
format byte
encoding Encoding
headFmt HeadBlockFmt
}

type block struct {
@@ -309,19 +334,18 @@ type entry struct {
}

// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
c := &MemChunk{
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
targetSize: targetSize, // Desired chunk size in compressed bytes
blocks: []block{},

head: &headBlock{},
format: DefaultChunkFormat,
head: head.NewBlock(),

encoding: enc,
headFmt: head,
}

return c
}

// NewByteChunk returns a MemChunk on the passed bytes.
@@ -563,12 +587,19 @@ func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize()
}

func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
func MemchunkFromCheckpoint(chk, head []byte, desired HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.LoadBytes(head)
h, err := HeadFromCheckpoint(head, desired)
if err != nil {
return nil, err
}

mc.head = h
mc.headFmt = desired
return mc, nil
}

// Encoding implements Chunk.
@@ -642,7 +673,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {

// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}

@@ -660,7 +691,37 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
// Close implements Chunk.
// TODO: Fix this to check edge cases.
func (c *MemChunk) Close() error {
return c.cut()
if err := c.cut(); err != nil {
return err
}
return c.reorder()
}

// reorder ensures all blocks in a chunk are in
// monotonically increasing order.
// This mutates the chunk in place when the blocks need to be rebuilt.
func (c *MemChunk) reorder() error {
var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
}

if ordered {
return nil
}

// Otherwise, we need to rebuild the blocks
from, to := c.Bounds()
newC, err := c.Rebound(from, to)
if err != nil {
return err
}
*c = *newC.(*MemChunk)
return nil
}

// cut a new block and add it to finished blocks.
@@ -712,14 +773,28 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)
var headIterator iter.EntryIterator

var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {

// skip this block
if maxt < b.mint || b.maxt < mint {
continue
}

if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt

blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}

if !c.head.IsEmpty() {
from, _ := c.head.Bounds()
if from < lastMax {
ordered = false
}
headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
}

@@ -728,8 +803,16 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if headIterator != nil {
blockItrs = append(blockItrs, headIterator)
}

var it iter.EntryIterator
if ordered {
it = iter.NewNonOverlappingIterator(blockItrs, "")
} else {
it = iter.NewHeapIterator(ctx, blockItrs, direction)
}

return iter.NewTimeRangedIterator(
iter.NewNonOverlappingIterator(blockItrs, ""),
it,
time.Unix(0, mint),
time.Unix(0, maxt),
), nil
@@ -755,27 +838,50 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs[i], blockItrs[j] = blockItrs[j], blockItrs[i]
}

return iter.NewNonOverlappingIterator(blockItrs, ""), nil
if ordered {
return iter.NewNonOverlappingIterator(blockItrs, ""), nil
}
return iter.NewHeapIterator(ctx, blockItrs, direction), nil

}

// SampleIterator implements Chunk.
func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator {
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)

var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
// skip this block
if maxt < b.mint || b.maxt < mint {
continue
}

if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}

if !c.head.IsEmpty() {
from, _ := c.head.Bounds()
if from < lastMax {
ordered = false
}
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
}

var it iter.SampleIterator
if ordered {
it = iter.NewNonOverlappingSampleIterator(its, "")
} else {
it = iter.NewHeapSampleIterator(ctx, its)
Contributor

We might need a variant of HeapIterator that just reorders samples/logs, depending on how that affects performance.

I'm guessing this is only happening for chunks in memory.

Member Author

Yes, the unordered version only happens on the ingesters. Since we reorder chunks before flushing to storage (if needed), the unoptimized heapIter versions are only run on the ingesters themselves and only if the ingester is receiving out of order data.

edit: I should look at refactoring the order detection into a helper fn. These methods are way too complex for my tastes :(
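
To illustrate the helper mentioned above, here is a minimal sketch extracted from the repeated lastMax/ordered checks in Iterator and SampleIterator; the name blocksOrdered and its exact signature are hypothetical and not part of this PR:

// blocksOrdered (hypothetical, not in this PR) reports whether the cut blocks
// and the head block are already in monotonically increasing time order, so
// callers can pick the cheaper non-overlapping iterator over the heap iterator.
func blocksOrdered(blocks []block, head HeadBlock) bool {
	var lastMax int64 // max timestamp seen so far across blocks
	for _, b := range blocks {
		if b.mint < lastMax {
			return false
		}
		lastMax = b.maxt
	}
	if head != nil && !head.IsEmpty() {
		if from, _ := head.Bounds(); from < lastMax {
			return false
		}
	}
	return true
}

Iterator and SampleIterator could then branch on the result of this helper instead of inlining the loop in each method.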

}

return iter.NewTimeRangedSampleIterator(
iter.NewNonOverlappingSampleIterator(its, ""),
it,
mint,
maxt,
)
@@ -802,10 +908,18 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
return nil, err
}

// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
newChunk := NewMemChunk(c.Encoding(), defaultBlockSize, c.CompressedSize())
var newChunk *MemChunk
// As closely as possible, respect the block/target sizes specified. However,
// if the blockSize is not set, use reasonable defaults.
if c.blockSize > 0 {
newChunk = NewMemChunk(c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
} else {
// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
newChunk = NewMemChunk(c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())

}

for itr.Next() {
entry := itr.Entry()