Skip to content

Commit

Permalink
feat: implement extra compressed chunks
Browse files Browse the repository at this point in the history
This allows keeping more data in the circular buffer by compressing old
chunks and retaining them, so the overall memory consumption is
smaller.

Change is backwards compatible - by default compression is not used.

Tests were refactored to use regular *testing.T.

Added benchmarks for read and write operations.

Writes are allocation-free, even when using compression (compression
buffer is re-used):

```
goos: linux
goarch: amd64
pkg: github.com/siderolabs/go-circular
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkWrite/defaults-32         	 7848931	       150.5 ns/op	       0 B/op	       0 allocs/op
BenchmarkWrite/growing_buffer-32   	 7116709	       144.3 ns/op	       0 B/op	       0 allocs/op
BenchmarkWrite/compression-32      	 3042698	       399.2 ns/op	       0 B/op	       0 allocs/op
```

Reads are not allocation-free, but each Reader has fixed allocation cost
(decompression buffer is re-used):

```
goos: linux
goarch: amd64
pkg: github.com/siderolabs/go-circular
cpu: AMD Ryzen 9 5950X 16-Core Processor
BenchmarkRead/defaults-32         	   99565	     12682 ns/op	      80 B/op	       1 allocs/op
BenchmarkRead/growing_buffer-32   	   88371	     12327 ns/op	      80 B/op	       1 allocs/op
BenchmarkRead/compression-32      	    3193	    372417 ns/op	  139425 B/op	       3 allocs/op
```

Co-authored-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira and utkuozdemir committed May 8, 2024
1 parent 835f04c commit 3c48c53
Show file tree
Hide file tree
Showing 16 changed files with 1,205 additions and 442 deletions.
5 changes: 3 additions & 2 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-04-30T10:38:23Z by kres ebc009d.
# Generated on 2024-05-08T13:07:24Z by kres 1e986af.

*
!zstd
!chunk.go
!circular.go
!circular_test.go
!errors.go
!options.go
!reader.go
!streaming.go
!go.mod
!go.sum
!.golangci.yml
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-04-30T11:03:25Z by kres d15226e.
# Generated on 2024-05-08T11:26:09Z by kres 1e986af.

name: default
concurrency:
Expand Down Expand Up @@ -31,7 +31,7 @@ jobs:
if: (!startsWith(github.head_ref, 'renovate/') && !startsWith(github.head_ref, 'dependabot/'))
services:
buildkitd:
image: moby/buildkit:v0.13.1
image: moby/buildkit:v0.13.2
options: --privileged
ports:
- 1234:1234
Expand Down
6 changes: 3 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-04-30T10:38:23Z by kres ebc009d.
# Generated on 2024-05-08T11:26:09Z by kres 1e986af.

# options for analysis running
run:
Expand Down Expand Up @@ -54,7 +54,6 @@ linters-settings:
goimports:
local-prefixes: github.com/siderolabs/go-circular/
gomodguard: { }
gomnd: { }
govet:
enable-all: true
lll:
Expand Down Expand Up @@ -109,17 +108,18 @@ linters:
disable:
- exhaustivestruct
- exhaustruct
- err113
- forbidigo
- funlen
- gochecknoglobals
- gochecknoinits
- godox
- goerr113
- gomnd
- gomoddirectives
- gosec
- inamedparam
- ireturn
- mnd
- nestif
- nonamedreturns
- nosnakecase
Expand Down
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
# syntax = docker/dockerfile-upstream:1.7.0-labs
# syntax = docker/dockerfile-upstream:1.7.1-labs

# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-04-30T10:38:23Z by kres ebc009d.
# Generated on 2024-05-08T13:07:24Z by kres 1e986af.

ARG TOOLCHAIN

Expand Down Expand Up @@ -55,12 +55,13 @@ COPY go.sum go.sum
RUN cd .
RUN --mount=type=cache,target=/go/pkg go mod download
RUN --mount=type=cache,target=/go/pkg go mod verify
COPY ./zstd ./zstd
COPY ./chunk.go ./chunk.go
COPY ./circular.go ./circular.go
COPY ./circular_test.go ./circular_test.go
COPY ./errors.go ./errors.go
COPY ./options.go ./options.go
COPY ./reader.go ./reader.go
COPY ./streaming.go ./streaming.go
RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null

# runs gofumpt
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-04-30T11:03:09Z by kres d15226e.
# Generated on 2024-05-08T11:26:09Z by kres 1e986af.

# common variables

Expand All @@ -20,9 +20,9 @@ GRPC_GO_VERSION ?= 1.3.0
GRPC_GATEWAY_VERSION ?= 2.19.1
VTPROTOBUF_VERSION ?= 0.6.0
DEEPCOPY_VERSION ?= v0.5.6
GOLANGCILINT_VERSION ?= v1.57.2
GOLANGCILINT_VERSION ?= v1.58.0
GOFUMPT_VERSION ?= v0.6.0
GO_VERSION ?= 1.22.2
GO_VERSION ?= 1.22.3
GOIMPORTS_VERSION ?= v0.20.0
GO_BUILDFLAGS ?=
GO_LDFLAGS ?=
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
# go-circular

Package circular provides a buffer with circular semantics.

## Design

The buffer is split into chunks: the last chunk is uncompressed and is actively being written to,
while the previous chunks are compressed and immutable.

The buffer keeps a pointer to the current offset being written which is always incremented, while the index
of compressed chunks keeps the initial offset of each compressed chunk and its decompressed size.

If the compressed chunk is being read, it is decompressed on the fly by the Reader.
15 changes: 15 additions & 0 deletions chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package circular

// chunk describes a single compressed, immutable chunk of the circular buffer.
//
// When the uncompressed tail of the buffer wraps around, its contents are
// compressed (via the buffer's Compressor option) and kept as a chunk, so
// older data survives in less memory.
type chunk struct {
	// compressed chunk data
	compressed []byte
	// start offset of the chunk, as it was in the circular buffer when the chunk was created
	startOffset int64
	// uncompressed size of the chunk, in bytes
	size int64
	// [TODO]: have a unique (incrementing?) chunk ID for file-based storage
}
118 changes: 100 additions & 18 deletions circular.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,33 @@ package circular

import (
"fmt"
"math"
"slices"
"sync"
)

// Buffer implements circular buffer with a thread-safe writer,
// that supports multiple readers each with its own offset.
type Buffer struct {
// waking up readers on new writes
// waking up streaming readers on new writes
cond *sync.Cond

// data slice, might grow up to MaxCapacity, then used
// compressed chunks, ordered from the smallest offset to the largest
chunks []chunk

// the last uncompressed chunk, might grow up to MaxCapacity, then used
// as circular buffer
data []byte

// synchronizing access to data, off
// buffer options
opt Options

// synchronizing access to data, off, chunks
mu sync.Mutex

// write offset, always goes up, actual offset in data slice
// is (off % cap(data))
off int64

opt Options
}

// NewBuffer creates new Buffer with specified options.
Expand Down Expand Up @@ -86,18 +92,46 @@ func (buf *Buffer) Write(p []byte) (int, error) {

var n int
for n < l {
rotate := false

i := int(buf.off % int64(buf.opt.MaxCapacity))

nn := buf.opt.MaxCapacity - i
if nn > len(p) {
nn = len(p)
} else {
rotate = true
}

copy(buf.data[i:], p[:nn])

buf.off += int64(nn)
n += nn
p = p[nn:]

if rotate && buf.opt.NumCompressedChunks > 0 {
var compressionBuf []byte

if len(buf.chunks) == buf.opt.NumCompressedChunks {
// going to drop the chunk, so reuse its buffer
compressionBuf = buf.chunks[0].compressed[:0]
}

compressed, err := buf.opt.Compressor.Compress(buf.data, compressionBuf)
if err != nil {
return n, err
}

buf.chunks = append(buf.chunks, chunk{
compressed: compressed,
startOffset: buf.off - int64(buf.opt.MaxCapacity),
size: int64(buf.opt.MaxCapacity),
})

if len(buf.chunks) > buf.opt.NumCompressedChunks {
buf.chunks = slices.Delete(buf.chunks, 0, 1)
}
}
}

buf.cond.Broadcast()
Expand All @@ -113,6 +147,47 @@ func (buf *Buffer) Capacity() int {
return cap(buf.data)
}

// NumCompressedChunks returns the number of compressed chunks currently held
// by the buffer.
func (buf *Buffer) NumCompressedChunks() int {
	buf.mu.Lock()
	count := len(buf.chunks)
	buf.mu.Unlock()

	return count
}

// TotalCompressedSize reports the overall memory used by the circular buffer,
// including the compressed chunks and the uncompressed tail.
func (buf *Buffer) TotalCompressedSize() int64 {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	// start with the uncompressed tail, then add each compressed chunk
	total := int64(cap(buf.data))

	for i := range buf.chunks {
		total += int64(len(buf.chunks[i].compressed))
	}

	return total
}

// TotalSize reports the overall number of bytes available for reading in the buffer.
//
// TotalSize might be higher than TotalCompressedSize, because compressed chunks
// take less memory than uncompressed data.
func (buf *Buffer) TotalSize() int64 {
	buf.mu.Lock()
	defer buf.mu.Unlock()

	if len(buf.chunks) > 0 {
		// readable data spans from the start of the oldest compressed chunk
		// up to the current write offset
		return buf.off - buf.chunks[0].startOffset
	}

	// no compressed chunks: readable data is everything written so far,
	// capped by the capacity of the uncompressed buffer
	return min(buf.off, int64(cap(buf.data)))
}

// Offset returns current write offset (number of bytes written).
func (buf *Buffer) Offset() int64 {
buf.mu.Lock()
Expand All @@ -121,23 +196,16 @@ func (buf *Buffer) Offset() int64 {
return buf.off
}

// GetStreamingReader returns StreamingReader object which implements io.ReadCloser, io.Seeker.
// GetStreamingReader returns Reader object which implements io.ReadCloser, io.Seeker.
//
// StreamingReader starts at the most distant position in the past available.
func (buf *Buffer) GetStreamingReader() *StreamingReader {
buf.mu.Lock()
defer buf.mu.Unlock()
func (buf *Buffer) GetStreamingReader() *Reader {
r := buf.GetReader()

off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap)
if off < 0 {
off = 0
}
r.endOff = math.MaxInt64
r.streaming = true

return &StreamingReader{
buf: buf,
initialOff: off,
off: off,
}
return r
}

// GetReader returns Reader object which implements io.ReadCloser, io.Seeker.
Expand All @@ -148,6 +216,20 @@ func (buf *Buffer) GetReader() *Reader {
buf.mu.Lock()
defer buf.mu.Unlock()

if len(buf.chunks) > 0 {
oldestChunk := buf.chunks[0]

return &Reader{
buf: buf,

chunk: &oldestChunk,

startOff: oldestChunk.startOffset,
endOff: buf.off,
off: oldestChunk.startOffset,
}
}

off := buf.off - int64(buf.opt.MaxCapacity-buf.opt.SafetyGap)
if off < 0 {
off = 0
Expand Down
Loading

0 comments on commit 3c48c53

Please sign in to comment.