Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce entity that will keep info about services current state and integrate it into DASer and Syncer #725

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/service"
"github.com/celestiaorg/celestia-node/service/share"
)

var log = logging.Logger("das")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
service.Service

da share.Availability
hsub header.Subscriber

Expand Down Expand Up @@ -73,6 +76,7 @@ func (d *DASer) Start(context.Context) error {
dasCtx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

d.SetState(service.Running)
// kick off catch-up routine manager
go d.catchUpManager(dasCtx, checkpoint)
// kick off sampling routine for recently received headers
Expand All @@ -82,6 +86,11 @@ func (d *DASer) Start(context.Context) error {

// Stop stops sampling.
func (d *DASer) Stop(ctx context.Context) error {
if d.State() == service.Stopped {
log.Debug("Daser is stopped")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.Debug("Daser is stopped")
log.Debug("DASer is stopped")

return nil
}
defer d.SetState(service.Stopped)
d.cancel()
// wait for both sampling routines to exit
for i := 0; i < 2; i++ {
Expand Down
9 changes: 9 additions & 0 deletions header/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/libs/service"
)

var log = logging.Logger("header/sync")
Expand All @@ -32,6 +33,8 @@ var log = logging.Logger("header/sync")
// * adds the header to pending cache(making it the latest known trusted header)
// * and triggers syncing loop to catch up to that point.
type Syncer struct {
service.Service

sub header.Subscriber
exchange header.Exchange
store header.Store
Expand Down Expand Up @@ -69,6 +72,7 @@ func (s *Syncer) Start(context.Context) error {
}

ctx, cancel := context.WithCancel(context.Background())
s.SetState(service.Running)
go s.syncLoop(ctx)
s.wantSync()
s.cancel = cancel
Expand All @@ -77,6 +81,11 @@ func (s *Syncer) Start(context.Context) error {

// Stop stops Syncer.
func (s *Syncer) Stop(context.Context) error {
if s.Service.State() == service.Stopped {
log.Debug("Syncer is stopped")
return nil
}
s.SetState(service.Stopped)
s.cancel()
s.cancel = nil
return nil
Expand Down
25 changes: 25 additions & 0 deletions libs/service/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package service

import "sync/atomic"

type State uint32

const (
Stopped State = iota
Running
)

// Service provides an interface to change an check service state
type Service struct {
state uint32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

Suggested change
state uint32
state State

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case storing and loading state will be more complex:

* atomic.StoreUint32((*uint32)(&s.state), uint32(state))
* return State(atomic.LoadUint32((*uint32)(&s.state)))

IMO, better to leave uint32

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then maybe the naming of state here is inconsistent? s.state seems to be a key for the state, not a state? If so, it it's 1) not clearly named or documented, and 2) should be uint64, unless there's a very specific reason it must be a uint32.

}

// SetState allows to change service state
func (s *Service) SetState(state State) {
atomic.StoreUint32(&s.state, uint32(state))
}

// State returns service current state
func (s *Service) State() State {
return State(atomic.LoadUint32(&s.state))
}