From dc0bd01e354fa653c78ba15d33370e053be5fdd3 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Wed, 4 Nov 2020 09:04:31 -0500 Subject: [PATCH] WAL/marshalable chunks (#2764) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer --- pkg/chunkenc/dumb_chunk.go | 6 + pkg/chunkenc/interface.go | 2 + pkg/chunkenc/memchunk.go | 13 +- pkg/chunkenc/memchunk_test.go | 10 + pkg/ingester/checkpoint.go | 59 ++ pkg/ingester/checkpoint.pb.go | 1035 +++++++++++++++++++++++++++++++++ pkg/ingester/checkpoint.proto | 27 + pkg/ingester/encoding.go | 185 ++++++ pkg/ingester/encoding_test.go | 68 +++ pkg/ingester/transfer.go | 2 +- 10 files changed, 1402 insertions(+), 5 deletions(-) create mode 100644 pkg/ingester/checkpoint.go create mode 100644 pkg/ingester/checkpoint.pb.go create mode 100644 pkg/ingester/checkpoint.proto create mode 100644 pkg/ingester/encoding.go create mode 100644 pkg/ingester/encoding_test.go diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index ea2086802bc2..31bd475ad174 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -68,6 +68,8 @@ func (c *dumbChunk) Utilization() float64 { return float64(len(c.entries)) / float64(tmpNumEntries) } +func (c *dumbChunk) Encoding() Encoding { return EncNone } + // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ labels.Labels, _ logql.Pipeline) (iter.EntryIterator, error) { @@ -103,6 +105,10 @@ func (c *dumbChunk) Bytes() ([]byte, error) { return nil, nil } +func (c *dumbChunk) BytesWith(_ []byte) ([]byte, error) { + return nil, nil +} + func (c *dumbChunk) Blocks(_ time.Time, _ time.Time) []Block { return nil } diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 74803adc50f3..6f45e12a4197 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -106,11 +106,13 @@ type Chunk interface { Blocks(mintT, maxtT time.Time) []Block Size() int Bytes() ([]byte, error) + BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation BlockCount() int Utilization() float64 UncompressedSize() int CompressedSize() int Close() error + Encoding() Encoding } // Block is a chunk block. diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index f6c5f793a684..546a79e84f86 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -242,8 +242,8 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { return bc, nil } -// Bytes implements Chunk. -func (c *MemChunk) Bytes() ([]byte, error) { +// BytesWith uses a provided []byte for buffer instantiation +func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { if c.head != nil { // When generating the bytes, we need to flush the data held in-buffer. if err := c.cut(); err != nil { @@ -252,7 +252,7 @@ func (c *MemChunk) Bytes() ([]byte, error) { } crc32Hash := newCRC32() - buf := bytes.NewBuffer(nil) + buf := bytes.NewBuffer(b[:0]) offset := 0 eb := encbuf{b: make([]byte, 0, 1<<10)} @@ -317,6 +317,11 @@ func (c *MemChunk) Bytes() ([]byte, error) { return buf.Bytes(), nil } +// Bytes implements Chunk. +func (c *MemChunk) Bytes() ([]byte, error) { + return c.BytesWith(nil) +} + // Encoding implements Chunk. func (c *MemChunk) Encoding() Encoding { return c.encoding @@ -368,7 +373,7 @@ func (c *MemChunk) UncompressedSize() int { return size } -// CompressedSize implements Chunk +// CompressedSize implements Chunk. func (c *MemChunk) CompressedSize() int { size := 0 // Better to account for any uncompressed data than ignore it even though this isn't accurate. diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 3b09564e659a..fdba84eb4ae7 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -767,3 +767,13 @@ func TestMemchunkLongLine(t *testing.T) { }) } } + +// Ensure passing a reusable []byte doesn't affect output +func TestBytesWith(t *testing.T) { + exp, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(nil) + require.Nil(t, err) + out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) + require.Nil(t, err) + + require.Equal(t, exp, out) +} diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go new file mode 100644 index 000000000000..1a0f79c96297 --- /dev/null +++ b/pkg/ingester/checkpoint.go @@ -0,0 +1,59 @@ +package ingester + +import ( + "time" + + "github.com/grafana/loki/pkg/chunkenc" +) + +// The passed wireChunks slice is for re-use. +func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) { + if cap(wireChunks) < len(descs) { + wireChunks = make([]Chunk, len(descs)) + } else { + wireChunks = wireChunks[:len(descs)] + } + for i, d := range descs { + from, to := d.chunk.Bounds() + wireChunk := Chunk{ + From: from, + To: to, + Closed: d.closed, + FlushedAt: d.flushed, + } + + slice := wireChunks[i].Data[:0] // try to re-use the memory from last time + if cap(slice) < d.chunk.CompressedSize() { + slice = make([]byte, 0, d.chunk.CompressedSize()) + } + + out, err := d.chunk.BytesWith(slice) + if err != nil { + return nil, err + } + + wireChunk.Data = out + wireChunks[i] = wireChunk + } + return wireChunks, nil +} + +func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) { + descs := make([]*chunkDesc, 0, len(wireChunks)) + for _, c := range wireChunks { + desc := &chunkDesc{ + closed: c.Closed, + flushed: c.FlushedAt, + lastUpdated: time.Now(), + } + + mc, err := chunkenc.NewByteChunk(c.Data, conf.BlockSize, conf.TargetChunkSize) + if err != nil { + return nil, err + } + desc.chunk = mc + + descs = append(descs, desc) + } + return descs, nil +} diff --git a/pkg/ingester/checkpoint.pb.go b/pkg/ingester/checkpoint.pb.go new file mode 100644 index 000000000000..aa0b5356b068 --- /dev/null +++ b/pkg/ingester/checkpoint.pb.go @@ -0,0 +1,1035 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pkg/ingester/checkpoint.proto + +package ingester + +import ( + bytes "bytes" + fmt "fmt" + _ "github.com/cortexproject/cortex/pkg/ingester/client" + github_com_cortexproject_cortex_pkg_ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + _ "github.com/gogo/protobuf/types" + github_com_gogo_protobuf_types "github.com/gogo/protobuf/types" + io "io" + math "math" + reflect "reflect" + strings "strings" + time "time" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf +var _ = time.Kitchen + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// Chunk is a {de,}serializable intermediate type for chunkDesc which allows +// efficient loading/unloading to disk during WAL checkpoint recovery. +type Chunk struct { + From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` + To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` + FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"` + Closed bool `protobuf:"varint,4,opt,name=closed,proto3" json:"closed,omitempty"` + Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *Chunk) Reset() { *m = Chunk{} } +func (*Chunk) ProtoMessage() {} +func (*Chunk) Descriptor() ([]byte, []int) { + return fileDescriptor_00f4b7152db9bdb5, []int{0} +} +func (m *Chunk) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Chunk) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Chunk.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Chunk) XXX_Merge(src proto.Message) { + xxx_messageInfo_Chunk.Merge(m, src) +} +func (m *Chunk) XXX_Size() int { + return m.Size() +} +func (m *Chunk) XXX_DiscardUnknown() { + xxx_messageInfo_Chunk.DiscardUnknown(m) +} + +var xxx_messageInfo_Chunk proto.InternalMessageInfo + +func (m *Chunk) GetFrom() time.Time { + if m != nil { + return m.From + } + return time.Time{} +} + +func (m *Chunk) GetTo() time.Time { + if m != nil { + return m.To + } + return time.Time{} +} + +func (m *Chunk) GetFlushedAt() time.Time { + if m != nil { + return m.FlushedAt + } + return time.Time{} +} + +func (m *Chunk) GetClosed() bool { + if m != nil { + return m.Closed + } + return false +} + +func (m *Chunk) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +// Series is a {de,}serializable intermediate type for Series. +type Series struct { + UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` + Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` + Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` +} + +func (m *Series) Reset() { *m = Series{} } +func (*Series) ProtoMessage() {} +func (*Series) Descriptor() ([]byte, []int) { + return fileDescriptor_00f4b7152db9bdb5, []int{1} +} +func (m *Series) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Series.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Series) XXX_Merge(src proto.Message) { + xxx_messageInfo_Series.Merge(m, src) +} +func (m *Series) XXX_Size() int { + return m.Size() +} +func (m *Series) XXX_DiscardUnknown() { + xxx_messageInfo_Series.DiscardUnknown(m) +} + +var xxx_messageInfo_Series proto.InternalMessageInfo + +func (m *Series) GetUserID() string { + if m != nil { + return m.UserID + } + return "" +} + +func (m *Series) GetFingerprint() uint64 { + if m != nil { + return m.Fingerprint + } + return 0 +} + +func (m *Series) GetChunks() []Chunk { + if m != nil { + return m.Chunks + } + return nil +} + +func init() { + proto.RegisterType((*Chunk)(nil), "ingester.Chunk") + proto.RegisterType((*Series)(nil), "ingester.Series") +} + +func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) } + +var fileDescriptor_00f4b7152db9bdb5 = []byte{ + // 427 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xbd, 0x8e, 0xd4, 0x30, + 0x10, 0x8e, 0x77, 0x73, 0xd1, 0x9e, 0x17, 0x09, 0xe1, 0x02, 0x45, 0x2b, 0xe1, 0x44, 0x57, 0xa5, + 0x39, 0x47, 0x3a, 0x28, 0xa8, 0x10, 0x17, 0x28, 0x40, 0xa2, 0x40, 0x81, 0x8a, 0x2e, 0x3f, 0x4e, + 0x62, 0x36, 0x89, 0x23, 0xdb, 0x91, 0x28, 0x79, 0x84, 0x7b, 0x0c, 0x1e, 0xe5, 0xca, 0x2d, 0x4f, + 0x14, 0x07, 0x9b, 0x95, 0x80, 0xf2, 0x1e, 0x01, 0xc5, 0x49, 0xd8, 0xa5, 0xdc, 0xeb, 0xfc, 0xcd, + 0x7c, 0xdf, 0xcc, 0x37, 0x33, 0x86, 0x4f, 0x9a, 0x75, 0xee, 0xb3, 0x3a, 0xa7, 0x52, 0x51, 0xe1, + 0x27, 0x05, 0x4d, 0xd6, 0x0d, 0x67, 0xb5, 0x22, 0x8d, 0xe0, 0x8a, 0xa3, 0xc5, 0x94, 0x5a, 0x39, + 0x39, 0xe7, 0x79, 0x49, 0x7d, 0x1d, 0x8f, 0xdb, 0xcc, 0x57, 0xac, 0xa2, 0x52, 0x45, 0x55, 0x33, + 0x50, 0x57, 0xe7, 0x39, 0x53, 0x45, 0x1b, 0x93, 0x84, 0x57, 0x7e, 0xce, 0x73, 0xbe, 0x67, 0xf6, + 0x48, 0x03, 0xfd, 0x1a, 0xe9, 0x2f, 0x0f, 0xe8, 0x09, 0x17, 0x8a, 0x7e, 0x69, 0x04, 0xff, 0x4c, + 0x13, 0x35, 0x22, 0xff, 0x7f, 0x63, 0x25, 0xa3, 0xf5, 0x94, 0x1a, 0x2a, 0x9c, 0xfd, 0x06, 0xf0, + 0xe4, 0x55, 0xd1, 0xd6, 0x6b, 0xf4, 0x1c, 0x9a, 0x99, 0xe0, 0x95, 0x0d, 0x5c, 0xe0, 0x2d, 0x2f, + 0x56, 0x64, 0xb0, 0x4a, 0x26, 0x03, 0xe4, 0xe3, 0x64, 0x35, 0x58, 0x5c, 0xdf, 0x3a, 0xc6, 0xd5, + 0x0f, 0x07, 0x84, 0x5a, 0x81, 0x9e, 0xc1, 0x99, 0xe2, 0xf6, 0xec, 0x08, 0xdd, 0x4c, 0x71, 0x14, + 0xc0, 0xd3, 0xac, 0x6c, 0x65, 0x41, 0xd3, 0x4b, 0x65, 0xcf, 0x8f, 0x10, 0xef, 0x65, 0xe8, 0x31, + 0xb4, 0x92, 0x92, 0x4b, 0x9a, 0xda, 0xa6, 0x0b, 0xbc, 0x45, 0x38, 0x22, 0x84, 0xa0, 0x99, 0x46, + 0x2a, 0xb2, 0x4f, 0x5c, 0xe0, 0x3d, 0x08, 0xf5, 0xfb, 0xec, 0x17, 0x80, 0xd6, 0x07, 0x2a, 0x18, + 0x95, 0xbd, 0xac, 0x95, 0x54, 0xbc, 0x7d, 0xad, 0x87, 0x3d, 0x0d, 0x47, 0x84, 0x5c, 0xb8, 0xcc, + 0xfa, 0x6d, 0x89, 0x46, 0xb0, 0x5a, 0xe9, 0x89, 0xcc, 0xf0, 0x30, 0x84, 0x24, 0xb4, 0xca, 0x28, + 0xa6, 0xa5, 0xb4, 0xe7, 0xee, 0xdc, 0x5b, 0x5e, 0x3c, 0x22, 0xe3, 0x36, 0xdf, 0xf5, 0xd1, 0xf7, + 0x11, 0x13, 0xc1, 0x9b, 0xde, 0xe8, 0xf7, 0x5b, 0xe7, 0x3e, 0xb7, 0x19, 0xca, 0x5c, 0xa6, 0x51, + 0xa3, 0xa8, 0x08, 0xc7, 0x56, 0xe8, 0x1c, 0x5a, 0x49, 0x7f, 0x22, 0x69, 0x9b, 0xba, 0xe9, 0x43, + 0x32, 0xc9, 0x88, 0x3e, 0x5d, 0x60, 0xf6, 0x2d, 0xc3, 0x91, 0x14, 0xbc, 0xd8, 0x6c, 0xb1, 0x71, + 0xb3, 0xc5, 0xc6, 0xdd, 0x16, 0x83, 0xaf, 0x1d, 0x06, 0xdf, 0x3a, 0x0c, 0xae, 0x3b, 0x0c, 0x36, + 0x1d, 0x06, 0x3f, 0x3b, 0x0c, 0xfe, 0x74, 0xd8, 0xb8, 0xeb, 0x30, 0xb8, 0xda, 0x61, 0x63, 0xb3, + 0xc3, 0xc6, 0xcd, 0x0e, 0x1b, 0x9f, 0xfe, 0x7d, 0xd2, 0xd8, 0xd2, 0xdb, 0x7f, 0xfa, 0x37, 0x00, + 0x00, 0xff, 0xff, 0xaf, 0xc1, 0x8c, 0xad, 0xd6, 0x02, 0x00, 0x00, +} + +func (this *Chunk) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Chunk) + if !ok { + that2, ok := that.(Chunk) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if !this.From.Equal(that1.From) { + return false + } + if !this.To.Equal(that1.To) { + return false + } + if !this.FlushedAt.Equal(that1.FlushedAt) { + return false + } + if this.Closed != that1.Closed { + return false + } + if !bytes.Equal(this.Data, that1.Data) { + return false + } + return true +} +func (this *Series) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Series) + if !ok { + that2, ok := that.(Series) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.UserID != that1.UserID { + return false + } + if this.Fingerprint != that1.Fingerprint { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } + } + if len(this.Chunks) != len(that1.Chunks) { + return false + } + for i := range this.Chunks { + if !this.Chunks[i].Equal(&that1.Chunks[i]) { + return false + } + } + return true +} +func (this *Chunk) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 9) + s = append(s, "&ingester.Chunk{") + s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") + s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n") + s = append(s, "FlushedAt: "+fmt.Sprintf("%#v", this.FlushedAt)+",\n") + s = append(s, "Closed: "+fmt.Sprintf("%#v", this.Closed)+",\n") + s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Series) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&ingester.Series{") + s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n") + s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + if this.Chunks != nil { + vs := make([]*Chunk, len(this.Chunks)) + for i := range vs { + vs[i] = &this.Chunks[i] + } + s = append(s, "Chunks: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringCheckpoint(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Chunk) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Chunk) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + dAtA[i] = 0xa + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.From))) + n1, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.From, dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + dAtA[i] = 0x12 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.To))) + n2, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.To, dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + dAtA[i] = 0x1a + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.FlushedAt))) + n3, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.FlushedAt, dAtA[i:]) + if err != nil { + return 0, err + } + i += n3 + if m.Closed { + dAtA[i] = 0x20 + i++ + if m.Closed { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + if len(m.Data) > 0 { + dAtA[i] = 0x2a + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + return i, nil +} + +func (m *Series) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Series) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.UserID) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.UserID))) + i += copy(dAtA[i:], m.UserID) + } + if m.Fingerprint != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x1a + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Chunks) > 0 { + for _, msg := range m.Chunks { + dAtA[i] = 0x22 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func encodeVarintCheckpoint(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Chunk) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.From) + n += 1 + l + sovCheckpoint(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.To) + n += 1 + l + sovCheckpoint(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.FlushedAt) + n += 1 + l + sovCheckpoint(uint64(l)) + if m.Closed { + n += 2 + } + l = len(m.Data) + if l > 0 { + n += 1 + l + sovCheckpoint(uint64(l)) + } + return n +} + +func (m *Series) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.UserID) + if l > 0 { + n += 1 + l + sovCheckpoint(uint64(l)) + } + if m.Fingerprint != 0 { + n += 1 + sovCheckpoint(uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovCheckpoint(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovCheckpoint(uint64(l)) + } + } + return n +} + +func sovCheckpoint(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozCheckpoint(x uint64) (n int) { + return sovCheckpoint(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Chunk) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Chunk{`, + `From:` + strings.Replace(strings.Replace(this.From.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `FlushedAt:` + strings.Replace(strings.Replace(this.FlushedAt.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `Closed:` + fmt.Sprintf("%v", this.Closed) + `,`, + `Data:` + fmt.Sprintf("%v", this.Data) + `,`, + `}`, + }, "") + return s +} +func (this *Series) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Series{`, + `UserID:` + fmt.Sprintf("%v", this.UserID) + `,`, + `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "Chunk", 1), `&`, ``, 1) + `,`, + `}`, + }, "") + return s +} +func valueToStringCheckpoint(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Chunk) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Chunk: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Chunk: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field From", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.From, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field To", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.To, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field FlushedAt", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.FlushedAt, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Closed", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Closed = bool(v != 0) + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCheckpoint(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCheckpoint + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCheckpoint + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Series) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Series: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Series: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserID", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserID = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + m.Fingerprint = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Fingerprint |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, Chunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCheckpoint(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCheckpoint + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCheckpoint + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipCheckpoint(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthCheckpoint + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthCheckpoint + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipCheckpoint(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthCheckpoint + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthCheckpoint = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowCheckpoint = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/ingester/checkpoint.proto b/pkg/ingester/checkpoint.proto new file mode 100644 index 000000000000..0437746c8e7a --- /dev/null +++ b/pkg/ingester/checkpoint.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +package ingester; + +option go_package = "ingester"; + +import "google/protobuf/timestamp.proto"; +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto"; + +// Chunk is a {de,}serializable intermediate type for chunkDesc which allows +// efficient loading/unloading to disk during WAL checkpoint recovery. +message Chunk { + google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + google.protobuf.Timestamp flushedAt = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + bool closed = 4; + bytes data = 5; +} + +// Series is a {de,}serializable intermediate type for Series. +message Series { + string userID = 1; + uint64 fingerprint = 2; + repeated cortex.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; + repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; +} diff --git a/pkg/ingester/encoding.go b/pkg/ingester/encoding.go new file mode 100644 index 000000000000..a933caec5a56 --- /dev/null +++ b/pkg/ingester/encoding.go @@ -0,0 +1,185 @@ +package ingester + +import ( + "time" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/encoding" + tsdb_record "github.com/prometheus/prometheus/tsdb/record" + + "github.com/grafana/loki/pkg/logproto" +) + +// RecordType represents the type of the WAL/Checkpoint record. +type RecordType byte + +const ( + _ = iota // ignore first value so the zero value doesn't look like a record type. + // WALRecordSeries is the type for the WAL record for series. + WALRecordSeries RecordType = iota + // WALRecordSamples is the type for the WAL record for samples. + WALRecordEntries + // CheckpointRecord is the type for the Checkpoint record based on protos. + CheckpointRecord +) + +// WALRecord is a struct combining the series and samples record. +type WALRecord struct { + UserID string + Series []tsdb_record.RefSeries + RefEntries RefEntries +} + +type RefEntries struct { + Ref uint64 + Entries []logproto.Entry +} + +func (record *WALRecord) encodeSeries(b []byte) []byte { + buf := EncWith(b) + buf.PutByte(byte(WALRecordSeries)) + buf.PutUvarintStr(record.UserID) + + var enc tsdb_record.Encoder + // The 'encoded' already has the type header and userID here, hence re-using + // the remaining part of the slice (i.e. encoded[len(encoded):])) to encode the series. + encoded := buf.Get() + encoded = append(encoded, enc.Series(record.Series, encoded[len(encoded):])...) + + return encoded +} + +func (record *WALRecord) encodeEntries(b []byte) []byte { + buf := EncWith(b) + buf.PutByte(byte(WALRecordEntries)) + buf.PutUvarintStr(record.UserID) + + entries := record.RefEntries.Entries + if len(entries) == 0 { + return buf.Get() + } + + // Only encode the series fingerprint if there are >0 entries. + buf.PutBE64(record.RefEntries.Ref) + + // Store base timestamp and base reference number of first sample. + // All samples encode their timestamp and ref as delta to those. + first := entries[0].Timestamp.UnixNano() + + buf.PutBE64int64(first) + + for _, s := range entries { + buf.PutVarint64(s.Timestamp.UnixNano() - first) + // denote line length + byteLine := []byte(s.Line) + buf.PutUvarint(len(byteLine)) + buf.PutBytes(byteLine) + } + return buf.Get() + +} + +func decodeEntries(b []byte, entries *RefEntries) error { + if len(b) == 0 { + return nil + } + + dec := DecWith(b) + + entries.Ref = dec.Be64() + baseTime := dec.Be64int64() + + for len(dec.B) > 0 && dec.Err() == nil { + dRef := dec.Varint64() + ln := dec.Uvarint() + line := dec.Bytes(ln) + + entries.Entries = append(entries.Entries, logproto.Entry{ + Timestamp: time.Unix(0, baseTime+dRef), + Line: string(line), + }) + } + + if dec.Err() != nil { + return errors.Wrapf(dec.Err(), "decode error after %d entries", len(entries.Entries)) + } + if len(dec.B) > 0 { + return errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) + } + return nil + +} + +func decodeWALRecord(b []byte, walRec *WALRecord) (err error) { + var ( + userID string + dec tsdb_record.Decoder + rSeries []tsdb_record.RefSeries + + decbuf = DecWith(b) + t = RecordType(decbuf.Byte()) + ) + + walRec.Series = walRec.Series[:0] + walRec.RefEntries.Entries = walRec.RefEntries.Entries[:0] + + switch t { + case WALRecordSeries: + userID = decbuf.UvarintStr() + rSeries, err = dec.Series(decbuf.B, walRec.Series) + case WALRecordEntries: + userID = decbuf.UvarintStr() + err = decodeEntries(decbuf.B, &walRec.RefEntries) + default: + return errors.New("unknown record type") + } + + // We reach here only if its a record with type header. + if decbuf.Err() != nil { + return decbuf.Err() + } + + if err != nil { + return err + } + + walRec.UserID = userID + walRec.Series = rSeries + return nil +} + +func EncWith(b []byte) (res Encbuf) { + res.B = b + return res + +} + +// Encbuf extends encoding.Encbuf with support for multi byte encoding +type Encbuf struct { + encoding.Encbuf +} + +func (e *Encbuf) PutBytes(c []byte) { e.B = append(e.B, c...) } + +func DecWith(b []byte) (res Decbuf) { + res.B = b + return res +} + +// Decbuf extends encoding.Decbuf with support for multi byte decoding +type Decbuf struct { + encoding.Decbuf +} + +func (d *Decbuf) Bytes(n int) []byte { + if d.E != nil { + return nil + } + if len(d.B) < n { + d.E = encoding.ErrInvalidSize + return nil + } + x := d.B[:n] + d.B = d.B[n:] + return x +} diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go new file mode 100644 index 000000000000..ec8aa010d94f --- /dev/null +++ b/pkg/ingester/encoding_test.go @@ -0,0 +1,68 @@ +package ingester + +import ( + "testing" + "time" + + "github.com/grafana/loki/pkg/logproto" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" +) + +func Test_Encoding_Series(t *testing.T) { + record := &WALRecord{ + UserID: "123", + Series: []record.RefSeries{ + { + Ref: 456, + Labels: labels.FromMap(map[string]string{ + "foo": "bar", + "bazz": "buzz", + }), + }, + { + Ref: 789, + Labels: labels.FromMap(map[string]string{ + "abc": "123", + "def": "456", + }), + }, + }, + } + + buf := record.encodeSeries(nil) + + var decoded WALRecord + + err := decodeWALRecord(buf, &decoded) + require.Nil(t, err) + require.Equal(t, record, &decoded) +} + +func Test_Encoding_Entries(t *testing.T) { + record := &WALRecord{ + UserID: "123", + RefEntries: RefEntries{ + Ref: 456, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, + }, + }, + } + + buf := record.encodeEntries(nil) + + var decoded WALRecord + + err := decodeWALRecord(buf, &decoded) + require.Nil(t, err) + require.Equal(t, record, &decoded) +} diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 94ce69aa6e5a..fc67087d2fc6 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -244,7 +244,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { return err } - chunks := make([]*logproto.Chunk, 1) + chunks := make([]*logproto.Chunk, 1, 1) chunks[0] = &logproto.Chunk{ Data: bb, }