allow configuring a target chunk size in compressed bytes (#1406)
* allow configuring a target chunk size in compressed bytes

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* addressing feedback

Signed-off-by: Edward Welch <edward.welch@grafana.com>
slim-bean authored Dec 12, 2019
1 parent 7bb77ec commit 5aa82ba
Showing 10 changed files with 172 additions and 37 deletions.
10 changes: 9 additions & 1 deletion docs/configuration/README.md
@@ -267,8 +267,16 @@ The `ingester_config` block configures Ingesters.
# period as long as they receive no further activity.
[chunk_idle_period: <duration> | default = 30m]
# The maximum size in bytes a chunk can be before it should be flushed.
# The targeted _uncompressed_ size in bytes of a chunk block.
# When this threshold is exceeded, the head block is cut and compressed inside the chunk.
[chunk_block_size: <int> | default = 262144]
# A target _compressed_ size in bytes for chunks.
# This is a desired size, not an exact size; chunks may be slightly bigger
# or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period).
# The default value of 0 creates chunks with a fixed 10 blocks;
# a non-zero value creates chunks with a variable number of blocks to meet the target size.
[chunk_target_size: <int> | default = 0]
```
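For illustration, a minimal sketch of how the new option might look in a Loki config file, assuming the standard `ingester` YAML block; the 1.5 MiB value is an arbitrary example, not a recommendation:

```yaml
ingester:
  chunk_block_size: 262144    # 256 KiB uncompressed per block (the default)
  chunk_target_size: 1572864  # ~1.5 MiB compressed per chunk; 0 keeps the fixed 10-block behavior
```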

### lifecycler_config
13 changes: 13 additions & 0 deletions pkg/chunkenc/dumb_chunk.go
@@ -55,6 +55,11 @@ func (c *dumbChunk) UncompressedSize() int {
return c.Size()
}

// CompressedSize implements Chunk.
func (c *dumbChunk) CompressedSize() int {
return 0
}

// Utilization implements Chunk
func (c *dumbChunk) Utilization() float64 {
return float64(len(c.entries)) / float64(tmpNumEntries)
@@ -91,6 +96,14 @@ func (c *dumbChunk) Bytes() ([]byte, error) {
return nil, nil
}

func (c *dumbChunk) Blocks() int {
return 0
}

func (c *dumbChunk) Close() error {
return nil
}

type dumbChunkIterator struct {
direction logproto.Direction
i int
50 changes: 42 additions & 8 deletions pkg/chunkenc/gzip.go
@@ -10,10 +10,11 @@ import (
"io"
"time"

"github.com/pkg/errors"

"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/pkg/errors"
)

const blocksPerChunk = 10
@@ -43,9 +44,13 @@ func newCRC32() hash.Hash32 {
type MemChunk struct {
// The number of uncompressed bytes per block.
blockSize int
// Target size in compressed bytes
targetSize int

// The finished blocks.
blocks []block
// The compressed size of all the blocks
cutBlockSize int

// Current in-mem block being appended to.
head *headBlock
@@ -129,10 +134,11 @@ type entry struct {

// NewMemChunkSize returns a new in-mem chunk.
// Mainly for config push size.
func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {
func NewMemChunkSize(enc Encoding, blockSize, targetSize int) *MemChunk {
c := &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
blocks: []block{},
blockSize: blockSize, // The blockSize in bytes.
targetSize: targetSize, // Desired chunk size in compressed bytes
blocks: []block{},

head: &headBlock{},

@@ -151,7 +157,7 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {

// NewMemChunk returns a new in-mem chunk for query.
func NewMemChunk(enc Encoding) *MemChunk {
return NewMemChunkSize(enc, 256*1024)
return NewMemChunkSize(enc, 256*1024, 0)
}

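As a usage sketch of the constructors above (the 1.5 MiB target is an illustrative assumption, not a recommended value):

```go
package main

import "github.com/grafana/loki/pkg/chunkenc"

func main() {
	// 256 KiB uncompressed blocks, ~1.5 MiB compressed target.
	targeted := chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 256*1024, 1536*1024)
	// A target of 0 keeps the legacy fixed 10-blocks-per-chunk behavior,
	// which is exactly what NewMemChunk does.
	legacy := chunkenc.NewMemChunk(chunkenc.EncGZIP)
	_, _ = targeted, legacy
}
```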
// NewByteChunk returns a MemChunk on the passed bytes.
@@ -309,8 +315,20 @@ func (c *MemChunk) Size() int {
return ne
}

// Blocks implements Chunk.
func (c *MemChunk) Blocks() int {
return len(c.blocks)
}

// SpaceFor implements Chunk.
func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
if c.targetSize > 0 {
// This checks whether the uncompressed line will fit, which is not
// a great check, but it guarantees we always stay under the target size.
newHBSize := c.head.size + len(e.Line)
return (c.cutBlockSize + newHBSize) < c.targetSize
}
// If targetSize is not defined, default to the original behavior of a fixed number of blocks per chunk.
return len(c.blocks) < blocksPerChunk
}

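To make the arithmetic concrete, a standalone sketch of the gating above with made-up numbers (not ingester code): `cutBlockSize` holds the compressed size of all finished blocks, and the uncompressed head size stands in, pessimistically, for its eventual compressed size.

```go
package main

import "fmt"

// spaceFor mirrors the target-size branch of MemChunk.SpaceFor above.
func spaceFor(cutBlockSize, headSize, lineLen, targetSize int) bool {
	newHBSize := headSize + lineLen
	return (cutBlockSize + newHBSize) < targetSize
}

func main() {
	target := 1024 * 1024 // 1 MiB, as in the test below
	fmt.Println(spaceFor(900_000, 100_000, 512, target))  // true: 1,000,512 < 1,048,576
	fmt.Println(spaceFor(1_020_000, 30_000, 512, target)) // false: 1,050,512 >= 1,048,576
}
```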
@@ -329,11 +347,25 @@ func (c *MemChunk) UncompressedSize() int {
return size
}

// Utilization implements Chunk. It is the bytes used as a percentage of the
// CompressedSize implements Chunk
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
if !c.head.isEmpty() {
size += c.head.size
}
size += c.cutBlockSize
return size
}

// Utilization implements Chunk.
func (c *MemChunk) Utilization() float64 {
if c.targetSize != 0 {
return float64(c.CompressedSize()) / float64(c.targetSize)
}
size := c.UncompressedSize()

return float64(size) / float64(blocksPerChunk*c.blockSize)
}

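A quick sketch of the two utilization modes above, with illustrative numbers (a toy calculation, not library code):

```go
package main

import "fmt"

func main() {
	// Target-size mode: compressed bytes so far over the configured target.
	fmt.Printf("%.2f\n", float64(900_000)/float64(1024*1024)) // ≈0.86 of a 1 MiB target
	// Legacy mode: uncompressed bytes over blocksPerChunk (10) × blockSize.
	fmt.Printf("%.2f\n", float64(2_000_000)/float64(10*256*1024)) // ≈0.76
}
```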
// Append implements Chunk.
@@ -382,6 +414,8 @@ func (c *MemChunk) cut() error {
uncompressedSize: c.head.size,
})

c.cutBlockSize += len(b)

c.head.entries = c.head.entries[:0]
c.head.mint = 0 // Will be set on first append.
c.head.size = 0
54 changes: 53 additions & 1 deletion pkg/chunkenc/gzip_test.go
@@ -11,8 +11,9 @@ import (

"github.com/stretchr/testify/assert"

"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/logproto"
)

func TestGZIPBlock(t *testing.T) {
@@ -175,6 +176,57 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
}

func TestGZIPChunkTargetSize(t *testing.T) {
targetSize := 1024 * 1024
chk := NewMemChunkSize(EncGZIP, 1024, targetSize)

lineSize := 512
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: "",
}

// Use random numbers to generate the log data; otherwise the gzip compression is way too good
// and the following loop has to run way too many times.
// Using the same seed guarantees the same random numbers, and therefore the same test data.
r := rand.New(rand.NewSource(99))

i := int64(0)

for ; chk.SpaceFor(entry) && i < 5000; i++ {
logLine := make([]byte, lineSize)
for j := range logLine {
logLine[j] = byte(r.Int())
}
entry = &logproto.Entry{
Timestamp: time.Unix(0, i),
Line: string(logLine),
}
require.NoError(t, chk.Append(entry))
}

// 5000 is a limit to make sure the test doesn't run away; we shouldn't need this many log lines to make a 1MB chunk.
require.NotEqual(t, int64(5000), i)

require.NoError(t, chk.Close())

require.Equal(t, 0, chk.head.size)

// Even though the seed is static above and results should be deterministic,
// we will allow +/- 10% variance
minSize := int(float64(targetSize) * 0.9)
maxSize := int(float64(targetSize) * 1.1)
require.Greater(t, chk.CompressedSize(), minSize)
require.Less(t, chk.CompressedSize(), maxSize)

// Also verify our utilization is close to 1.0
ut := chk.Utilization()
require.Greater(t, ut, 0.99)
require.Less(t, ut, 1.01)
}

func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Parallel()

3 changes: 3 additions & 0 deletions pkg/chunkenc/interface.go
@@ -50,8 +50,11 @@ type Chunk interface {
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
Blocks() int
Utilization() float64
UncompressedSize() int
CompressedSize() int
Close() error
}

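A hedged sketch of how a caller might drive the extended interface; the flush policy below is hypothetical, not Loki's flusher, but it compiles against only the methods declared above:

```go
package main

import "fmt"

// chunk is the subset of the Chunk interface the sketch needs.
type chunk interface {
	Blocks() int
	CompressedSize() int
	Utilization() float64
	Close() error
}

// shouldFlush is a hypothetical policy: flush once the chunk is fully
// utilized (target-size mode) or has hit the legacy 10-block limit.
func shouldFlush(c chunk) bool {
	return c.Utilization() >= 1.0 || c.Blocks() >= 10
}

// stubChunk is a stand-in implementation for demonstration only.
type stubChunk struct{ blocks, compressed, target int }

func (s stubChunk) Blocks() int          { return s.blocks }
func (s stubChunk) CompressedSize() int  { return s.compressed }
func (s stubChunk) Utilization() float64 { return float64(s.compressed) / float64(s.target) }
func (s stubChunk) Close() error         { return nil }

func main() {
	c := stubChunk{blocks: 4, compressed: 1_100_000, target: 1024 * 1024}
	fmt.Println(shouldFlush(c)) // true: utilization ≈ 1.05
}
```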
// CompressionWriter is the writer that compresses the data passed to it.
4 changes: 3 additions & 1 deletion pkg/ingester/ingester.go
@@ -47,6 +47,7 @@ type Config struct {
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`

// For testing, you can override the address and ID of this ingester.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
@@ -63,6 +64,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.RetainPeriod, "ingester.chunks-retain-period", 15*time.Minute, "")
f.DurationVar(&cfg.MaxChunkIdle, "ingester.chunks-idle-period", 30*time.Minute, "")
f.IntVar(&cfg.BlockSize, "ingester.chunks-block-size", 256*1024, "")
f.IntVar(&cfg.TargetChunkSize, "ingester.chunk-target-size", 0, "")
}

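For reference, a small sketch exercising the flag registered above (the flag-set name and the 1.5 MiB value are illustrative):

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/pkg/ingester"
)

func main() {
	fs := flag.NewFlagSet("loki", flag.ExitOnError)
	var cfg ingester.Config
	cfg.RegisterFlags(fs)
	// ~1.5 MiB compressed target per chunk; 0 (the default) keeps fixed 10-block chunks.
	if err := fs.Parse([]string{"-ingester.chunk-target-size=1572864"}); err != nil {
		panic(err)
	}
	fmt.Println(cfg.TargetChunkSize) // 1572864
}
```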
// Ingester builds chunks for incoming log streams.
@@ -189,7 +191,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(instanceID, i.cfg.BlockSize, i.limits)
inst = newInstance(instanceID, i.cfg.BlockSize, i.cfg.TargetChunkSize, i.limits)
i.instances[instanceID] = inst
}
return inst
20 changes: 11 additions & 9 deletions pkg/ingester/instance.go
@@ -60,14 +60,15 @@ type instance struct {
streamsCreatedTotal prometheus.Counter
streamsRemovedTotal prometheus.Counter

blockSize int
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
blockSize int
targetChunkSize int // Compressed bytes
tailers map[uint32]*tailer
tailerMtx sync.RWMutex

limits *validation.Overrides
}

func newInstance(instanceID string, blockSize int, limits *validation.Overrides) *instance {
func newInstance(instanceID string, blockSize, targetChunkSize int, limits *validation.Overrides) *instance {
i := &instance{
streams: map[model.Fingerprint]*stream{},
index: index.New(),
@@ -76,9 +77,10 @@ func newInstance(instanceID string, blockSize, targetChunkSize int, limits *validation.Overrides)
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),

blockSize: blockSize,
tailers: map[uint32]*tailer{},
limits: limits,
blockSize: blockSize,
targetChunkSize: targetChunkSize,
tailers: map[uint32]*tailer{},
limits: limits,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@@ -96,7 +98,7 @@ func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapte
stream, ok := i.streams[fp]
if !ok {
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
memoryStreams.Inc()
@@ -154,7 +156,7 @@ func (i *instance) getOrCreateStream(labels []client.LabelAdapter) (*stream, err
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "per-user streams limit (%d) exceeded", i.limits.MaxStreamsPerUser(i.instanceID))
}
sortedLabels := i.index.Add(labels, fp)
stream = newStream(fp, sortedLabels, i.blockSize)
stream = newStream(fp, sortedLabels, i.blockSize, i.targetChunkSize)
i.streams[fp] = stream
memoryStreams.Inc()
i.streamsCreatedTotal.Inc()
10 changes: 6 additions & 4 deletions pkg/ingester/instance_test.go
@@ -8,18 +8,20 @@ import (
"testing"
"time"

"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/prometheus/pkg/labels"

"github.com/grafana/loki/pkg/util/validation"
"github.com/grafana/loki/pkg/logproto"

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/util/validation"
)

func TestLabelsCollisions(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

i := newInstance("test", 512, o)
i := newInstance("test", 512, 0, o)

// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -45,7 +47,7 @@ func TestConcurrentPushes(t *testing.T) {
o, err := validation.NewOverrides(validation.Limits{MaxStreamsPerUser: 1000})
require.NoError(t, err)

inst := newInstance("test", 512, o)
inst := newInstance("test", 512, 0, o)

const (
concurrent = 10