Skip to content

Commit

Permalink
Cleaned up bundler and tester
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Nov 18, 2022
1 parent 09ca634 commit 9b3e837
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 234 deletions.
167 changes: 110 additions & 57 deletions bundler/bundler.go
Original file line number Diff line number Diff line change
@@ -1,115 +1,168 @@
package bundler

import (
"bytes"
"context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/jhump/protoreflect/dynamic"
"github.com/streamingfast/bstream"
"github.com/streamingfast/derr"
"github.com/streamingfast/dstore"
"github.com/streamingfast/substreams-sink-files/sink"
"go.uber.org/zap"
)

type BundlerType string
type FileType string

const (
BundlerTypeJSON BundlerType = "json"
FileTypeJSONL FileType = "jsonl"
)

type Bundler struct {
size uint64
bundlerType BundlerType
encoder Encoder
store dstore.Store
size uint64
encoder Encoder

objects []proto.Message
fileStores *DStoreIO
stateStore *StateStore
fileType FileType

startBlockNum uint64
activeBoundary *bstream.Range

zlogger *zap.Logger
}

func New(store dstore.Store, size uint64, startBlock uint64, bundlerType BundlerType, zlogger *zap.Logger) (*Bundler, error) {
func New(
store dstore.Store,
stateFilePath string,
size uint64,
fileType FileType,
zlogger *zap.Logger,
) (*Bundler, error) {

stateStore, err := loadStateStore(stateFilePath)
if err != nil {
return nil, fmt.Errorf("load state store: %w", err)
}

b := &Bundler{
store: store,
size: size,
bundlerType: bundlerType,
startBlockNum: startBlock,
objects: []proto.Message{},
zlogger: zlogger,
fileStores: newDStoreIO(store, 3, zlogger),
stateStore: stateStore,
fileType: fileType,
size: size,
zlogger: zlogger,
}

switch bundlerType {
case BundlerTypeJSON:
b.encoder = JSONEncode
switch fileType {
case FileTypeJSONL:
b.encoder = JSONLEncode
default:
return nil, fmt.Errorf("invalid bundler type %q", bundlerType)
return nil, fmt.Errorf("invalid file type %q", fileType)
}
return b, nil
}

func (b *Bundler) ForceFlush(ctx context.Context) error {
boundary := bstream.NewRangeExcludingEnd(b.startBlockNum, b.startBlockNum+b.size)
if err := b.save(ctx, boundary); err != nil {
return fmt.Errorf("save %q: %w", boundary.String(), err)
// GetCursor reads the cursor from the bundler's state store and returns it
// together with any error reported by that read.
func (b *Bundler) GetCursor() (*sink.Cursor, error) {
	cursor, err := b.stateStore.Read()
	return cursor, err
}

// Start makes the boundary containing blockNum the active boundary and opens
// the file that backs it.
//
// The boundary is derived with newBoundary and the file name with filename.
// Once the file store has started the file, the state store is told about the
// new file/boundary pair.
//
// It returns a wrapped error when the file store cannot start the file. In
// that case activeBoundary has already been replaced, but the state store has
// not been updated.
func (b *Bundler) Start(blockNum uint64) error {
	boundaryRange := b.newBoundary(blockNum)
	b.activeBoundary = boundaryRange
	filename := b.filename(boundaryRange)
	// The log key used to be misspelled "fiename", which hid the value from
	// any log query filtering on "filename".
	b.zlogger.Info("starting new file boundary", zap.Stringer("boundary", boundaryRange), zap.String("filename", filename))

	if err := b.fileStores.StartFile(filename); err != nil {
		return fmt.Errorf("start file: %w", err)
	}

	b.stateStore.newBoundary(filename, boundaryRange)
	return nil
}

func (b *Bundler) Flush(ctx context.Context, blockNum uint64) (bool, error) {
boundaries := b.boundariesToSave(blockNum)
if len(boundaries) == 0 {
return false, nil
func (b *Bundler) Stop() error {
b.zlogger.Info("stopping file boundary")

if err := b.fileStores.CloseFile(); err != nil {
return fmt.Errorf("closing file: %w", err)
}
for _, boundary := range boundaries {
if err := b.save(ctx, boundary); err != nil {
return false, fmt.Errorf("save %q: %w", boundary.String(), err)
}

if err := b.stateStore.Save(); err != nil {
return fmt.Errorf("failed to save state: %w", err)
}
return true, nil

b.activeBoundary = nil
return nil
}

func (b *Bundler) Write(entities []*dynamic.Message) {
for _, entity := range entities {
b.objects = append(b.objects, proto.Message(entity))
func (b *Bundler) Roll(ctx context.Context, blockNum uint64) error {
if b.activeBoundary.Contains(blockNum) {
return nil
}
}

func (b *Bundler) save(ctx context.Context, boundary *bstream.Range) error {
filename := b.filename(boundary)
boundaries := boundariesToSkip(b.activeBoundary, blockNum, b.size)

b.zlogger.Debug("storing boundary",
zap.String("filename", filename),
b.zlogger.Info("block_num is not in active boundary",
zap.Stringer("active_boundary", b.activeBoundary),
zap.Int("boundaries_to_skip", len(boundaries)),
zap.Uint64("block_num", blockNum),
)

content, err := b.encoder(b.objects)
if err != nil {
return fmt.Errorf("encode objets: %w", err)
if err := b.Stop(); err != nil {
return fmt.Errorf("stop active boundary: %w", err)
}

if err := derr.RetryContext(ctx, 3, func(ctx context.Context) error {
return b.store.WriteObject(ctx, filename, bytes.NewReader(content))
}); err != nil {
return fmt.Errorf("write object: %w", err)
for _, boundary := range boundaries {
if err := b.Start(boundary.StartBlock()); err != nil {
return fmt.Errorf("start skipping boundary: %w", err)
}
if err := b.Stop(); err != nil {
return fmt.Errorf("stop skipping boundary: %w", err)
}
}

b.objects = []proto.Message{}
b.startBlockNum = *boundary.EndBlock()
if err := b.Start(blockNum); err != nil {
return fmt.Errorf("start skipping boundary: %w", err)
}
return nil
}

// newBoundary returns the end-exclusive block range of width b.size that
// contains containingBlockNum. The range starts at the largest multiple of
// b.size that is less than or equal to containingBlockNum.
//
// b.size must be non-zero; a zero size triggers an integer divide-by-zero
// panic.
func (b *Bundler) newBoundary(containingBlockNum uint64) *bstream.Range {
	width := b.size
	// Integer division truncates, so dividing then multiplying snaps the
	// block number down to the start of its bundle.
	lower := (containingBlockNum / width) * width
	upper := lower + width
	return bstream.NewRangeExcludingEnd(lower, upper)
}

func (b *Bundler) filename(blockRange *bstream.Range) string {
return fmt.Sprintf("%010d-%010d.%s", blockRange.StartBlock(), (*blockRange.EndBlock()), b.bundlerType)
return fmt.Sprintf("%010d-%010d.%s", blockRange.StartBlock(), (*blockRange.EndBlock()), b.fileType)
}

// Write encodes every entity with the bundler's encoder, appends the encoded
// bytes to the file store's active writer in a single Write call, and then
// records cursor in the state store.
//
// If encoding any entity fails, nothing is written and the cursor is left
// untouched. If the write itself fails, the cursor is likewise not updated.
func (b *Bundler) Write(cursor *sink.Cursor, entities []*dynamic.Message) error {
	var payload []byte
	for i := range entities {
		var msg proto.Message = entities[i]
		encoded, err := b.encoder(msg)
		if err != nil {
			return fmt.Errorf("failed to encode: %w", err)
		}
		payload = append(payload, encoded...)
	}

	_, err := b.fileStores.activeWriter.Write(payload)
	if err != nil {
		return fmt.Errorf("failed to write data: %w", err)
	}

	b.stateStore.setCursor(cursor)
	return nil
}

func (b *Bundler) boundariesToSave(blockNum uint64) (out []*bstream.Range) {
rangeStartBlock := b.startBlockNum
for blockNum >= rangeStartBlock+b.size {
out = append(out, bstream.NewRangeExcludingEnd(rangeStartBlock, rangeStartBlock+b.size))
rangeStartBlock = rangeStartBlock + b.size
// boundariesToSkip lists the end-exclusive ranges that lie between the end of
// lastBoundary and the boundary that contains blockNum.
//
// The first range starts at lastBoundary's end block and each range ends at
// the value computeEndBlock yields for its start. Ranges are emitted for as
// long as blockNum is greater than or equal to that end, so the range that
// actually contains blockNum is never part of the result. When there is
// nothing to skip the result is nil.
//
// lastBoundary must have an end block, because it is dereferenced
// unconditionally.
func boundariesToSkip(lastBoundary *bstream.Range, blockNum uint64, size uint64) (out []*bstream.Range) {
	start := *lastBoundary.EndBlock()
	for {
		end := computeEndBlock(start, size)
		if blockNum < end {
			// blockNum falls inside [start, end): that is the next active
			// boundary, not one to skip.
			return out
		}
		out = append(out, bstream.NewRangeExcludingEnd(start, end))
		start = end
	}
}

// computeEndBlock returns the largest multiple of size that is less than or
// equal to startBlockNum+size. This is the first multiple of size strictly
// greater than startBlockNum, i.e. the exclusive end of the bundle that
// contains startBlockNum.
//
// size must be non-zero; a zero size triggers an integer divide-by-zero panic.
func computeEndBlock(startBlockNum, size uint64) uint64 {
	next := startBlockNum + size
	return next - next%size
}
84 changes: 57 additions & 27 deletions bundler/bundler_test.go
Original file line number Diff line number Diff line change
@@ -1,35 +1,70 @@
package bundler

import (
"github.com/magiconair/properties/assert"
"github.com/streamingfast/bstream"
"github.com/stretchr/testify/assert"
"testing"
)

func TestBundler_boundariesToSave(t *testing.T) {
func TestBoundary_newBoundary(t *testing.T) {
tests := []struct {
name string
startBlockNum uint64
bundlerSize uint64
blockNum uint64
expect []*bstream.Range
name string
bundlerSize uint64
blockNum uint64
expect *bstream.Range
}{
{"before boundary", 0, 100, 98, []*bstream.Range{}},
{"on boundary", 0, 100, 100, []*bstream.Range{
bstream.NewRangeExcludingEnd(0, 100),
}},
{"above boundary", 0, 100, 107, []*bstream.Range{
bstream.NewRangeExcludingEnd(0, 100),
}},
{"above boundary", 0, 100, 199, []*bstream.Range{
bstream.NewRangeExcludingEnd(0, 100),
}},
{"above boundary", 2, 100, 200, []*bstream.Range{
bstream.NewRangeExcludingEnd(2, 100),
{"start of boundary w/ size 10", 10, 0, bstream.NewRangeExcludingEnd(0, 10)},
{"middle of boundary w/ size 10", 10, 7, bstream.NewRangeExcludingEnd(0, 10)},
{"last block of boundary w/ size 10", 10, 9, bstream.NewRangeExcludingEnd(0, 10)},
{"end block of boundary w/ size 10", 10, 10, bstream.NewRangeExcludingEnd(10, 20)},
{"start of boundary w/ size 100", 100, 0, bstream.NewRangeExcludingEnd(0, 100)},
{"middle of boundary w/ size 100", 100, 73, bstream.NewRangeExcludingEnd(0, 100)},
{"last block of boundary w/ size 100", 100, 99, bstream.NewRangeExcludingEnd(0, 100)},
{"end block of boundary w/ size 100", 100, 100, bstream.NewRangeExcludingEnd(100, 200)},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b := &Bundler{
size: test.bundlerSize,
}
assert.Equal(t, test.expect, b.newBoundary(test.blockNum))
})
}
}

func TestBoundary_computeEndBlock(t *testing.T) {
tests := []struct {
name string
start uint64
size uint64
expect uint64
}{
{"on boundary", 100, 100, 200},
{"off boundary", 123, 100, 200},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
assert.Equal(t, test.expect, computeEndBlock(test.start, test.size))
})
}
}

func TestBundler_boundariesToSkip(t *testing.T) {
tests := []struct {
name string
lastActiveBoundary *bstream.Range
bundlerSize uint64
blockNum uint64
expect []*bstream.Range
}{
{"before boundary", bstream.NewRangeExcludingEnd(0, 100), 100, 98, nil},
{"on boundary", bstream.NewRangeExcludingEnd(0, 100), 100, 100, nil},
{"above boundary", bstream.NewRangeExcludingEnd(0, 100), 100, 107, nil},
{"above boundary", bstream.NewRangeExcludingEnd(0, 100), 100, 199, nil},
{"above boundary", bstream.NewRangeExcludingEnd(2, 100), 100, 200, []*bstream.Range{
bstream.NewRangeExcludingEnd(100, 200),
}},
{"above boundary", 4, 100, 763, []*bstream.Range{
bstream.NewRangeExcludingEnd(4, 100),
{"above boundary", bstream.NewRangeExcludingEnd(4, 100), 100, 763, []*bstream.Range{
bstream.NewRangeExcludingEnd(100, 200),
bstream.NewRangeExcludingEnd(200, 300),
bstream.NewRangeExcludingEnd(300, 400),
Expand All @@ -40,12 +75,7 @@ func TestBundler_boundariesToSave(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
b := &Bundler{
size: test.bundlerSize,
startBlockNum: test.startBlockNum,
}
assert.Equal(t, test.expect, b.boundariesToSave(test.blockNum))
assert.Equal(t, test.expect, boundariesToSkip(test.lastActiveBoundary, test.blockNum, test.bundlerSize))
})
}

}
18 changes: 7 additions & 11 deletions bundler/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,15 @@ import (
"github.com/golang/protobuf/proto"
)

type Encoder func([]proto.Message) ([]byte, error)
type Encoder func(proto.Message) ([]byte, error)

func JSONEncode(messages []proto.Message) ([]byte, error) {
func JSONLEncode(message proto.Message) ([]byte, error) {
buf := []byte{}
for i := 0; i < len(messages); i++ {
data, err := json.Marshal(messages[i])
if err != nil {
return nil, fmt.Errorf("json marshal: %w", err)
}
buf = append(buf, data...)
if i < len(messages)-1 {
buf = append(buf, byte('\n'))
}
data, err := json.Marshal(message)
if err != nil {
return nil, fmt.Errorf("json marshal: %w", err)
}
buf = append(buf, data...)
buf = append(buf, byte('\n'))
return buf, nil
}
Loading

0 comments on commit 9b3e837

Please sign in to comment.