Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: introduce a batch filter writer #274

Merged
merged 6 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 149 additions & 0 deletions chanutils/batch_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package chanutils

import (
"sync"
"time"
)

// BatchWriterConfig holds the configuration options for BatchWriter.
type BatchWriterConfig[T any] struct {
yyforyongyu marked this conversation as resolved.
Show resolved Hide resolved
// QueueBufferSize sets the buffer size of the output channel of the
// concurrent queue used by the BatchWriter.
QueueBufferSize int

// MaxBatch is the maximum number of filters to be persisted to the DB
// in one go.
MaxBatch int

// DBWritesTickerDuration is the time after receiving a filter that the
// writer will wait for more filters before writing the current batch
// to the DB.
DBWritesTickerDuration time.Duration

// PutItems will be used by the BatchWriter to persist filters in
// batches.
PutItems func(...T) error
}

// BatchWriter manages writing Filters to the DB and tries to batch the writes
// as much as possible.
type BatchWriter[T any] struct {
started sync.Once
stopped sync.Once

cfg *BatchWriterConfig[T]

queue *ConcurrentQueue[T]

quit chan struct{}
wg sync.WaitGroup
}

// NewBatchWriter constructs a new BatchWriter using the given
// BatchWriterConfig.
func NewBatchWriter[T any](cfg *BatchWriterConfig[T]) *BatchWriter[T] {
return &BatchWriter[T]{
cfg: cfg,
queue: NewConcurrentQueue[T](cfg.QueueBufferSize),
quit: make(chan struct{}),
}
}

// Start starts the BatchWriter.
func (b *BatchWriter[T]) Start() {
b.started.Do(func() {
b.queue.Start()

b.wg.Add(1)
go b.manageNewItems()
})
}

// Stop stops the BatchWriter.
func (b *BatchWriter[T]) Stop() {
b.stopped.Do(func() {
close(b.quit)
b.wg.Wait()

b.queue.Stop()
})
}

// AddItem adds a given item to the BatchWriter queue.
func (b *BatchWriter[T]) AddItem(item T) {
b.queue.ChanIn() <- item
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
}

// manageNewItems manages collecting filters and persisting them to the DB.
// There are two conditions for writing a batch of filters to the DB: the first
// is if a certain threshold (MaxBatch) of filters has been collected and the
// other is if at least one filter has been collected and a timeout has been
// reached.
//
// NOTE: this must be run in a goroutine.
func (b *BatchWriter[T]) manageNewItems() {
defer b.wg.Done()

batch := make([]T, 0, b.cfg.MaxBatch)

// writeBatch writes the current contents of the batch slice to the
// filters DB.
writeBatch := func() {
if len(batch) == 0 {
return
}

err := b.cfg.PutItems(batch...)
if err != nil {
log.Errorf("Could not write filters to filterDB: %v",
err)
}

// Empty the batch slice.
batch = make([]T, 0, b.cfg.MaxBatch)
}

ticker := time.NewTicker(b.cfg.DBWritesTickerDuration)
defer ticker.Stop()

// Stop the ticker since we don't want it to tick unless there is at
// least one item in the queue.
ticker.Stop()
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: there's a near-impossible case, that if the DBWritesTickerDuration is extremely small, we may end up having a tick already inside ticker, causing an immediate write - no big deal here. But in a hindsight it looks like the lnd/ticker package is more suitable for this case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool yeah im happy to update to the lnd ticker.

The only reason not to would be for performance reasons: this ticker stops and starts the same one the whole time while the LND one keeps creating new ones & discarding old ones.
So maybe makes sense to keep as is? Agreed re the possible immediate write but also agree that it is not a big deal 👍

lemme know :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also doesnt really make sense to make the DBWritesTickerDuration too small cause then the batching benefits are lost

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah i think it's fine to leave it as-is, in the future we can add some config sanity check to make sure it won't happen - or let it happen, worse case we get an immediate call to the batch write.


for {
select {
case filter, ok := <-b.queue.ChanOut():
if !ok {
return
}

batch = append(batch, filter)

switch len(batch) {
// If the batch slice is full, we stop the ticker and
// write the batch contents to disk.
case b.cfg.MaxBatch:
ticker.Stop()
writeBatch()

// If an item is added to the batch, we reset the timer.
// This ensures that if the batch threshold is not met
// then items are still persisted in a timely manner.
default:
ticker.Reset(b.cfg.DBWritesTickerDuration)
}

case <-ticker.C:
// If the ticker ticks, then we stop it and write the
// current batch contents to the db. If any more items
// are added, the ticker will be reset.
ticker.Stop()
writeBatch()

case <-b.quit:
Roasbeef marked this conversation as resolved.
Show resolved Hide resolved
writeBatch()

return
}
}
}
210 changes: 210 additions & 0 deletions chanutils/batch_writer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package chanutils

import (
"fmt"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

const waitTime = time.Second * 5

// TestBatchWriter tests that the BatchWriter behaves as expected.
func TestBatchWriter(t *testing.T) {
t.Parallel()
rand.Seed(time.Now().UnixNano())

// waitForItems is a helper function that will wait for a given set of
// items to appear in the db.
waitForItems := func(db *mockItemsDB, items ...*item) {
err := waitFor(func() bool {
return db.hasItems(items...)
}, waitTime)
require.NoError(t, err)
}

t.Run("filters persisted after ticker", func(t *testing.T) {
t.Parallel()

// Create a mock filters DB.
db := newMockItemsDB()

// Construct a new BatchWriter backed by the mock db.
b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
QueueBufferSize: 10,
MaxBatch: 20,
DBWritesTickerDuration: time.Millisecond * 500,
PutItems: db.PutItems,
})
b.Start()
t.Cleanup(b.Stop)

fs := genFilterSet(5)
for _, f := range fs {
b.AddItem(f)
}
waitForItems(db, fs...)
})

t.Run("write once threshold is reached", func(t *testing.T) {
ellemouton marked this conversation as resolved.
Show resolved Hide resolved
t.Parallel()

// Create a mock filters DB.
db := newMockItemsDB()

// Construct a new BatchWriter backed by the mock db.
// Make the DB writes ticker duration extra long so that we
// can explicitly test that the batch gets persisted if the
// MaxBatch threshold is reached.
b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
QueueBufferSize: 10,
MaxBatch: 20,
DBWritesTickerDuration: time.Hour,
PutItems: db.PutItems,
})
b.Start()
t.Cleanup(b.Stop)

// Generate 30 filters and add each one to the batch writer.
fs := genFilterSet(30)
for _, f := range fs {
b.AddItem(f)
}

// Since the MaxBatch threshold has been reached, we expect the
// first 20 filters to be persisted.
waitForItems(db, fs[:20]...)

// Since the last 10 filters don't reach the threshold and since
// the ticker has definitely not ticked yet, we don't expect the
// last 10 filters to be in the db yet.
require.False(t, db.hasItems(fs[21:]...))
})

t.Run("stress test", func(t *testing.T) {
t.Parallel()

// Create a mock filters DB.
db := newMockItemsDB()

// Construct a new BatchWriter backed by the mock db.
// Make the DB writes ticker duration extra long so that we
// can explicitly test that the batch gets persisted if the
// MaxBatch threshold is reached.
b := NewBatchWriter[*item](&BatchWriterConfig[*item]{
QueueBufferSize: 5,
MaxBatch: 5,
DBWritesTickerDuration: time.Millisecond * 2,
PutItems: db.PutItems,
})
b.Start()
t.Cleanup(b.Stop)

// Generate lots of filters and add each to the batch writer.
// Sleep for a bit between each filter to ensure that we
// sometimes hit the timeout write and sometimes the threshold
// write.
fs := genFilterSet(1000)
for _, f := range fs {
b.AddItem(f)

n := rand.Intn(3)
time.Sleep(time.Duration(n) * time.Millisecond)
}

// Since the MaxBatch threshold has been reached, we expect the
// first 20 filters to be persisted.
waitForItems(db, fs...)
})
}

type item struct {
i int
}

// mockItemsDB is a mock DB that holds a set of items.
type mockItemsDB struct {
items map[int]bool
mu sync.Mutex
}

// newMockItemsDB constructs a new mockItemsDB.
func newMockItemsDB() *mockItemsDB {
return &mockItemsDB{
items: make(map[int]bool),
}
}

// hasItems returns true if the db contains all the given items.
func (m *mockItemsDB) hasItems(items ...*item) bool {
m.mu.Lock()
defer m.mu.Unlock()

for _, i := range items {
_, ok := m.items[i.i]
if !ok {
return false
}
}

return true
}

// PutItems adds a set of items to the db.
func (m *mockItemsDB) PutItems(items ...*item) error {
m.mu.Lock()
defer m.mu.Unlock()

for _, i := range items {
m.items[i.i] = true
}

return nil
}

// genItemSet generates a set of numFilters items.
func genFilterSet(numFilters int) []*item {
res := make([]*item, numFilters)
for i := 0; i < numFilters; i++ {
res[i] = &item{i: i}
}

return res
}

// pollInterval is a constant specifying a 200 ms interval.
const pollInterval = 200 * time.Millisecond

// waitFor is a helper test function that will wait for a timeout period of
// time until the passed predicate returns true. This function is helpful as
// timing doesn't always line up well when running integration tests with
// several running lnd nodes. This function gives callers a way to assert that
// some property is upheld within a particular time frame.
func waitFor(pred func() bool, timeout time.Duration) error {
exitTimer := time.After(timeout)
result := make(chan bool, 1)

for {
<-time.After(pollInterval)

go func() {
result <- pred()
}()

// Each time we call the pred(), we expect a result to be
// returned otherwise it will timeout.
select {
case <-exitTimer:
return fmt.Errorf("predicate not satisfied after " +
"time out")

case succeed := <-result:
if succeed {
return nil
}
}
}
}
26 changes: 26 additions & 0 deletions chanutils/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package chanutils

import "github.com/btcsuite/btclog"

// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger

// The default amount of logging is none.
func init() {
DisableLog()
}

// DisableLog disables all library log output. Logging output is disabled
// by default until either UseLogger or SetLogWriter are called.
func DisableLog() {
UseLogger(btclog.Disabled)
}

// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}
Loading