Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor chain manager subnets #2711

Merged
merged 10 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 17 additions & 55 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ var (
// Bootstrapping prefixes for ChainVMs
ChainBootstrappingDBPrefix = []byte("bs")

errUnknownVMType = errors.New("the vm should have type avalanche.DAGVM or snowman.ChainVM")
errCreatePlatformVM = errors.New("attempted to create a chain running the PlatformVM")
errNotBootstrapped = errors.New("subnets not bootstrapped")
errNoPrimaryNetworkConfig = errors.New("no subnet config for primary network found")
errPartialSyncAsAValidator = errors.New("partial sync should not be configured for a validator")
errUnknownVMType = errors.New("the vm should have type avalanche.DAGVM or snowman.ChainVM")
errCreatePlatformVM = errors.New("attempted to create a chain running the PlatformVM")
errNotBootstrapped = errors.New("subnets not bootstrapped")
errPrimaryNetworkNotRunning = errors.New("node is not running the primary network")
errPartialSyncAsAValidator = errors.New("partial sync should not be configured for a validator")

fxs = map[ids.ID]fx.Factory{
secp256k1fx.ID: &secp256k1fx.Factory{},
Expand Down Expand Up @@ -233,6 +233,8 @@ type ManagerConfig struct {
StateSyncBeacons []ids.NodeID

ChainDataDir string

Subnets *Subnets
}

type manager struct {
Expand All @@ -256,10 +258,7 @@ type manager struct {
chainCreatorShutdownCh chan struct{}
chainCreatorExited sync.WaitGroup

subnetsLock sync.RWMutex
// Key: Subnet's ID
// Value: Subnet description
subnets map[ids.ID]subnets.Subnet
subnets *Subnets
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

chainsLock sync.Mutex
// Key: Chain's ID
Expand All @@ -277,7 +276,6 @@ func New(config *ManagerConfig) Manager {
ManagerConfig: *config,
stakingSigner: config.StakingTLSCert.PrivateKey.(crypto.Signer),
stakingCert: staking.CertificateFromX509(config.StakingTLSCert.Leaf),
subnets: make(map[ids.ID]subnets.Subnet),
chains: make(map[ids.ID]handler.Handler),
chainsQueue: buffer.NewUnboundedBlockingDeque[ChainParameters](initialQueueSize),
unblockChainCreatorCh: make(chan struct{}),
Expand All @@ -288,25 +286,11 @@ func New(config *ManagerConfig) Manager {
// QueueChainCreation queues a chain creation request
// Invariant: Tracked Subnet must be checked before calling this function
func (m *manager) QueueChainCreation(chainParams ChainParameters) {
m.subnetsLock.Lock()
subnetID := chainParams.SubnetID
sb, exists := m.subnets[subnetID]
if !exists {
sbConfig, ok := m.SubnetConfigs[subnetID]
if !ok {
// default to primary subnet config
sbConfig = m.SubnetConfigs[constants.PrimaryNetworkID]
}
sb = subnets.New(m.NodeID, sbConfig)
m.subnets[chainParams.SubnetID] = sb
}
addedChain := sb.AddChain(chainParams.ID)
m.subnetsLock.Unlock()

if !addedChain {
_ = m.subnets.Add(chainParams.SubnetID)
if sb, _ := m.subnets.Get(chainParams.SubnetID); !sb.AddChain(chainParams.ID) {
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
m.Log.Debug("skipping chain creation",
zap.String("reason", "chain already staged"),
zap.Stringer("subnetID", subnetID),
zap.Stringer("subnetID", chainParams.SubnetID),
zap.Stringer("chainID", chainParams.ID),
zap.Stringer("vmID", chainParams.VMID),
)
Expand All @@ -316,7 +300,7 @@ func (m *manager) QueueChainCreation(chainParams ChainParameters) {
if ok := m.chainsQueue.PushRight(chainParams); !ok {
m.Log.Warn("skipping chain creation",
zap.String("reason", "couldn't enqueue chain"),
zap.Stringer("subnetID", subnetID),
zap.Stringer("subnetID", chainParams.SubnetID),
zap.Stringer("chainID", chainParams.ID),
zap.Stringer("vmID", chainParams.VMID),
)
Expand All @@ -334,9 +318,7 @@ func (m *manager) createChain(chainParams ChainParameters) {
zap.Stringer("vmID", chainParams.VMID),
)

m.subnetsLock.RLock()
sb := m.subnets[chainParams.SubnetID]
m.subnetsLock.RUnlock()
sb, _ := m.subnets.Get(chainParams.SubnetID)

// Note: buildChain builds all chain's relevant objects (notably engine and handler)
// but does not start their operations. Starting of the handler (which could potentially
Expand Down Expand Up @@ -1307,23 +1289,9 @@ func (m *manager) IsBootstrapped(id ids.ID) bool {
return chain.Context().State.Get().State == snow.NormalOp
}

func (m *manager) subnetsNotBootstrapped() []ids.ID {
m.subnetsLock.RLock()
defer m.subnetsLock.RUnlock()

subnetsBootstrapping := make([]ids.ID, 0, len(m.subnets))
for subnetID, subnet := range m.subnets {
if !subnet.IsBootstrapped() {
subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
}
}
return subnetsBootstrapping
}

func (m *manager) registerBootstrappedHealthChecks() error {
bootstrappedCheck := health.CheckerFunc(func(context.Context) (interface{}, error) {
subnetIDs := m.subnetsNotBootstrapped()
if len(subnetIDs) != 0 {
if subnetIDs := m.subnets.Bootstrapping(); len(subnetIDs) != 0 {
return subnetIDs, errNotBootstrapped
}
return []ids.ID{}, nil
Expand Down Expand Up @@ -1365,18 +1333,12 @@ func (m *manager) registerBootstrappedHealthChecks() error {

// Starts chain creation loop to process queued chains
func (m *manager) StartChainCreator(platformParams ChainParameters) error {
// Get the Primary Network's subnet config. If it wasn't registered, then we
// throw a fatal error.
sbConfig, ok := m.SubnetConfigs[constants.PrimaryNetworkID]
// Add the P-Chain to the Primary Network
sb, ok := m.subnets.Get(constants.PrimaryNetworkID)
if !ok {
return errNoPrimaryNetworkConfig
return errPrimaryNetworkNotRunning
}

sb := subnets.New(m.NodeID, sbConfig)
m.subnetsLock.Lock()
m.subnets[platformParams.SubnetID] = sb
sb.AddChain(platformParams.ID)
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
m.subnetsLock.Unlock()

// The P-chain is created synchronously to ensure that `VM.Initialize` has
// finished before returning from this function. This is required because
Expand Down
90 changes: 90 additions & 0 deletions chains/subnets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (C) 2019-2024, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package chains

import (
"errors"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/subnets"
"github.com/ava-labs/avalanchego/utils/constants"
)

var ErrNoPrimaryNetworkConfig = errors.New("no subnet config for primary network found")

// Subnets holds the currently running subnets on this node
type Subnets struct {
nodeID ids.NodeID
configs map[ids.ID]subnets.Config

lock sync.Mutex
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
subnets map[ids.ID]subnets.Subnet
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

// Add a subnet that is being run on this node. Returns if the node was added
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
// or not.
func (s *Subnets) Add(subnetID ids.ID) bool {
s.lock.Lock()
defer s.lock.Unlock()

if _, ok := s.subnets[subnetID]; ok {
return false
}

// Default to the primary network config if a subnet config was not
// specified
config, ok := s.configs[subnetID]
if !ok {
config = s.configs[constants.PrimaryNetworkID]
}

s.subnets[subnetID] = subnets.New(s.nodeID, config)
return true
}

// Get returns a subnet if it is being run on this node. Returns the subnet
// if it was present.
func (s *Subnets) Get(subnetID ids.ID) (subnets.Subnet, bool) {
s.lock.Lock()
defer s.lock.Unlock()

subnet, ok := s.subnets[subnetID]
return subnet, ok
}

// Bootstrapping returns the subnetIDs of any chains that are still
// bootstrapping.
func (s *Subnets) Bootstrapping() []ids.ID {
s.lock.Lock()
defer s.lock.Unlock()

subnetsBootstrapping := make([]ids.ID, 0, len(s.subnets))
for subnetID, subnet := range s.subnets {
if !subnet.IsBootstrapped() {
subnetsBootstrapping = append(subnetsBootstrapping, subnetID)
}
}

return subnetsBootstrapping
}

// NewSubnets returns an instance of Subnets
func NewSubnets(
nodeID ids.NodeID,
configs map[ids.ID]subnets.Config,
) (*Subnets, error) {
if _, ok := configs[constants.PrimaryNetworkID]; !ok {
return nil, ErrNoPrimaryNetworkConfig
}

s := &Subnets{
nodeID: nodeID,
configs: configs,
subnets: make(map[ids.ID]subnets.Subnet),
}

_ = s.Add(constants.PrimaryNetworkID)
return s, nil
}
Loading
Loading