Skip to content

Commit

Permalink
WAL/marshalable chunks (#2764)
Browse files Browse the repository at this point in the history
* marshalable chunks

* wal record types custom serialization

* proto types for wal checkpoints

* byteswith output unaffected by buffer
  • Loading branch information
owen-d authored Nov 4, 2020
1 parent f3ae1f6 commit dc0bd01
Show file tree
Hide file tree
Showing 10 changed files with 1,402 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pkg/chunkenc/dumb_chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
}

// Encoding implements Chunk. A dumbChunk always reports EncNone.
func (c *dumbChunk) Encoding() Encoding {
	return EncNone
}

// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) {
Expand Down Expand Up @@ -103,6 +105,10 @@ func (c *dumbChunk) Bytes() ([]byte, error) {
return nil, nil
}

// BytesWith ignores the provided buffer and always returns nil bytes together
// with a nil error.
func (c *dumbChunk) BytesWith(_ []byte) ([]byte, error) {
return nil, nil
}

// Blocks ignores both time bounds and always returns nil.
func (c *dumbChunk) Blocks(_ time.Time, _ time.Time) []Block {
return nil
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/chunkenc/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,13 @@ type Chunk interface {
Blocks(mintT, maxtT time.Time) []Block
Size() int
Bytes() ([]byte, error)
BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
BlockCount() int
Utilization() float64
UncompressedSize() int
CompressedSize() int
Close() error
Encoding() Encoding
}

// Block is a chunk block.
Expand Down
13 changes: 9 additions & 4 deletions pkg/chunkenc/memchunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
return bc, nil
}

// Bytes implements Chunk.
func (c *MemChunk) Bytes() ([]byte, error) {
// BytesWith uses a provided []byte for buffer instantiation
func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
if c.head != nil {
// When generating the bytes, we need to flush the data held in-buffer.
if err := c.cut(); err != nil {
Expand All @@ -252,7 +252,7 @@ func (c *MemChunk) Bytes() ([]byte, error) {
}
crc32Hash := newCRC32()

buf := bytes.NewBuffer(nil)
buf := bytes.NewBuffer(b[:0])
offset := 0

eb := encbuf{b: make([]byte, 0, 1<<10)}
Expand Down Expand Up @@ -317,6 +317,11 @@ func (c *MemChunk) Bytes() ([]byte, error) {
return buf.Bytes(), nil
}

// Bytes implements Chunk. It delegates to BytesWith with a nil buffer, so no
// caller-provided memory is used for the output.
func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
}

// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
return c.encoding
Expand Down Expand Up @@ -368,7 +373,7 @@ func (c *MemChunk) UncompressedSize() int {
return size
}

// CompressedSize implements Chunk
// CompressedSize implements Chunk.
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
Expand Down
10 changes: 10 additions & 0 deletions pkg/chunkenc/memchunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,3 +767,13 @@ func TestMemchunkLongLine(t *testing.T) {
})
}
}

// TestBytesWith verifies that handing BytesWith a non-empty, reusable []byte
// yields the same serialized output as handing it nil.
func TestBytesWith(t *testing.T) {
	// serialize builds a fresh, empty chunk and serializes it into buf.
	serialize := func(buf []byte) []byte {
		out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(buf)
		require.Nil(t, err)
		return out
	}

	want := serialize(nil)
	got := serialize([]byte{1, 2, 3})

	require.Equal(t, want, got)
}
59 changes: 59 additions & 0 deletions pkg/ingester/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ingester

import (
"time"

"github.com/grafana/loki/pkg/chunkenc"
)

// toWireChunks converts in-memory chunk descriptors into their serializable
// wire representation, one Chunk per descriptor and in the same order.
//
// The passed wireChunks slice is for re-use: when its capacity is sufficient it
// is resliced to len(descs) and the Data buffer already held at each index is
// handed to BytesWith as the output buffer. Otherwise a fresh slice is
// allocated.
//
// It returns the populated slice, or a nil slice and the first error reported
// by BytesWith.
func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
	if cap(wireChunks) < len(descs) {
		wireChunks = make([]Chunk, len(descs))
	} else {
		wireChunks = wireChunks[:len(descs)]
	}

	for i, d := range descs {
		from, to := d.chunk.Bounds()

		// Try to re-use the memory from last time; only allocate when the old
		// buffer cannot hold the reported compressed size. The size is computed
		// once and shared by the capacity check and the allocation.
		buf := wireChunks[i].Data[:0]
		if size := d.chunk.CompressedSize(); cap(buf) < size {
			buf = make([]byte, 0, size)
		}

		data, err := d.chunk.BytesWith(buf)
		if err != nil {
			return nil, err
		}

		wireChunks[i] = Chunk{
			From:      from,
			To:        to,
			Closed:    d.closed,
			FlushedAt: d.flushed,
			Data:      data,
		}
	}
	return wireChunks, nil
}

// fromWireChunks rebuilds in-memory chunk descriptors from their wire form.
// Each descriptor carries over the Closed/FlushedAt state of its wire chunk, is
// stamped with the current time as lastUpdated, and wraps the chunk built by
// chunkenc.NewByteChunk from the wire data using conf.BlockSize and
// conf.TargetChunkSize. The first error from NewByteChunk aborts the conversion
// and is returned with a nil slice.
func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) {
	out := make([]*chunkDesc, len(wireChunks))
	for i := range wireChunks {
		wc := &wireChunks[i]
		updatedAt := time.Now()

		restored, err := chunkenc.NewByteChunk(wc.Data, conf.BlockSize, conf.TargetChunkSize)
		if err != nil {
			return nil, err
		}

		out[i] = &chunkDesc{
			closed:      wc.Closed,
			flushed:     wc.FlushedAt,
			lastUpdated: updatedAt,
			chunk:       restored,
		}
	}
	return out, nil
}
Loading

0 comments on commit dc0bd01

Please sign in to comment.