-
Notifications
You must be signed in to change notification settings - Fork 3.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WAL/marshalable chunks #2764
WAL/marshalable chunks #2764
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
package ingester | ||
|
||
import ( | ||
"time" | ||
|
||
"github.com/grafana/loki/pkg/chunkenc" | ||
) | ||
|
||
// The passed wireChunks slice is for re-use. | ||
func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) { | ||
if cap(wireChunks) < len(descs) { | ||
wireChunks = make([]Chunk, len(descs)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder if it wouldn't be better to append what is missing. But without benchmark, I can't prove it so we shall see in the future. |
||
} else { | ||
wireChunks = wireChunks[:len(descs)] | ||
} | ||
for i, d := range descs { | ||
from, to := d.chunk.Bounds() | ||
wireChunk := Chunk{ | ||
From: from, | ||
To: to, | ||
Closed: d.closed, | ||
FlushedAt: d.flushed, | ||
} | ||
|
||
slice := wireChunks[i].Data[:0] // try to re-use the memory from last time | ||
if cap(slice) < d.chunk.CompressedSize() { | ||
slice = make([]byte, 0, d.chunk.CompressedSize()) | ||
} | ||
|
||
out, err := d.chunk.BytesWith(slice) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
wireChunk.Data = out | ||
wireChunks[i] = wireChunk | ||
} | ||
return wireChunks, nil | ||
} | ||
|
||
func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) { | ||
descs := make([]*chunkDesc, 0, len(wireChunks)) | ||
for _, c := range wireChunks { | ||
desc := &chunkDesc{ | ||
closed: c.Closed, | ||
flushed: c.FlushedAt, | ||
lastUpdated: time.Now(), | ||
} | ||
|
||
mc, err := chunkenc.NewByteChunk(c.Data, conf.BlockSize, conf.TargetChunkSize) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changing the config of this could be catastrophic for the wal ? I wonder if we should encode this in the chunk header now ? Not for this PR but just raising a point here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was worried about this as well, but don't think it will adversely affect us. We'd simply flush on the next push if moving from a larger to a smaller block or target size. |
||
if err != nil { | ||
return nil, err | ||
} | ||
desc.chunk = mc | ||
|
||
descs = append(descs, desc) | ||
} | ||
return descs, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fun fact we also need this for iterator, to be able to reuse them.