Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable govet fieldalignment #2856

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ issues:
- path: _test\.go
linters:
- gosec
- govet
- linters:
- lll
source: "https://"
Expand All @@ -66,3 +67,6 @@ linters-settings:
local-prefixes: github.com/celestiaorg/celestia-node
dupl:
threshold: 200
govet:
enable:
- fieldalignment
13 changes: 7 additions & 6 deletions api/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ import (
var log = logging.Logger("rpc")

type Server struct {
srv *http.Server
rpc *jsonrpc.RPCServer
listener net.Listener
authDisabled bool

started atomic.Bool
listener net.Listener

auth jwt.Signer

srv *http.Server
rpc *jsonrpc.RPCServer

started atomic.Bool
authDisabled bool
}

func NewServer(address, port string, authDisabled bool, secret jwt.Signer) *Server {
Expand Down
7 changes: 3 additions & 4 deletions blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,12 @@ func (p Proof) equal(input Proof) error {

// Blob represents any application-specific binary data that anyone can submit to Celestia.
type Blob struct {
types.Blob `json:"blob"`

Commitment Commitment `json:"commitment"`

// the celestia-node's namespace type
// this is to avoid converting to and from app's type
namespace share.Namespace
namespace share.Namespace
types.Blob `json:"blob"`

// index represents the index of the blob's first share in the EDS.
// Only retrieved, on-chain blobs will have the index set. Default is -1.
Expand Down Expand Up @@ -129,8 +128,8 @@ func (b *Blob) compareCommitments(com Commitment) bool {
type jsonBlob struct {
Namespace share.Namespace `json:"namespace"`
Data []byte `json:"data"`
ShareVersion uint32 `json:"share_version"`
Commitment Commitment `json:"commitment"`
ShareVersion uint32 `json:"share_version"`
Index int `json:"index"`
}

Expand Down
2 changes: 1 addition & 1 deletion blob/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func TestBlob(t *testing.T) {
require.NoError(t, err)

var test = []struct {
name string
expectedRes func(t *testing.T)
name string
}{
{
name: "new blob",
Expand Down
4 changes: 2 additions & 2 deletions blob/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (
// * shares needed to build the blob;
// * extra condition to verify the final blob.
type parser struct {
verifyFn func(blob *Blob) bool
shares []shares.Share
index int
length int
shares []shares.Share
verifyFn func(blob *Blob) bool
}

// NOTE: passing shares here needed to detect padding shares(as we do not need this check in addShares)
Expand Down
2 changes: 1 addition & 1 deletion blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func TestBlobService_Get(t *testing.T) {

service := createService(ctx, t, append(blobs0, blobs1...))
var test = []struct {
name string
doFn func() (interface{}, error)
expectedResult func(interface{}, error)
name string
}{
{
name: "get single blob",
Expand Down
9 changes: 5 additions & 4 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,21 @@ var (
// broadcasts the new `ExtendedHeader` to the header-sub gossipsub
// network.
type Listener struct {
fetcher *BlockFetcher
headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
fetcher *BlockFetcher

construct header.ConstructFn
store *eds.Store

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
hashBroadcaster shrexsub.BroadcastFn

metrics *listenerMetrics

cancel context.CancelFunc

chainID string

listenerTimeout time.Duration
cancel context.CancelFunc
}

func NewListener(
Expand Down
3 changes: 1 addition & 2 deletions core/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
type Option func(*params)

type params struct {
metrics bool

chainID string
metrics bool
}

// WithMetrics is a functional option that enables metrics
Expand Down
8 changes: 4 additions & 4 deletions das/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ import (
)

type checkpoint struct {
SampleFrom uint64 `json:"sample_from"`
NetworkHead uint64 `json:"network_head"`
// Failed heights will be retried
Failed map[uint64]int `json:"failed,omitempty"`
// Workers will resume on restart from previous state
Workers []workerCheckpoint `json:"workers,omitempty"`
Workers []workerCheckpoint `json:"workers,omitempty"`
SampleFrom uint64 `json:"sample_from"`
NetworkHead uint64 `json:"network_head"`
}

// workerCheckpoint will be used to resume worker on restart
type workerCheckpoint struct {
JobType jobType `json:"job_type"`
From uint64 `json:"from"`
To uint64 `json:"to"`
JobType jobType `json:"job_type"`
}

func newCheckpoint(stats SamplingStats) checkpoint {
Expand Down
19 changes: 10 additions & 9 deletions das/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,33 @@ import (

// samplingCoordinator runs and coordinates sampling workers and updates current sampling state
type samplingCoordinator struct {
concurrencyLimit int
samplingTimeout time.Duration

getter libhead.Getter[*header.ExtendedHeader]
sampleFn sampleFn
broadcastFn shrexsub.BroadcastFn

state coordinatorState

// resultCh fans-in sampling results from worker to coordinator
resultCh chan result
// updHeadCh signals to update network head header height
updHeadCh chan *header.ExtendedHeader
// waitCh signals to block coordinator for external access to state
waitCh chan *sync.WaitGroup

workersWg sync.WaitGroup
metrics *metrics
metrics *metrics

done

state coordinatorState

workersWg sync.WaitGroup
concurrencyLimit int
samplingTimeout time.Duration
}

// result will carry errors to coordinator after worker finishes the job
type result struct {
job
failed map[uint64]int
err error
failed map[uint64]int
job
}

func newSamplingCoordinator(
Expand Down
13 changes: 7 additions & 6 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@ var log = logging.Logger("das")

// DASer continuously validates availability of data committed to headers.
type DASer struct {
params Parameters

da share.Availability
bcast fraud.Broadcaster[*header.ExtendedHeader]
hsub libhead.Subscriber[*header.ExtendedHeader] // listens for new headers in the network
getter libhead.Getter[*header.ExtendedHeader] // retrieves past headers

sampler *samplingCoordinator
store checkpointStore
subscriber subscriber
sampler *samplingCoordinator

cancel context.CancelFunc
subscriberDone chan struct{}
running int32
store checkpointStore
subscriber subscriber

params Parameters

running int32
}

type listenFn func(context.Context, *header.ExtendedHeader)
Expand Down
2 changes: 1 addition & 1 deletion das/done.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

type done struct {
name string
finished chan struct{}
name string
}

func newDone(name string) done {
Expand Down
22 changes: 12 additions & 10 deletions das/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,26 @@ import (

// coordinatorState represents the current state of sampling process
type coordinatorState struct {
// sampleFrom is the height from which the DASer will start sampling
sampleFrom uint64
// samplingRange is the maximum amount of headers processed in one job.
samplingRange uint64

// keeps track of running workers
inProgress map[int]func() workerState

// retryStrategy implements retry backoff
retryStrategy retryStrategy
// stores heights of failed headers with amount of retry attempt as value
failed map[uint64]retryAttempt
// inRetry stores (height -> attempt count) of failed headers that are currently being retried by
// workers
inRetry map[uint64]retryAttempt

// catchUpDoneCh blocks until all headers are sampled
catchUpDoneCh chan struct{}

// retryStrategy implements retry backoff
retryStrategy retryStrategy
// sampleFrom is the height from which the DASer will start sampling
sampleFrom uint64
// samplingRange is the maximum amount of headers processed in one job.
samplingRange uint64

// nextJobID is a unique identifier that will be used for creation of next job
nextJobID int
// all headers before next were sent to workers
Expand All @@ -35,16 +39,14 @@ type coordinatorState struct {

// catchUpDone indicates if all headers are sampled
catchUpDone atomic.Bool
// catchUpDoneCh blocks until all headers are sampled
catchUpDoneCh chan struct{}
}

// retryAttempt represents a retry attempt with a backoff delay.
type retryAttempt struct {
// count specifies the number of retry attempts made so far.
count int
// after specifies the time for the next retry attempt.
after time.Time
// count specifies the number of retry attempts made so far.
count int
}

// newCoordinatorState initiates state for samplingCoordinator
Expand Down
14 changes: 7 additions & 7 deletions das/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package das

// SamplingStats collects information about the DASer process.
type SamplingStats struct {
// Failed contains all skipped headers heights with corresponding try count
Failed map[uint64]int `json:"failed,omitempty"`
// Workers has information about each currently running worker stats
Workers []WorkerStats `json:"workers,omitempty"`
// all headers before SampledChainHead were successfully sampled
SampledChainHead uint64 `json:"head_of_sampled_chain"`
// all headers before CatchupHead were submitted to sampling workers. They could be either already
// sampled, failed or still in progress. For in progress items check Workers stat.
CatchupHead uint64 `json:"head_of_catchup"`
// NetworkHead is the height of the most recent header in the network
NetworkHead uint64 `json:"network_head_height"`
// Failed contains all skipped headers heights with corresponding try count
Failed map[uint64]int `json:"failed,omitempty"`
// Workers has information about each currently running worker stats
Workers []WorkerStats `json:"workers,omitempty"`
// Concurrency amount of currently running parallel workers
Concurrency int `json:"concurrency"`
// CatchUpDone indicates whether all known headers are sampled
Expand All @@ -23,11 +23,11 @@ type SamplingStats struct {

type WorkerStats struct {
JobType jobType `json:"job_type"`
Curr uint64 `json:"current"`
From uint64 `json:"from"`
To uint64 `json:"to"`

ErrMsg string `json:"error,omitempty"`
Curr uint64 `json:"current"`
From uint64 `json:"from"`
To uint64 `json:"to"`
}

// totalSampled returns the total amount of sampled headers
Expand Down
14 changes: 7 additions & 7 deletions das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ const (
)

type worker struct {
lock sync.Mutex
state workerState

getter libhead.Getter[*header.ExtendedHeader]
sampleFn sampleFn
broadcast shrexsub.BroadcastFn
metrics *metrics
state workerState

lock sync.Mutex
}

// workerState contains important information about the state of a
Expand All @@ -41,13 +41,13 @@ type jobType string

// job represents headers interval to be processed by worker
type job struct {
id int

// header is set only for recentJobs, avoiding an unnecessary call to the header store
header *header.ExtendedHeader
jobType jobType
id int
from uint64
to uint64

// header is set only for recentJobs, avoiding an unnecessary call to the header store
header *header.ExtendedHeader
}

func newWorker(j job,
Expand Down
6 changes: 3 additions & 3 deletions header/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type RawHeader = core.Header
// information necessary for Celestia Nodes to be notified of new
// block headers and perform Data Availability Sampling.
type ExtendedHeader struct {
RawHeader `json:"header"`
Commit *core.Commit `json:"commit"`
ValidatorSet *core.ValidatorSet `json:"validator_set"`
DAH *da.DataAvailabilityHeader `json:"dah"`
RawHeader `json:"header"`
}

// MakeExtendedHeader assembles new ExtendedHeader.
Expand Down Expand Up @@ -239,9 +239,9 @@ func (eh *ExtendedHeader) MarshalJSON() ([]byte, error) {
return nil, err
}
return json.Marshal(&struct {
*Alias
RawHeader json.RawMessage `json:"header"`
ValidatorSet json.RawMessage `json:"validator_set"`
*Alias
}{
ValidatorSet: validatorSet,
RawHeader: rawHeader,
Expand All @@ -254,9 +254,9 @@ func (eh *ExtendedHeader) MarshalJSON() ([]byte, error) {
func (eh *ExtendedHeader) UnmarshalJSON(data []byte) error {
type Alias ExtendedHeader
aux := &struct {
*Alias
RawHeader json.RawMessage `json:"header"`
ValidatorSet json.RawMessage `json:"validator_set"`
*Alias
}{
Alias: (*Alias)(eh),
}
Expand Down
Loading
Loading