Skip to content

Commit

Permalink
extract wrapper into folder
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed May 17, 2022
1 parent 7cb96a9 commit 08f4472
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 27 deletions.
13 changes: 6 additions & 7 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ import (
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/libs/service"
"github.com/celestiaorg/celestia-node/service/share"
)

var log = logging.Logger("das")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
service.Service

da share.Availability
hsub header.Subscriber

Expand All @@ -32,8 +34,6 @@ type DASer struct {

sampleDn chan struct{} // done signal for sample loop
catchUpDn chan struct{} // done signal for catchUp loop

state *utils.StateWrapper
}

// NewDASer creates a new DASer.
Expand All @@ -52,7 +52,6 @@ func NewDASer(
jobsCh: make(chan *catchUpJob, 16),
sampleDn: make(chan struct{}),
catchUpDn: make(chan struct{}),
state: utils.NewStateWrapper(),
}
}

Expand All @@ -77,7 +76,7 @@ func (d *DASer) Start(context.Context) error {
dasCtx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

d.state.SetState(utils.Running)
d.SetState(service.Running)
// kick off catch-up routine manager
go d.catchUpManager(dasCtx, checkpoint)
// kick off sampling routine for recently received headers
Expand All @@ -87,11 +86,11 @@ func (d *DASer) Start(context.Context) error {

// Stop stops sampling.
func (d *DASer) Stop(ctx context.Context) error {
if d.state.State() == utils.Stopped {
if d.State() == service.Stopped {
log.Debug("Daser is stopped")
return nil
}
defer d.state.SetState(utils.Stopped)
defer d.SetState(service.Stopped)
d.cancel()
// wait for both sampling routines to exit
for i := 0; i < 2; i++ {
Expand Down
21 changes: 10 additions & 11 deletions header/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/utils"
"github.com/celestiaorg/celestia-node/libs/service"
)

var log = logging.Logger("header/sync")
Expand All @@ -33,6 +33,8 @@ var log = logging.Logger("header/sync")
// * adds the header to pending cache(making it the latest known trusted header)
// * and triggers syncing loop to catch up to that point.
type Syncer struct {
service.Service

sub header.Subscriber
exchange header.Exchange
store header.Store
Expand All @@ -46,18 +48,15 @@ type Syncer struct {
pending ranges
// cancel cancels syncLoop's context
cancel context.CancelFunc

serviceState *utils.StateWrapper
}

// NewSyncer creates a new instance of Syncer.
func NewSyncer(exchange header.Exchange, store header.Store, sub header.Subscriber) *Syncer {
return &Syncer{
sub: sub,
exchange: exchange,
store: store,
triggerSync: make(chan struct{}, 1), // should be buffered
serviceState: utils.NewStateWrapper(),
sub: sub,
exchange: exchange,
store: store,
triggerSync: make(chan struct{}, 1), // should be buffered
}
}

Expand All @@ -73,7 +72,7 @@ func (s *Syncer) Start(context.Context) error {
}

ctx, cancel := context.WithCancel(context.Background())
s.serviceState.SetState(utils.Running)
s.SetState(service.Running)
go s.syncLoop(ctx)
s.wantSync()
s.cancel = cancel
Expand All @@ -82,11 +81,11 @@ func (s *Syncer) Start(context.Context) error {

// Stop stops Syncer.
func (s *Syncer) Stop(context.Context) error {
if s.serviceState.State() == utils.Stopped {
if s.Service.State() == service.Stopped {
log.Debug("Syncer is stopped")
return nil
}
s.serviceState.SetState(utils.Stopped)
s.SetState(service.Stopped)
s.cancel()
s.cancel = nil
return nil
Expand Down
14 changes: 5 additions & 9 deletions libs/utils/state_wrapper.go → libs/service/service.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package utils
package service

import "sync/atomic"

Expand All @@ -9,21 +9,17 @@ const (
Running
)

// StateWrapper provides an interface to change an check service state
type StateWrapper struct {
// Service provides an interface to change an check service state
type Service struct {
state uint32
}

func NewStateWrapper() *StateWrapper {
return &StateWrapper{}
}

// SetState allows to change service state
func (s *StateWrapper) SetState(state State) {
func (s *Service) SetState(state State) {
atomic.StoreUint32(&s.state, uint32(state))
}

// State returns service current state
func (s *StateWrapper) State() State {
func (s *Service) State() State {
return State(atomic.LoadUint32(&s.state))
}

0 comments on commit 08f4472

Please sign in to comment.