Skip to content

Commit

Permalink
feat: add persistence support
Browse files Browse the repository at this point in the history
Add support to persist and load buffer from disk.

Co-authored-by: Utku Ozdemir <utku.ozdemir@siderolabs.com>
Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira and utkuozdemir committed May 16, 2024
1 parent 3c48c53 commit cbce5c3
Show file tree
Hide file tree
Showing 15 changed files with 886 additions and 10 deletions.
6 changes: 5 additions & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-05-08T13:07:24Z by kres 1e986af.
# Generated on 2024-05-16T11:12:28Z by kres ce88e1c.

*
!zstd
!chunk.go
!circular.go
!circular_bench_test.go
!circular_test.go
!errors.go
!options.go
!options_test.go
!persistence.go
!persistence_test.go
!reader.go
!go.mod
!go.sum
Expand Down
4 changes: 2 additions & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-05-08T11:26:09Z by kres 1e986af.
# Generated on 2024-05-16T11:12:28Z by kres ce88e1c.

# options for analysis running
run:
Expand Down Expand Up @@ -35,7 +35,7 @@ linters-settings:
sections:
- standard # Standard section: captures all standard packages.
- default # Default section: contains all imports that could not be matched to another section type.
- prefix(github.com/siderolabs/go-circular/) # Custom section: groups all imports with the specified Prefix.
- localmodule
gocognit:
min-complexity: 30
nestif:
Expand Down
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-05-08T13:07:24Z by kres 1e986af.
# Generated on 2024-05-16T11:12:28Z by kres ce88e1c.

ARG TOOLCHAIN

Expand Down Expand Up @@ -58,9 +58,13 @@ RUN --mount=type=cache,target=/go/pkg go mod verify
COPY ./zstd ./zstd
COPY ./chunk.go ./chunk.go
COPY ./circular.go ./circular.go
COPY ./circular_bench_test.go ./circular_bench_test.go
COPY ./circular_test.go ./circular_test.go
COPY ./errors.go ./errors.go
COPY ./options.go ./options.go
COPY ./options_test.go ./options_test.go
COPY ./persistence.go ./persistence.go
COPY ./persistence_test.go ./persistence_test.go
COPY ./reader.go ./reader.go
RUN --mount=type=cache,target=/go/pkg go list -mod=readonly all >/dev/null

Expand Down
3 changes: 2 additions & 1 deletion chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ type chunk struct {
startOffset int64
// uncompressed size of the chunk
size int64
// [TODO]: have a unique (incrementing?) chunk ID for file-based storage
// unique chunk ID
id int64
}
63 changes: 63 additions & 0 deletions circular.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"math"
"slices"
"sync"
"sync/atomic"
)

// Buffer implements circular buffer with a thread-safe writer,
Expand All @@ -18,6 +19,9 @@ type Buffer struct {
// waking up streaming readers on new writes
cond *sync.Cond

// channel for persistence commands from the writer to the persistence goroutine
commandCh chan persistenceCommand

// compressed chunks, ordered from the smallest offset to the largest
chunks []chunk

Expand All @@ -28,6 +32,12 @@ type Buffer struct {
// buffer options
opt Options

// waitgroup to wait for persistence goroutine to finish
wg sync.WaitGroup

// closed flag (to disable writes after close)
closed atomic.Bool

// synchronizing access to data, off, chunks
mu sync.Mutex

Expand Down Expand Up @@ -59,16 +69,43 @@ func NewBuffer(opts ...OptionFunc) (*Buffer, error) {
buf.data = make([]byte, buf.opt.InitialCapacity)
buf.cond = sync.NewCond(&buf.mu)

if err := buf.load(); err != nil {
return nil, err
}

buf.run()

return buf, nil
}

// Close closes the buffer and waits for persistence goroutine to finish.
func (buf *Buffer) Close() error {
if buf.closed.Swap(true) {
return nil
}

if buf.commandCh != nil {
close(buf.commandCh)
}

buf.wg.Wait()

return nil
}

// Write implements io.Writer interface.
//
//nolint:gocognit
func (buf *Buffer) Write(p []byte) (int, error) {
l := len(p)
if l == 0 {
return 0, nil
}

if buf.closed.Load() {
return 0, ErrClosed
}

buf.mu.Lock()
defer buf.mu.Unlock()

Expand Down Expand Up @@ -122,13 +159,34 @@ func (buf *Buffer) Write(p []byte) (int, error) {
return n, err
}

var maxID int64

for _, c := range buf.chunks {
maxID = max(c.id, maxID)
}

buf.chunks = append(buf.chunks, chunk{
compressed: compressed,
startOffset: buf.off - int64(buf.opt.MaxCapacity),
size: int64(buf.opt.MaxCapacity),
id: maxID + 1,
})

if buf.commandCh != nil {
buf.commandCh <- persistenceCommand{
chunkID: maxID + 1,
data: compressed,
}
}

if len(buf.chunks) > buf.opt.NumCompressedChunks {
if buf.commandCh != nil {
buf.commandCh <- persistenceCommand{
chunkID: buf.chunks[0].id,
drop: true,
}
}

buf.chunks = slices.Delete(buf.chunks, 0, 1)
}
}
Expand All @@ -147,6 +205,11 @@ func (buf *Buffer) Capacity() int {
return cap(buf.data)
}

// MaxCapacity returns maximum number of (decompressed) bytes (including compressed chunks) that can be stored in the buffer.
func (buf *Buffer) MaxCapacity() int {
return buf.opt.MaxCapacity * (buf.opt.NumCompressedChunks + 1)
}

// NumCompressedChunks returns number of compressed chunks.
func (buf *Buffer) NumCompressedChunks() int {
buf.mu.Lock()
Expand Down
3 changes: 2 additions & 1 deletion circular_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

//go:build !race

package circular_test

//nolint:gci
import (
"crypto/rand"
"io"
Expand Down
1 change: 0 additions & 1 deletion circular_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package circular_test

//nolint:gci
import (
"bytes"
"context"
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ require (
github.com/klauspost/compress v1.17.8
github.com/siderolabs/gen v0.4.8
github.com/stretchr/testify v1.9.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
golang.org/x/time v0.5.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 12 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,28 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/siderolabs/gen v0.4.8 h1:VNpbmDLhkXp7qcSEkKk1Ee7vU2afs3xvHrWLGR2UuiY=
github.com/siderolabs/gen v0.4.8/go.mod h1:7ROKVHHB68R3Amrd4a1ZXz/oMrXWF3Mg3lSEgnkJY5c=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
71 changes: 69 additions & 2 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,61 @@

package circular

import "fmt"
import (
"fmt"
"math/rand/v2"
"time"

"go.uber.org/zap"
)

// Options defines settings for Buffer.
type Options struct {
Compressor Compressor

Logger *zap.Logger

PersistenceOptions PersistenceOptions

InitialCapacity int
MaxCapacity int
SafetyGap int

NumCompressedChunks int
}

// PersistenceOptions defines settings for Buffer persistence.
type PersistenceOptions struct {
// ChunkPath is the base path to the store chunk files.
//
// Example: /var/log/machine/my-machine.log, chunks will be stored
// by appending a chunk ID to this path, e.g. /var/log/machine/my-machine.log.3.
//
// If ChunkPath is empty, persistence is disabled.
ChunkPath string

// FlushInterval flushes buffer content to disk every FlushInterval (if there were any changes).
FlushInterval time.Duration

// FlushJitter adds random jitter to FlushInterval to avoid thundering herd problem (a ratio of FlushInterval).
FlushJitter float64
}

// NextInterval calculates next flush interval with jitter.
func (p PersistenceOptions) NextInterval() time.Duration {
return time.Duration(((rand.Float64()*2-1)*p.FlushJitter + 1.0) * float64(p.FlushInterval))
}

// Compressor implements an optional interface for chunk compression.
//
// Compress and Decompress append to the dest slice and return the result.
//
// Compressor should be safe for concurrent use by multiple goroutines.
// Compressor should verify checksums of the compressed data.
type Compressor interface {
Compress(src, dest []byte) ([]byte, error)
Decompress(src, dest []byte) ([]byte, error)
DecompressedSize(src []byte) (int64, error)
}

// defaultOptions returns default initial values.
Expand All @@ -31,6 +67,7 @@ func defaultOptions() Options {
InitialCapacity: 16384,
MaxCapacity: 1048576,
SafetyGap: 1024,
Logger: zap.NewNop(),
}
}

Expand Down Expand Up @@ -71,7 +108,7 @@ func WithMaxCapacity(capacity int) OptionFunc {
func WithSafetyGap(gap int) OptionFunc {
return func(opt *Options) error {
if gap <= 0 {
return fmt.Errorf("safety gap should be positive: %q", gap)
return fmt.Errorf("safety gap should be positive: %d", gap)
}

opt.SafetyGap = gap
Expand All @@ -95,3 +132,33 @@ func WithNumCompressedChunks(num int, c Compressor) OptionFunc {
return nil
}
}

// WithPersistence enables buffer persistence to disk.
func WithPersistence(options PersistenceOptions) OptionFunc {
return func(opt *Options) error {
if options.ChunkPath == "" {
return fmt.Errorf("chunk path should be set")
}

if options.FlushJitter < 0 || options.FlushJitter > 1 {
return fmt.Errorf("flush jitter should be in range [0, 1]: %f", options.FlushJitter)
}

if opt.Compressor == nil {
return fmt.Errorf("compressor should be set for persistence")
}

opt.PersistenceOptions = options

return nil
}
}

// WithLogger sets logger for Buffer.
func WithLogger(logger *zap.Logger) OptionFunc {
return func(opt *Options) error {
opt.Logger = logger

return nil
}
}
Loading

0 comments on commit cbce5c3

Please sign in to comment.