Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[statefulsyncer] Add Variadic Initializer #250

Merged
merged 6 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions fetcher/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ const (
// clientTimeout is returned when a request exceeds the set
// HTTP timeout setting.
clientTimeout = "Client.Timeout exceeded"

// serverClosedIdleConnection is returned when the client
// attempts to make a request on a connection that was closed
// by the server.
serverClosedIdleConnection = "server closed idle connection"
)

// Backoff wraps backoff.BackOff so we can
Expand Down Expand Up @@ -66,6 +71,7 @@ func transientError(err error) bool {
if errors.Is(err, client.ErrRetriable) ||
strings.Contains(err.Error(), io.EOF.Error()) ||
strings.Contains(err.Error(), connectionResetByPeer) ||
strings.Contains(err.Error(), serverClosedIdleConnection) ||
strings.Contains(err.Error(), clientTimeout) {
return true
}
Expand Down
59 changes: 59 additions & 0 deletions statefulsyncer/configuration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2020 Coinbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package statefulsyncer

import (
"time"
)

// Option is used to overwrite default values in
// StatefulSyncer construction. Any Option not provided
// falls back to the default value.
type Option func(s *StatefulSyncer)

// WithCacheSize overrides the default cache size.
func WithCacheSize(cacheSize int) Option {
return func(s *StatefulSyncer) {
s.cacheSize = cacheSize
}
}

// WithPastBlockLimit overrides the default past block limit
func WithPastBlockLimit(blocks int) Option {
return func(s *StatefulSyncer) {
s.pastBlockLimit = blocks
}
}

// WithMaxConcurrency overrides the default max concurrency.
func WithMaxConcurrency(concurrency int64) Option {
return func(s *StatefulSyncer) {
s.maxConcurrency = concurrency
}
}

// WithAdjustmentWindow overrides the default adjustment window.
func WithAdjustmentWindow(adjustmentWindow int64) Option {
return func(s *StatefulSyncer) {
s.adjustmentWindow = adjustmentWindow
}
}

// WithPruneSleepTime overrides the default prune sleep time.
func WithPruneSleepTime(sleepTime int) Option {
return func(s *StatefulSyncer) {
s.pruneSleepTime = time.Duration(sleepTime) * time.Second
}
}
79 changes: 36 additions & 43 deletions statefulsyncer/stateful_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ import (
"errors"
"fmt"
"log"
"math/big"
"time"

"github.com/coinbase/rosetta-sdk-go/fetcher"
"github.com/coinbase/rosetta-sdk-go/storage"
"github.com/coinbase/rosetta-sdk-go/syncer"
"github.com/coinbase/rosetta-sdk-go/types"
"github.com/coinbase/rosetta-sdk-go/utils"
)

var _ syncer.Handler = (*StatefulSyncer)(nil)
var _ syncer.Helper = (*StatefulSyncer)(nil)

const (
// pruneSleepTime is how long we sleep between
// DefaultPruneSleepTime is how long we sleep between
// pruning attempts.
pruneSleepTime = 10 * time.Second
DefaultPruneSleepTime = 30 * time.Minute

// pruneBuffer is the cushion we apply to pastBlockLimit
// when pruning.
Expand All @@ -54,9 +54,12 @@ type StatefulSyncer struct {
counterStorage *storage.CounterStorage
logger Logger
workers []storage.BlockWorker
cacheSize int
maxConcurrency int64
pastBlockLimit int

cacheSize int
maxConcurrency int64
pastBlockLimit int
adjustmentWindow int64
pruneSleepTime time.Duration
}

// Logger is used by the statefulsyncer to
Expand Down Expand Up @@ -87,22 +90,31 @@ func New(
logger Logger,
cancel context.CancelFunc,
workers []storage.BlockWorker,
cacheSize int,
maxConcurrency int64,
pastBlockLimit int,
options ...Option,
) *StatefulSyncer {
return &StatefulSyncer{
s := &StatefulSyncer{
network: network,
fetcher: fetcher,
cancel: cancel,
blockStorage: blockStorage,
counterStorage: counterStorage,
workers: workers,
logger: logger,
cacheSize: cacheSize,
maxConcurrency: maxConcurrency,
pastBlockLimit: pastBlockLimit,

// Optional args
cacheSize: syncer.DefaultCacheSize,
maxConcurrency: syncer.DefaultMaxConcurrency,
pastBlockLimit: syncer.DefaultPastBlockLimit,
adjustmentWindow: syncer.DefaultAdjustmentWindow,
pruneSleepTime: DefaultPruneSleepTime,
}

// Override defaults with any provided options
for _, opt := range options {
opt(s)
}

return s
}

// Sync starts a new sync run after properly initializing blockStorage.
Expand Down Expand Up @@ -135,6 +147,7 @@ func (s *StatefulSyncer) Sync(ctx context.Context, startIndex int64, endIndex in
syncer.WithPastBlocks(pastBlocks),
syncer.WithCacheSize(s.cacheSize),
syncer.WithMaxConcurrency(s.maxConcurrency),
syncer.WithAdjustmentWindow(s.adjustmentWindow),
)

return syncer.Sync(ctx, startIndex, endIndex)
Expand All @@ -148,10 +161,16 @@ func (s *StatefulSyncer) Sync(ctx context.Context, startIndex int64, endIndex in
// pruning strategies during syncing.
func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
for ctx.Err() == nil {
// We don't use a timer pattern because s.pruneSleepTime is defined
// as the time between pruning runs. Using a timer would only guarantee
// that the difference between starts of each pruning run are s.pruneSleepTime.
if err := utils.ContextSleep(ctx, s.pruneSleepTime); err != nil {
return err
}

headBlock, err := s.blockStorage.GetHeadBlockIdentifier(ctx)
if headBlock == nil && errors.Is(err, storage.ErrHeadBlockNotFound) {
// this will occur when we are waiting for the first block to be synced
time.Sleep(pruneSleepTime)
continue
}
if err != nil {
Expand All @@ -161,7 +180,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
oldestIndex, err := s.blockStorage.GetOldestBlockIndex(ctx)
if oldestIndex == -1 && errors.Is(err, storage.ErrOldestIndexMissing) {
// this will occur when we have yet to store the oldest index
time.Sleep(pruneSleepTime)
continue
}
if err != nil {
Expand All @@ -174,7 +192,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {
}

if pruneableIndex < oldestIndex {
time.Sleep(pruneSleepTime)
continue
}

Expand All @@ -196,8 +213,6 @@ func (s *StatefulSyncer) Prune(ctx context.Context, helper PruneHelper) error {

log.Println(pruneMessage)
}

time.Sleep(pruneSleepTime)
}

return ctx.Err()
Expand All @@ -215,23 +230,7 @@ func (s *StatefulSyncer) BlockAdded(ctx context.Context, block *types.Block) err
)
}

if err := s.logger.AddBlockStream(ctx, block); err != nil {
return nil
}

// Update Counters
_, _ = s.counterStorage.Update(ctx, storage.BlockCounter, big.NewInt(1))
_, _ = s.counterStorage.Update(
ctx,
storage.TransactionCounter,
big.NewInt(int64(len(block.Transactions))),
)
opCount := int64(0)
for _, txn := range block.Transactions {
opCount += int64(len(txn.Operations))
}
_, _ = s.counterStorage.Update(ctx, storage.OperationCounter, big.NewInt(opCount))

_ = s.logger.AddBlockStream(ctx, block)
return nil
}

Expand All @@ -250,14 +249,8 @@ func (s *StatefulSyncer) BlockRemoved(
)
}

if err := s.logger.RemoveBlockStream(ctx, blockIdentifier); err != nil {
return nil
}

// Update Counters
_, _ = s.counterStorage.Update(ctx, storage.OrphanCounter, big.NewInt(1))

return err
_ = s.logger.RemoveBlockStream(ctx, blockIdentifier)
return nil
}

// NetworkStatus is called by the syncer to get the current
Expand Down
57 changes: 57 additions & 0 deletions storage/counter_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"math/big"

"github.com/coinbase/rosetta-sdk-go/types"
)

const (
Expand Down Expand Up @@ -76,6 +78,8 @@ const (
counterNamespace = "counter"
)

var _ BlockWorker = (*CounterStorage)(nil)

// CounterStorage implements counter-specific storage methods
// on top of a Database and DatabaseTransaction interface.
type CounterStorage struct {
Expand Down Expand Up @@ -162,3 +166,56 @@ func (c *CounterStorage) Get(ctx context.Context, counter string) (*big.Int, err

return transactionalGet(ctx, counter, transaction)
}

// AddingBlock is called by BlockStorage when adding a block.
func (c *CounterStorage) AddingBlock(
ctx context.Context,
block *types.Block,
transaction DatabaseTransaction,
) (CommitWorker, error) {
_, err := c.UpdateTransactional(
ctx,
transaction,
BlockCounter,
big.NewInt(1),
)
if err != nil {
return nil, err
}

_, err = c.UpdateTransactional(
ctx,
transaction,
TransactionCounter,
big.NewInt(int64(len(block.Transactions))),
)
if err != nil {
return nil, err
}

opCount := int64(0)
for _, txn := range block.Transactions {
opCount += int64(len(txn.Operations))
}
_, err = c.UpdateTransactional(
ctx,
transaction,
OperationCounter,
big.NewInt(opCount),
)
if err != nil {
return nil, err
}

return nil, nil
}

// RemovingBlock is called by BlockStorage when removing a block.
func (c *CounterStorage) RemovingBlock(
ctx context.Context,
block *types.Block,
transaction DatabaseTransaction,
) (CommitWorker, error) {
_, err := c.UpdateTransactional(ctx, transaction, OrphanCounter, big.NewInt(1))
return nil, err
}
7 changes: 7 additions & 0 deletions syncer/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,10 @@ func WithMaxConcurrency(concurrency int64) Option {
s.maxConcurrency = concurrency
}
}

// WithAdjustmentWindow overrides the default adjustment window.
func WithAdjustmentWindow(adjustmentWindow int64) Option {
return func(s *Syncer) {
s.adjustmentWindow = adjustmentWindow
}
}
23 changes: 12 additions & 11 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,17 @@ func New(
options ...Option,
) *Syncer {
s := &Syncer{
network: network,
helper: helper,
handler: handler,
concurrency: DefaultConcurrency,
cacheSize: DefaultCacheSize,
maxConcurrency: DefaultMaxConcurrency,
sizeMultiplier: DefaultSizeMultiplier,
cancel: cancel,
pastBlocks: []*types.BlockIdentifier{},
pastBlockLimit: DefaultPastBlockLimit,
network: network,
helper: helper,
handler: handler,
concurrency: DefaultConcurrency,
cacheSize: DefaultCacheSize,
maxConcurrency: DefaultMaxConcurrency,
sizeMultiplier: DefaultSizeMultiplier,
cancel: cancel,
pastBlocks: []*types.BlockIdentifier{},
pastBlockLimit: DefaultPastBlockLimit,
adjustmentWindow: DefaultAdjustmentWindow,
}

// Override defaults with any provided options
Expand Down Expand Up @@ -408,7 +409,7 @@ func (s *Syncer) adjustWorkers() bool {
shouldCreate := false
if estimatedMaxCache+max < float64(s.cacheSize) &&
s.concurrency < s.maxConcurrency &&
s.lastAdjustment > defaultAdjustmentWindow {
s.lastAdjustment > s.adjustmentWindow {
s.goalConcurrency++
s.concurrency++
s.lastAdjustment = 0
Expand Down
5 changes: 3 additions & 2 deletions syncer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ const (
// of block sizes to keep when adjusting concurrency.
defaultTrailingWindow = 1000

// defaultAdjustmentWindow is how frequently we will
// DefaultAdjustmentWindow is how frequently we will
// consider increasing our concurrency.
defaultAdjustmentWindow = 10
DefaultAdjustmentWindow = 5

// DefaultSizeMultiplier is used to pad our average size adjustment.
// This can be used to account for the overhead associated with processing
Expand Down Expand Up @@ -146,6 +146,7 @@ type Syncer struct {
goalConcurrency int64
recentBlockSizes []int
lastAdjustment int64
adjustmentWindow int64
concurrencyLock sync.Mutex

// doneLoading is used to coordinate adding goroutines
Expand Down