diff --git a/pkg/pattern/iter/batch_test.go b/pkg/pattern/iter/batch_test.go
index a4457b95e7f97..9a4fc5a113c5e 100644
--- a/pkg/pattern/iter/batch_test.go
+++ b/pkg/pattern/iter/batch_test.go
@@ -1,6 +1,7 @@
 package iter
 
 import (
+	"sort"
 	"testing"
 
 	"github.com/go-kit/log"
@@ -211,6 +212,12 @@ func TestReadMetricsBatch(t *testing.T) {
 			it := NewSumMergeSampleIterator(tt.seriesIter)
 			got, err := ReadMetricsBatch(it, tt.batchSize, log.NewNopLogger())
 			require.NoError(t, err)
+			sort.Slice(tt.expected.Series, func(i, j int) bool {
+				return tt.expected.Series[i].Labels < tt.expected.Series[j].Labels
+			})
+			sort.Slice(got.Series, func(i, j int) bool {
+				return got.Series[i].Labels < got.Series[j].Labels
+			})
 			require.Equal(t, tt.expected.Series, got.Series)
 		})
 	}
diff --git a/pkg/storage/wal/chunks/chunks_test.go b/pkg/storage/wal/chunks/chunks_test.go
index d27f7330c57c7..c8b8431977412 100644
--- a/pkg/storage/wal/chunks/chunks_test.go
+++ b/pkg/storage/wal/chunks/chunks_test.go
@@ -1,11 +1,8 @@
 package chunks
 
 import (
-	"bufio"
 	"bytes"
 	"fmt"
-	"os"
-	"path/filepath"
 	"strconv"
 	"testing"
 	"time"
@@ -13,6 +10,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
+	"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
 )
 
 func TestChunkReaderWriter(t *testing.T) {
@@ -121,11 +119,11 @@ func TestChunkReaderWriter(t *testing.T) {
 }
 
 func TestChunkReaderWriterWithLogGenerator(t *testing.T) {
-	filenames := testDataFile()
+	filenames := testdata.Files()
 
 	for _, filename := range filenames {
 		t.Run(filename, func(t *testing.T) {
-			gen := newLogGenerator(t, filename)
+			gen := testdata.NewLogGenerator(t, filename)
 			defer gen.Close()
 
 			var entries []*logproto.Entry
@@ -196,10 +194,10 @@ var (
 // Benchmark reads with log generator
 func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
-	filenames := testDataFile()
+	filenames := testdata.Files()
 
 	for _, filename := range filenames {
 		b.Run(filename, func(b *testing.B) {
-			gen := newLogGenerator(b, filename)
+			gen := testdata.NewLogGenerator(b, filename)
 			defer gen.Close()
 
 			var entries []*logproto.Entry
@@ -239,12 +237,12 @@ func BenchmarkReadChunkWithLogGenerator(b *testing.B) {
 
 // Benchmark with log generator
 func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
-	filenames := testDataFile()
+	filenames := testdata.Files()
 
 	for _, filename := range filenames {
 		for _, count := range []int{1000, 10000, 100000} {
 			b.Run(fmt.Sprintf("%s-%d", filename, count), func(b *testing.B) {
-				gen := newLogGenerator(b, filename)
+				gen := testdata.NewLogGenerator(b, filename)
 				defer gen.Close()
 
 				var entries []*logproto.Entry
@@ -278,24 +276,6 @@ func BenchmarkWriteChunkWithLogGenerator(b *testing.B) {
 	}
 }
 
-func testDataFile() []string {
-	testdataDir := "../testdata"
-	files, err := os.ReadDir(testdataDir)
-	if err != nil {
-		panic(err)
-	}
-
-	var fileNames []string
-	for _, file := range files {
-		if !file.IsDir() {
-			filePath := filepath.Join(testdataDir, file.Name())
-			fileNames = append(fileNames, filePath)
-		}
-	}
-
-	return fileNames
-}
-
 // generateLogEntries generates a slice of logproto.Entry with the given count.
 func generateLogEntries(count int) []*logproto.Entry {
 	entries := make([]*logproto.Entry, count)
@@ -307,39 +287,3 @@ func generateLogEntries(count int) []*logproto.Entry {
 	}
 	return entries
 }
-
-type logGenerator struct {
-	f *os.File
-	s *bufio.Scanner
-}
-
-func (g *logGenerator) Next() (bool, []byte) {
-	if g.s.Scan() {
-		return true, g.s.Bytes()
-	}
-	g.reset()
-	return g.s.Scan(), g.s.Bytes()
-}
-
-func (g *logGenerator) Close() {
-	if g.f != nil {
-		g.f.Close()
-	}
-	g.f = nil
-}
-
-func (g *logGenerator) reset() {
-	_, _ = g.f.Seek(0, 0)
-	g.s = bufio.NewScanner(g.f)
-}
-
-func newLogGenerator(t testing.TB, filename string) *logGenerator {
-	t.Helper()
-	file, err := os.Open(filename)
-	require.NoError(t, err)
-
-	return &logGenerator{
-		f: file,
-		s: bufio.NewScanner(file),
-	}
-}
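Aside: the chunks_test.go change above is purely mechanical — the testDataFile and logGenerator helpers move, renamed and exported, into the new pkg/storage/wal/testdata package (added at the end of this diff) so the segment tests can share them. A minimal sketch of a consumer of the relocated helpers; the test name and the three-line read loop here are illustrative, not part of this change:

    package chunks

    import (
        "testing"

        "github.com/grafana/loki/v3/pkg/storage/wal/testdata"
    )

    // Illustrative smoke test: open each sample log file and read a few lines.
    func TestLogGeneratorSmoke(t *testing.T) {
        for _, filename := range testdata.Files() {
            t.Run(filename, func(t *testing.T) {
                gen := testdata.NewLogGenerator(t, filename)
                defer gen.Close()
                for i := 0; i < 3; i++ {
                    more, line := gen.Next()
                    if !more { // only an empty file yields no line; Next wraps at EOF
                        break
                    }
                    t.Logf("%s", line)
                }
            })
        }
    }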
diff --git a/pkg/storage/wal/segment.go b/pkg/storage/wal/segment.go
index 68b9625cb0c30..4c1d134fe2ca4 100644
--- a/pkg/storage/wal/segment.go
+++ b/pkg/storage/wal/segment.go
@@ -37,8 +37,9 @@ type streamID struct {
 }
 
 type SegmentWriter struct {
-	streams *swiss.Map[streamID, *streamSegment]
-	buf1    encoding.Encbuf
+	streams   *swiss.Map[streamID, *streamSegment]
+	buf1      encoding.Encbuf
+	inputSize int64
 }
 
 type streamSegment struct {
@@ -61,6 +62,9 @@ func (b *SegmentWriter) Append(tenantID, labelsString string, lbls labels.Labels
 	if len(entries) == 0 {
 		return
 	}
+	for _, e := range entries {
+		b.inputSize += int64(len(e.Line))
+	}
 	id := streamID{labels: labelsString, tenant: tenantID}
 	s, ok := b.streams.Get(id)
 	if !ok {
@@ -224,6 +228,13 @@ func (s *streamSegment) WriteTo(w io.Writer) (n int64, err error) {
 func (b *SegmentWriter) Reset() {
 	b.streams.Clear()
 	b.buf1.Reset()
+	b.inputSize = 0
+}
+
+// InputSize returns the total size of the input data written to the writer.
+// It doesn't account for timestamps and labels.
+func (b *SegmentWriter) InputSize() int64 {
+	return b.inputSize
 }
 
 type SegmentReader struct {
@@ -332,3 +343,25 @@ func (r *SegmentReader) Series(ctx context.Context) (*SeriesIter, error) {
 
 	return NewSeriesIter(r.idr, ps, r.b), nil
 }
+
+// Sizes describes the encoded size of a segment: the index plus the first
+// chunk of every series.
+type Sizes struct {
+	Index  int64
+	Series []int64
+}
+
+func (r *SegmentReader) Sizes() (Sizes, error) {
+	var sizes Sizes
+	sizes.Index = r.idr.Size()
+	it, err := r.Series(context.Background())
+	if err != nil {
+		return sizes, err
+	}
+	sizes.Series = []int64{}
+	for it.Next() {
+		_, size := it.chunksMeta[0].Ref.Unpack()
+		sizes.Series = append(sizes.Series, int64(size))
+	}
+	return sizes, nil
+}
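The writer and reader additions above give callers both sides of the compression ledger: InputSize reports the raw line bytes appended, while WriteTo's return value and Sizes report the encoded output. A sketch of the intended accounting — NewWalSegmentWriter, Append, WriteTo, and InputSize are taken from this diff, but the helper itself is illustrative:

    package wal

    import (
        "bytes"

        "github.com/prometheus/prometheus/model/labels"

        "github.com/grafana/loki/pkg/push"
    )

    // compressionRatio (illustrative) writes one stream's entries into a fresh
    // segment and returns the fraction of raw input bytes the encoding saved.
    // It assumes entries is non-empty, otherwise InputSize is zero.
    func compressionRatio(lbls labels.Labels, entries []*push.Entry) (float64, error) {
        w := NewWalSegmentWriter()
        w.Append("tenant", lbls.String(), lbls, entries)

        var buf bytes.Buffer
        n, err := w.WriteTo(&buf) // n is the encoded segment size in bytes
        if err != nil {
            return 0, err
        }
        return 1 - float64(n)/float64(w.InputSize()), nil
    }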
diff --git a/pkg/storage/wal/segment_test.go b/pkg/storage/wal/segment_test.go
index fd5fa47a3db59..f1755c975abb6 100644
--- a/pkg/storage/wal/segment_test.go
+++ b/pkg/storage/wal/segment_test.go
@@ -8,12 +8,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/dustin/go-humanize"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/v3/pkg/logproto"
 	"github.com/grafana/loki/v3/pkg/logql/syntax"
 	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
+	"github.com/grafana/loki/v3/pkg/storage/wal/testdata"
 
 	"github.com/grafana/loki/pkg/push"
 )
@@ -186,3 +188,68 @@ func TestMultiTenantWrite(t *testing.T) {
 	require.NoError(t, iter.Err())
 	require.ElementsMatch(t, expectedSeries, actualSeries)
 }
+
+func TestCompression(t *testing.T) {
+	size := []int64{250 * 1024, 500 * 1024, 750 * 1024, 1 << 20, 2 << 20, 5 << 20, 10 << 20, 20 << 20, 50 << 20, 100 << 20}
+	for _, s := range size {
+		t.Run(fmt.Sprintf("size %.2f", float64(s)/(1024*1024)), func(t *testing.T) {
+			testCompression(t, s)
+		})
+	}
+}
+
+func testCompression(t *testing.T, maxInputSize int64) {
+	w := NewWalSegmentWriter()
+	dst := bytes.NewBuffer(nil)
+	files := testdata.Files()
+	lbls := []labels.Labels{}
+	generators := []*testdata.LogGenerator{}
+
+	for _, file := range files {
+		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "dev"))
+		lbls = append(lbls, labels.FromStrings("filename", file, "namespace", "prod"))
+		g := testdata.NewLogGenerator(t, file)
+		generators = append(generators, g, g)
+	}
+	inputSize := int64(0)
+	for inputSize < maxInputSize {
+		for i, lbl := range lbls {
+			more, line := generators[i].Next()
+			if !more {
+				continue
+			}
+			inputSize += int64(len(line))
+			w.Append("tenant", lbl.String(), lbl, []*push.Entry{
+				{Timestamp: time.Unix(0, int64(i*1e9)), Line: string(line)},
+			})
+		}
+	}
+
+	require.Equal(t, inputSize, w.InputSize())
+
+	now := time.Now()
+	n, err := w.WriteTo(dst)
+	require.NoError(t, err)
+	require.True(t, n > 0)
+	compressionTime := time.Since(now)
+
+	r, err := NewReader(dst.Bytes())
+	require.NoError(t, err)
+	inputSizeMB := float64(w.InputSize()) / (1024 * 1024)
+	outputSizeMB := float64(dst.Len()) / (1024 * 1024)
+	compressionRatio := (1 - (outputSizeMB / inputSizeMB)) * 100
+
+	t.Logf("Input Size: %s\n", humanize.Bytes(uint64(w.InputSize())))
+	t.Logf("Output Size: %s\n", humanize.Bytes(uint64(dst.Len())))
+	t.Logf("Compression Ratio: %.2f%%\n", compressionRatio)
+	t.Logf("Write time: %s\n", compressionTime)
+	sizes, err := r.Sizes()
+	require.NoError(t, err)
+	t.Logf("Total chunks %d\n", len(sizes.Series))
+	t.Logf("Index size %s\n", humanize.Bytes(uint64(sizes.Index)))
+	sizesString := ""
+	for _, size := range sizes.Series {
+		sizesString += humanize.Bytes(uint64(size)) + ", "
+	}
+	t.Logf("Series sizes: [%s]\n", sizesString)
+}
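TestCompression only logs its measurements — it asserts nothing about the ratio — so it is meant to be run by hand, e.g. with go test -v -run TestCompression ./pkg/storage/wal/. For reference, a worked instance of the ratio formula it logs, with invented numbers:

    package main

    import "fmt"

    func main() {
        inputSizeMB := 100.0 // raw appended bytes, in MiB (invented)
        outputSizeMB := 20.0 // encoded segment size, in MiB (invented)
        ratio := (1 - outputSizeMB/inputSizeMB) * 100
        fmt.Printf("Compression Ratio: %.2f%%\n", ratio) // 80.00%: output is 80% smaller than input
    }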
diff --git a/pkg/storage/wal/testdata/generator.go b/pkg/storage/wal/testdata/generator.go
new file mode 100644
index 0000000000000..130ab8741f83d
--- /dev/null
+++ b/pkg/storage/wal/testdata/generator.go
@@ -0,0 +1,73 @@
+package testdata
+
+import (
+	"bufio"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+type LogGenerator struct {
+	f *os.File
+	s *bufio.Scanner
+}
+
+func (g *LogGenerator) Next() (bool, []byte) {
+	if g.s.Scan() {
+		return true, g.s.Bytes()
+	}
+	g.reset()
+	return g.s.Scan(), g.s.Bytes()
+}
+
+func (g *LogGenerator) Close() {
+	if g.f != nil {
+		g.f.Close()
+	}
+	g.f = nil
+}
+
+func (g *LogGenerator) reset() {
+	_, _ = g.f.Seek(0, 0)
+	g.s = bufio.NewScanner(g.f)
+}
+
+func NewLogGenerator(t testing.TB, filename string) *LogGenerator {
+	t.Helper()
+	file, err := os.Open(filename)
+	require.NoError(t, err)
+
+	return &LogGenerator{
+		f: file,
+		s: bufio.NewScanner(file),
+	}
+}
+
+// Files lists the sample log files, probing ./testdata first and falling
+// back to ../testdata so it works from both this package's parent and siblings.
+func Files() []string {
+	testdataDir := "./testdata"
+	files, err := os.ReadDir(testdataDir)
+	if err != nil {
+		if !os.IsNotExist(err) {
+			panic(err)
+		}
+		testdataDir = "../testdata"
+		files, err = os.ReadDir(testdataDir)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	var fileNames []string
+	for _, file := range files {
+		if !file.IsDir() {
+			filePath := filepath.Join(testdataDir, file.Name())
+			fileNames = append(fileNames, filePath)
+		}
+	}
+
+	return fileNames
+}
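Note the directory probe in Files: ./testdata resolves for tests running in pkg/storage/wal, while the ../testdata fallback serves sibling packages such as chunks (whose old helper hard-coded that path). As written, the outer condition meant any error skipped the fallback, so it has been relaxed to plain err != nil above. Also, because Next rewinds the file and rescans at EOF, a LogGenerator can be drained for arbitrarily many lines — which is what lets testCompression keep appending until it reaches maxInputSize; one caveat is that bufio.Scanner's default 64KiB token limit makes Next treat an over-long line like EOF. A hedged sketch of the wrap-around pattern in a benchmark (the benchmark itself is not part of this diff):

    package wal

    import (
        "testing"

        "github.com/grafana/loki/v3/pkg/storage/wal/testdata"
    )

    // Illustrative benchmark: Next never runs dry because it wraps at EOF.
    func BenchmarkLogGeneratorNext(b *testing.B) {
        files := testdata.Files()
        if len(files) == 0 {
            b.Skip("no sample files found")
        }
        gen := testdata.NewLogGenerator(b, files[0])
        defer gen.Close()

        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            if more, line := gen.Next(); more {
                _ = line // consume the line; a real benchmark would decode or write it
            }
        }
    }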