Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: try reading chunks which have incorrect offset for blocks #13720

Merged
merged 4 commits into from
Aug 4, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 50 additions & 10 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,13 +441,18 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me

metasOffset := uint64(0)
metasLen := uint64(0)
expectedBlockOffset := 0
if version >= ChunkFormatV4 {
// version >= 4 starts writing length of sections after their offsets
// version >= 4 starts writing length of sections before their offsets
metasLen, metasOffset = readSectionLenAndOffset(chunkMetasSectionIdx)
structuredMetadataLength, structuredMetadataOffset := readSectionLenAndOffset(chunkStructuredMetadataSectionIdx)
expectedBlockOffset = int(structuredMetadataLength + structuredMetadataOffset + 4)
} else {
// version <= 3 does not store length of metas. metas are followed by metasOffset + hash and then the chunk ends
metasOffset = binary.BigEndian.Uint64(b[len(b)-8:])
metasLen = uint64(len(b)-(8+4)) - metasOffset
// version 1 writes blocks after version number while version 2 and 3 write blocks after chunk encoding
expectedBlockOffset = len(b) - len(db.b)
}
mb := b[metasOffset : metasOffset+metasLen]
db = decbuf{b: mb}
Expand Down Expand Up @@ -476,18 +481,35 @@ func newByteChunk(b []byte, blockSize, targetSize int, fromCheckpoint bool) (*Me
blk.uncompressedSize = db.uvarint()
}
l := db.uvarint()
if blk.offset+l > len(b) {
return nil, fmt.Errorf("block %d offset %d + length %d exceeds chunk length %d", i, blk.offset, l, len(b))
}
blk.b = b[blk.offset : blk.offset+l]

// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
_ = level.Error(util_log.Logger).Log("msg", "Checksum does not match for a block in chunk, this block will be skipped", "err", ErrInvalidChecksum)
continue
invalidBlockErr := validateBlock(b, blk.offset, l)
if invalidBlockErr != nil {
level.Error(util_log.Logger).Log("msg", "invalid block found", "err", invalidBlockErr)
// if block is expected to have different offset than what is encoded, see if we get a valid block using expected offset
if blk.offset != expectedBlockOffset {
_ = level.Error(util_log.Logger).Log("msg", "block offset does not match expected one, will try reading with expected offset", "actual", blk.offset, "expected", expectedBlockOffset)
blk.offset = expectedBlockOffset
if err := validateBlock(b, blk.offset, l); err != nil {
level.Error(util_log.Logger).Log("msg", "could not find valid block using expected offset", "err", err)
} else {
invalidBlockErr = nil
level.Info(util_log.Logger).Log("msg", "valid block found using expected offset")
}
}

// if the block read with expected offset is still invalid, do not continue further
if invalidBlockErr != nil {
if errors.Is(invalidBlockErr, ErrInvalidChecksum) {
expectedBlockOffset += l + 4
continue
}
return nil, invalidBlockErr
}
}

// next block starts at current block start + current block length + checksum
expectedBlockOffset = blk.offset + l + 4
blk.b = b[blk.offset : blk.offset+l]
bc.blocks = append(bc.blocks, blk)

// Update the counter used to track the size of cut blocks.
Expand Down Expand Up @@ -1696,3 +1718,21 @@ func (e *sampleBufferedIterator) StreamHash() uint64 { return e.extractor.BaseLa
func (e *sampleBufferedIterator) At() logproto.Sample {
return e.cur
}

// validateBlock validates block by doing following checks:
// 1. Offset+length do not overrun size of the chunk from which we are reading the block.
// 2. Checksum of the block we will read matches the stored checksum in the chunk.
func validateBlock(chunkBytes []byte, offset, length int) error {
if offset+length > len(chunkBytes) {
return fmt.Errorf("offset %d + length %d exceeds chunk length %d", offset, length, len(chunkBytes))
}

blockBytes := chunkBytes[offset : offset+length]
// Verify checksums.
expCRC := binary.BigEndian.Uint32(chunkBytes[offset+length:])
if expCRC != crc32.Checksum(blockBytes, castagnoliTable) {
return ErrInvalidChecksum
}

return nil
}
117 changes: 117 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/binary"
"fmt"
"hash"
"math"
"math/rand"
"sort"
Expand Down Expand Up @@ -2044,3 +2045,119 @@ func TestMemChunk_IteratorWithStructuredMetadata(t *testing.T) {
})
}
}

func TestDecodeChunkIncorrectBlockOffset(t *testing.T) {
// use small block size to build multiple blocks in the test chunk
blockSize := 10

for _, format := range allPossibleFormats {
t.Run(fmt.Sprintf("chunkFormat:%v headBlockFmt:%v", format.chunkFormat, format.headBlockFmt), func(t *testing.T) {
for incorrectOffsetBlockNum := 0; incorrectOffsetBlockNum < 3; incorrectOffsetBlockNum++ {
t.Run(fmt.Sprintf("inorrect offset block: %d", incorrectOffsetBlockNum), func(t *testing.T) {
chk := NewMemChunk(format.chunkFormat, EncNone, format.headBlockFmt, blockSize, testTargetSize)
ts := time.Now().Unix()
for i := 0; i < 3; i++ {
dup, err := chk.Append(&logproto.Entry{
Timestamp: time.Now(),
Line: fmt.Sprintf("%d-%d", ts, i),
StructuredMetadata: []logproto.LabelAdapter{
{Name: "foo", Value: fmt.Sprintf("%d-%d", ts, i)},
},
})
require.NoError(t, err)
require.False(t, dup)
}

require.Len(t, chk.blocks, 3)

b, err := chk.Bytes()
require.NoError(t, err)

metasOffset := binary.BigEndian.Uint64(b[len(b)-8:])

w := bytes.NewBuffer(nil)
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)

crc32Hash := crc32HashPool.Get().(hash.Hash32)
defer crc32HashPool.Put(crc32Hash)

crc32Hash.Reset()
eb.reset()

// BEGIN - code copied from writeTo func starting from encoding of block metas to change offset of a block
eb.putUvarint(len(chk.blocks))

for i, b := range chk.blocks {
eb.putUvarint(b.numEntries)
eb.putVarint64(b.mint)
eb.putVarint64(b.maxt)
// change offset of one block
blockOffset := b.offset
if i == incorrectOffsetBlockNum {
blockOffset += 5
}
eb.putUvarint(blockOffset)
if chk.format >= ChunkFormatV3 {
eb.putUvarint(b.uncompressedSize)
}
eb.putUvarint(len(b.b))
}
metasLen := len(eb.get())
eb.putHash(crc32Hash)

_, err = w.Write(eb.get())
require.NoError(t, err)

if chk.format >= ChunkFormatV4 {
// Write structured metadata offset and length
eb.reset()

eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-32:])))
eb.putBE64int(int(binary.BigEndian.Uint64(b[len(b)-24:])))
_, err = w.Write(eb.get())
require.NoError(t, err)
}

// Write the metasOffset.
eb.reset()
if chk.format >= ChunkFormatV4 {
eb.putBE64int(metasLen)
}
eb.putBE64int(int(metasOffset))
_, err = w.Write(eb.get())
require.NoError(t, err)
// END - code copied from writeTo func

// build chunk using pre-block meta section + rewritten remainder of the chunk with incorrect offset for a block
chkWithIncorrectOffset := make([]byte, int(metasOffset)+w.Len())
copy(chkWithIncorrectOffset, b[:metasOffset])
copy(chkWithIncorrectOffset[metasOffset:], w.Bytes())

// decoding the problematic chunk should succeed
decodedChkWithIncorrectOffset, err := newByteChunk(chkWithIncorrectOffset, blockSize, testTargetSize, false)
require.NoError(t, err)

require.Len(t, decodedChkWithIncorrectOffset.blocks, len(chk.blocks))

// both chunks should have same log lines
origChunkItr, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)

corruptChunkItr, err := decodedChkWithIncorrectOffset.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
require.NoError(t, err)

numEntriesFound := 0
for origChunkItr.Next() {
numEntriesFound++
require.True(t, corruptChunkItr.Next())
require.Equal(t, origChunkItr.At(), corruptChunkItr.At())
}

require.False(t, corruptChunkItr.Next())
require.Equal(t, 3, numEntriesFound)
})
}
})
}
}
Loading