diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index e297a482da2e..362df2c89338 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -44,6 +44,20 @@ const ( defaultBlockSize = 256 * 1024 ) +type HeadBlockFmt byte + +func (f HeadBlockFmt) Byte() byte { return byte(f) } + +const ( + _ HeadBlockFmt = iota + // placeholders to start splitting chunk formats vs head block + // fmts at v3 + _ + _ + OrderedHeadBlockFmt + UnorderedHeadBlockFmt +) + var magicNumber = uint32(0x12EE56A) // The table gets initialized with sync.Once but may still cause a race @@ -74,7 +88,7 @@ type MemChunk struct { cutBlockSize int // Current in-mem block being appended to. - head *headBlock + head HeadBlock // the chunk format default to v2 format byte @@ -102,11 +116,17 @@ type headBlock struct { mint, maxt int64 } -func (hb *headBlock) isEmpty() bool { +func (hb *headBlock) Format() HeadBlockFmt { return OrderedHeadBlockFmt } + +func (hb *headBlock) IsEmpty() bool { return len(hb.entries) == 0 } -func (hb *headBlock) clear() { +func (hb *headBlock) Entries() int { return len(hb.entries) } + +func (hb *headBlock) UncompressedSize() int { return hb.size } + +func (hb *headBlock) Reset() { if hb.entries != nil { hb.entries = hb.entries[:0] } @@ -115,8 +135,10 @@ func (hb *headBlock) clear() { hb.maxt = 0 } -func (hb *headBlock) append(ts int64, line string) error { - if !hb.isEmpty() && hb.maxt > ts { +func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt } + +func (hb *headBlock) Append(ts int64, line string) error { + if !hb.IsEmpty() && hb.maxt > ts { return ErrOutOfOrder } @@ -130,7 +152,7 @@ func (hb *headBlock) append(ts int64, line string) error { return nil } -func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) { +func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() @@ -164,14 +186,14 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) { // CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, // which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but // needs to serialize/deserialize the data to disk to ensure data durability. -func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) { +func (hb *headBlock) CheckpointBytes(b []byte) ([]byte, error) { buf := bytes.NewBuffer(b[:0]) - err := hb.CheckpointTo(version, buf) + err := hb.CheckpointTo(buf) return buf.Bytes(), err } // CheckpointSize returns the estimated size of the headblock checkpoint. -func (hb *headBlock) CheckpointSize(version byte) int { +func (hb *headBlock) CheckpointSize() int { size := 1 // version size += binary.MaxVarintLen32 * 2 // total entries + total size size += binary.MaxVarintLen64 * 2 // mint,maxt @@ -184,13 +206,13 @@ func (hb *headBlock) CheckpointSize(version byte) int { } // CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`. -func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error { +func (hb *headBlock) CheckpointTo(w io.Writer) error { eb := EncodeBufferPool.Get().(*encbuf) defer EncodeBufferPool.Put(eb) eb.reset() - eb.putByte(version) + eb.putByte(byte(hb.Format())) _, err := w.Write(eb.get()) if err != nil { return errors.Wrap(err, "write headBlock version") @@ -225,7 +247,7 @@ func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error { return nil } -func (hb *headBlock) FromCheckpoint(b []byte) error { +func (hb *headBlock) LoadBytes(b []byte) error { if len(b) < 1 { return nil } @@ -267,6 +289,20 @@ func (hb *headBlock) FromCheckpoint(b []byte) error { return nil } +func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { + if version < UnorderedHeadBlockFmt { + return hb, nil + } + out := newUnorderedHeadBlock() + + for _, e := range hb.entries { + if err := out.Append(e.t, e.s); err != nil { + return nil, err + } + } + return out, nil +} + type entry struct { t int64 s string @@ -511,11 +547,11 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error { return err } - if c.head.isEmpty() { + if c.head.IsEmpty() { return nil } - err = c.head.CheckpointTo(c.format, head) + err = c.head.CheckpointTo(head) if err != nil { return err } @@ -524,7 +560,7 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error { } func (c *MemChunk) CheckpointSize() (chunk, head int) { - return c.BytesSize(), c.head.CheckpointSize(c.format) + return c.BytesSize(), c.head.CheckpointSize() } func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) { @@ -532,7 +568,7 @@ func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*M if err != nil { return nil, err } - return mc, mc.head.FromCheckpoint(head) + return mc, mc.head.LoadBytes(head) } // Encoding implements Chunk. @@ -547,9 +583,7 @@ func (c *MemChunk) Size() int { ne += blk.numEntries } - if !c.head.isEmpty() { - ne += len(c.head.entries) - } + ne += c.head.Entries() return ne } @@ -564,7 +598,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { if c.targetSize > 0 { // This is looking to see if the uncompressed lines will fit which is not // a great check, but it will guarantee we are always under the target size - newHBSize := c.head.size + len(e.Line) + newHBSize := c.head.UncompressedSize() + len(e.Line) return (c.cutBlockSize + newHBSize) < c.targetSize } // if targetSize is not defined, default to the original behavior of fixed blocks per chunk @@ -575,9 +609,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool { func (c *MemChunk) UncompressedSize() int { size := 0 - if !c.head.isEmpty() { - size += c.head.size - } + size += c.head.UncompressedSize() for _, b := range c.blocks { size += b.uncompressedSize @@ -590,9 +622,7 @@ func (c *MemChunk) UncompressedSize() int { func (c *MemChunk) CompressedSize() int { size := 0 // Better to account for any uncompressed data than ignore it even though this isn't accurate. - if !c.head.isEmpty() { - size += c.head.size - } + size += c.head.UncompressedSize() size += c.cutBlockSize return size } @@ -612,15 +642,15 @@ func (c *MemChunk) Append(entry *logproto.Entry) error { // If the head block is empty but there are cut blocks, we have to make // sure the new entry is not out of order compared to the previous block - if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { + if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp { return ErrOutOfOrder } - if err := c.head.append(entryTimestamp, entry.Line); err != nil { + if err := c.head.Append(entryTimestamp, entry.Line); err != nil { return err } - if c.head.size >= c.blockSize { + if c.head.UncompressedSize() >= c.blockSize { return c.cut() } @@ -635,44 +665,41 @@ func (c *MemChunk) Close() error { // cut a new block and add it to finished blocks. func (c *MemChunk) cut() error { - if c.head.isEmpty() { + if c.head.IsEmpty() { return nil } - b, err := c.head.serialise(getWriterPool(c.encoding)) + b, err := c.head.Serialise(getWriterPool(c.encoding)) if err != nil { return err } + mint, maxt := c.head.Bounds() c.blocks = append(c.blocks, block{ b: b, - numEntries: len(c.head.entries), - mint: c.head.mint, - maxt: c.head.maxt, - uncompressedSize: c.head.size, + numEntries: c.head.Entries(), + mint: mint, + maxt: maxt, + uncompressedSize: c.head.UncompressedSize(), }) c.cutBlockSize += len(b) - c.head.clear() + c.head.Reset() return nil } // Bounds implements Chunk. func (c *MemChunk) Bounds() (fromT, toT time.Time) { - var from, to int64 - if len(c.blocks) > 0 { - from = c.blocks[0].mint - to = c.blocks[len(c.blocks)-1].maxt - } + from, to := c.head.Bounds() - if !c.head.isEmpty() { - if from == 0 || from > c.head.mint { - from = c.head.mint + // need to check all the blocks in case they overlap + for _, b := range c.blocks { + if from == 0 || from > b.mint { + from = b.mint } - - if to < c.head.maxt { - to = c.head.maxt + if to < b.maxt { + to = b.maxt } } @@ -692,8 +719,8 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline)) } - if !c.head.isEmpty() { - headIterator = c.head.iterator(ctx, direction, mint, maxt, pipeline) + if !c.head.IsEmpty() { + headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline) } if direction == logproto.FORWARD { @@ -743,8 +770,8 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time, its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor)) } - if !c.head.isEmpty() { - its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor)) + if !c.head.IsEmpty() { + its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor)) } return iter.NewTimeRangedSampleIterator( @@ -837,8 +864,8 @@ func (b block) MaxTime() int64 { return b.maxt } -func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator { - if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { +func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator { + if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -895,8 +922,8 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, return iter.NewStreamsIterator(ctx, streamsResult, direction) } -func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { - if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { +func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator { + if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } chunkStats := stats.GetChunkData(ctx) diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index ed457315c40c..fb6173f5fca2 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -283,9 +283,9 @@ func TestRoundtripV3(t *testing.T) { require.Nil(t, err) // have to populate then clear the head block or fail comparing against nil vs zero values - err = r.head.append(1, "1") + err = r.head.Append(1, "1") require.Nil(t, err) - r.head.clear() + r.head.Reset() require.Equal(t, c, r) }) @@ -418,7 +418,7 @@ func TestGZIPChunkTargetSize(t *testing.T) { require.NoError(t, chk.Close()) - require.Equal(t, 0, chk.head.size) + require.Equal(t, 0, chk.head.UncompressedSize()) // Even though the seed is static above and results should be deterministic, // we will allow +/- 10% variance @@ -735,7 +735,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { h := headBlock{} for i := 0; i < j; i++ { - if err := h.append(int64(i), "this is the append string"); err != nil { + if err := h.Append(int64(i), "this is the append string"); err != nil { b.Fatal(err) } } @@ -743,7 +743,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) + iter := h.Iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline) for iter.Next() { _ = iter.Entry() @@ -759,7 +759,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { h := headBlock{} for i := 0; i < j; i++ { - if err := h.append(int64(i), "this is the append string"); err != nil { + if err := h.Append(int64(i), "this is the append string"); err != nil { b.Fatal(err) } } @@ -767,7 +767,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - iter := h.sampleIterator(context.Background(), 0, math.MaxInt64, countExtractor) + iter := h.SampleIterator(context.Background(), 0, math.MaxInt64, countExtractor) for iter.Next() { _ = iter.Sample() @@ -896,11 +896,11 @@ func TestHeadBlockCheckpointing(t *testing.T) { // ensure blocks are not cut require.Equal(t, 0, len(c.blocks)) - b, err := c.head.CheckpointBytes(c.format, nil) + b, err := c.head.CheckpointBytes(nil) require.Nil(t, err) hb := &headBlock{} - require.Nil(t, hb.FromCheckpoint(b)) + require.Nil(t, hb.LoadBytes(b)) require.Equal(t, c.head, hb) } diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index eed63763517c..924ca0ace93a 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/binary" + "fmt" "io" "math" "time" @@ -21,6 +22,35 @@ import ( var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{}) +type HeadBlock interface { + IsEmpty() bool + CheckpointTo(w io.Writer) error + CheckpointBytes(b []byte) ([]byte, error) + CheckpointSize() int + LoadBytes(b []byte) error + Serialise(pool WriterPool) ([]byte, error) + Reset() + Bounds() (mint, maxt int64) + Entries() int + UncompressedSize() int + Convert(HeadBlockFmt) (HeadBlock, error) + Append(int64, string) error + Iterator( + ctx context.Context, + direction logproto.Direction, + mint, + maxt int64, + pipeline log.StreamPipeline, + ) iter.EntryIterator + SampleIterator( + ctx context.Context, + mint, + maxt int64, + extractor log.StreamSampleExtractor, + ) iter.SampleIterator + Format() HeadBlockFmt +} + type unorderedHeadBlock struct { // Opted for range tree over skiplist for space reduction. // Inserts: O(log(n)) @@ -38,10 +68,29 @@ func newUnorderedHeadBlock() *unorderedHeadBlock { } } -func (hb *unorderedHeadBlock) isEmpty() bool { +func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return UnorderedHeadBlockFmt } + +func (hb *unorderedHeadBlock) IsEmpty() bool { return hb.size == 0 } +func (hb *unorderedHeadBlock) Bounds() (int64, int64) { + return hb.mint, hb.maxt +} + +func (hb *unorderedHeadBlock) Entries() int { + return hb.lines +} + +func (hb *unorderedHeadBlock) UncompressedSize() int { + return hb.size +} + +func (hb *unorderedHeadBlock) Reset() { + x := newUnorderedHeadBlock() + *hb = *x +} + // collection of entries belonging to the same nanosecond type nsEntries struct { ts int64 @@ -52,7 +101,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 { return e.ts } -func (hb *unorderedHeadBlock) append(ts int64, line string) { +func (hb *unorderedHeadBlock) Append(ts int64, line string) error { // This is an allocation hack. The rangetree lib does not // support the ability to pass a "mutate" function during an insert // and instead will displace any existing entry at the specified timestamp. @@ -83,6 +132,8 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) { hb.size += len(line) hb.lines++ + + return nil } // Implements rangetree.Interval @@ -104,7 +155,7 @@ func (hb *unorderedHeadBlock) forEntries( maxt int64, entryFn func(int64, string) error, // returning an error exits early ) (err error) { - if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) { + if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return } @@ -157,7 +208,7 @@ func (hb *unorderedHeadBlock) forEntries( return nil } -func (hb *unorderedHeadBlock) iterator( +func (hb *unorderedHeadBlock) Iterator( ctx context.Context, direction logproto.Direction, mint, @@ -210,7 +261,7 @@ func (hb *unorderedHeadBlock) iterator( } // nolint:unused -func (hb *unorderedHeadBlock) sampleIterator( +func (hb *unorderedHeadBlock) SampleIterator( ctx context.Context, mint, maxt int64, @@ -267,7 +318,7 @@ func (hb *unorderedHeadBlock) sampleIterator( // nolint:unused // serialise is used in creating an ordered, compressed block from an unorderedHeadBlock -func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { +func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) { inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer) defer func() { inBuf.Reset() @@ -306,8 +357,26 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) { return outBuf.Bytes(), nil } +func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) { + if version > OrderedHeadBlockFmt { + return hb, nil + } + out := &headBlock{} + + err := hb.forEntries( + context.Background(), + logproto.FORWARD, + 0, + math.MaxInt64, + func(ts int64, line string) error { + return out.Append(ts, line) + }, + ) + return out, err +} + // CheckpointSize returns the estimated size of the headblock checkpoint. -func (hb *unorderedHeadBlock) CheckpointSize(version byte) int { +func (hb *unorderedHeadBlock) CheckpointSize() int { size := 1 // version size += binary.MaxVarintLen32 * 2 // total entries + total size size += binary.MaxVarintLen64 * 2 // mint,maxt @@ -319,20 +388,20 @@ func (hb *unorderedHeadBlock) CheckpointSize(version byte) int { // CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, // which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but // needs to serialize/deserialize the data to disk to ensure data durability. -func (hb *unorderedHeadBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) { +func (hb *unorderedHeadBlock) CheckpointBytes(b []byte) ([]byte, error) { buf := bytes.NewBuffer(b[:0]) - err := hb.CheckpointTo(version, buf) + err := hb.CheckpointTo(buf) return buf.Bytes(), err } // CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`. -func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error { +func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error { eb := EncodeBufferPool.Get().(*encbuf) defer EncodeBufferPool.Put(eb) eb.reset() - eb.putByte(version) + eb.putByte(byte(hb.Format())) _, err := w.Write(eb.get()) if err != nil { return errors.Wrap(err, "write headBlock version") @@ -372,7 +441,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error { return nil } -func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { +func (hb *unorderedHeadBlock) LoadBytes(b []byte) error { // ensure it's empty *hb = *newUnorderedHeadBlock() @@ -387,9 +456,9 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { return errors.Wrap(db.err(), "verifying headblock header") } switch version { - case chunkFormatV1, chunkFormatV2, chunkFormatV3: + case UnorderedHeadBlockFmt.Byte(): default: - return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version) + return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version) } n := db.uvarint() @@ -402,7 +471,9 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { ts := db.varint64() lineLn := db.uvarint() line := string(db.bytes(lineLn)) - hb.append(ts, line) + if err := hb.Append(ts, line); err != nil { + return err + } } if err := db.err(); err != nil { @@ -411,3 +482,36 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error { return nil } + +// HeadFromCheckpoint handles reading any head block format and returning the desired form. +// This is particularly helpful replaying WALs from different configurations +// such as after enabling unordered writes. +func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) { + db := decbuf{b: b} + + version := db.byte() + if db.err() != nil { + return nil, errors.Wrap(db.err(), "verifying headblock header") + } + format := HeadBlockFmt(version) + + var decodedBlock HeadBlock + switch { + case format <= OrderedHeadBlockFmt: + decodedBlock = &headBlock{} + case format == UnorderedHeadBlockFmt: + decodedBlock = newUnorderedHeadBlock() + default: + return nil, fmt.Errorf("unexpected head block version: %v", version) + } + + if err := decodedBlock.LoadBytes(b); err != nil { + return nil, err + } + + if decodedBlock.Format() != desired { + return decodedBlock.Convert(desired) + } + return decodedBlock, nil + +} diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 210c9320851d..a8fe29740ee7 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -30,7 +30,7 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) { func Test_forEntriesEarlyReturn(t *testing.T) { hb := newUnorderedHeadBlock() for i := 0; i < 10; i++ { - hb.append(int64(i), fmt.Sprint(i)) + require.Nil(t, hb.Append(int64(i), fmt.Sprint(i))) } // forward @@ -143,10 +143,10 @@ func Test_Unordered_InsertRetrieval(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { hb := newUnorderedHeadBlock() for _, e := range tc.input { - hb.append(e.t, e.s) + require.Nil(t, hb.Append(e.t, e.s)) } - itr := hb.iterator( + itr := hb.Iterator( context.Background(), tc.dir, 0, @@ -205,10 +205,10 @@ func Test_UnorderedBoundedIter(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { hb := newUnorderedHeadBlock() for _, e := range tc.input { - hb.append(e.t, e.s) + require.Nil(t, hb.Append(e.t, e.s)) } - itr := hb.iterator( + itr := hb.Iterator( context.Background(), tc.dir, tc.mint, @@ -221,26 +221,44 @@ func Test_UnorderedBoundedIter(t *testing.T) { } } -func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) { - hb := newUnorderedHeadBlock() - +func TestHeadBlockInterop(t *testing.T) { + unordered, ordered := newUnorderedHeadBlock(), &headBlock{} for i := 0; i < 100; i++ { - hb.append(int64(i), fmt.Sprint(i)) + require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i))) + require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i))) } // turn to bytes - b, err := hb.CheckpointBytes(DefaultChunkFormat, nil) + b1, err := ordered.CheckpointBytes(nil) + require.Nil(t, err) + b2, err := unordered.CheckpointBytes(nil) require.Nil(t, err) - // restore a copy from bytes - cpy := newUnorderedHeadBlock() - require.Nil(t, cpy.FromCheckpoint(b)) + // Ensure we can recover ordered checkpoint into ordered headblock + recovered, err := HeadFromCheckpoint(b1, OrderedHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, ordered, recovered) - // ensure copy's bytes match original - cpyBytes, err := cpy.CheckpointBytes(DefaultChunkFormat, nil) + // Ensure we can recover ordered checkpoint into unordered headblock + recovered, err = HeadFromCheckpoint(b1, UnorderedHeadBlockFmt) require.Nil(t, err) - require.Equal(t, b, cpyBytes) + require.Equal(t, unordered, recovered) + + // Ensure we can recover unordered checkpoint into ordered headblock + recovered, err = HeadFromCheckpoint(b2, OrderedHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, ordered, recovered) + + // Ensure we can recover unordered checkpoint into unordered headblock + recovered, err = HeadFromCheckpoint(b2, UnorderedHeadBlockFmt) + require.Nil(t, err) + require.Equal(t, unordered, recovered) +} +// ensure backwards compatibility from when chunk format +// and head block format was split +func TestChunkBlockFmt(t *testing.T) { + require.Equal(t, chunkFormatV3, byte(OrderedHeadBlockFmt)) } func BenchmarkHeadBlockWrites(b *testing.B) { @@ -254,14 +272,14 @@ func BenchmarkHeadBlockWrites(b *testing.B) { headBlockFn := func() func(int64, string) { hb := &headBlock{} return func(ts int64, line string) { - _ = hb.append(ts, line) + _ = hb.Append(ts, line) } } unorderedHeadBlockFn := func() func(int64, string) { hb := newUnorderedHeadBlock() return func(ts int64, line string) { - hb.append(ts, line) + _ = hb.Append(ts, line) } }