Skip to content

Commit

Permalink
Simplify dependency registration (#3139)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Jun 27, 2024
1 parent 318da00 commit 24e9952
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 70 deletions.
2 changes: 1 addition & 1 deletion snow/engine/snowman/issuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (i *issuer) Execute(ctx context.Context, _ []ids.ID, abandoned []ids.ID) er
// If the parent block was abandoned, this block should be abandoned as
// well.
blkID := i.blk.ID()
i.t.removeFromPending(i.blk)
delete(i.t.pending, blkID)
i.t.addToNonVerifieds(i.blk)
return i.t.blocked.Abandon(ctx, blkID)
}
128 changes: 59 additions & 69 deletions snow/engine/snowman/transitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (t *Transitive) Put(ctx context.Context, nodeID ids.NodeID, requestID uint3
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
if _, err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
if err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
return err
}
return t.executeDeferredWork(ctx)
Expand Down Expand Up @@ -305,7 +305,7 @@ func (t *Transitive) PullQuery(ctx context.Context, nodeID ids.NodeID, requestID

// Try to issue [blkID] to consensus.
// If we're missing an ancestor, request it from [vdr]
if _, err := t.issueFromByID(ctx, nodeID, blkID, issuedMetric); err != nil {
if err := t.issueFromByID(ctx, nodeID, blkID, issuedMetric); err != nil {
return err
}

Expand Down Expand Up @@ -346,7 +346,7 @@ func (t *Transitive) PushQuery(ctx context.Context, nodeID ids.NodeID, requestID
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
if _, err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
if err := t.issueFrom(ctx, nodeID, blk, issuedMetric); err != nil {
return err
}

Expand All @@ -365,25 +365,23 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin
)

issuedMetric := t.metrics.issued.WithLabelValues(pullGossipSource)

addedPreferred, err := t.issueFromByID(ctx, nodeID, preferredID, issuedMetric)
if err != nil {
if err := t.issueFromByID(ctx, nodeID, preferredID, issuedMetric); err != nil {
return err
}

var (
addedPreferredIDAtHeight = addedPreferred
preferredIDAtHeightShouldBlock bool
// Invariant: The order of [responseOptions] must be [preferredID] then
// (optionally) [preferredIDAtHeight]. During vote application, the
// first vote that can be applied will be used. So, the votes should be
// populated in order of decreasing height.
responseOptions = []ids.ID{preferredID}
)
if preferredID != preferredIDAtHeight {
addedPreferredIDAtHeight, err = t.issueFromByID(ctx, nodeID, preferredIDAtHeight, issuedMetric)
if err != nil {
if err := t.issueFromByID(ctx, nodeID, preferredIDAtHeight, issuedMetric); err != nil {
return err
}
preferredIDAtHeightShouldBlock = t.canDependOn(preferredIDAtHeight)
responseOptions = append(responseOptions, preferredIDAtHeight)
}

Expand All @@ -399,10 +397,10 @@ func (t *Transitive) Chits(ctx context.Context, nodeID ids.NodeID, requestID uin
// Wait until [preferredID] and [preferredIDAtHeight] have been issued to
// consensus before applying this chit.
var deps []ids.ID
if !addedPreferred {
if t.canDependOn(preferredID) {
deps = append(deps, preferredID)
}
if !addedPreferredIDAtHeight {
if preferredIDAtHeightShouldBlock {
deps = append(deps, preferredIDAtHeight)
}

Expand Down Expand Up @@ -682,16 +680,18 @@ func (t *Transitive) buildBlocks(ctx context.Context) error {
}

issuedMetric := t.metrics.issued.WithLabelValues(builtSource)
added, err := t.issueWithAncestors(ctx, blk, issuedMetric)
if err != nil {
if err := t.issueWithAncestors(ctx, blk, issuedMetric); err != nil {
return err
}

// issuing the block shouldn't have any missing dependencies
if added {
// TODO: Technically this may incorrectly log a warning if the block
// that was just built caused votes to be applied such that the block
// was rejected or was accepted along with one of its children. This
// should be cleaned up to never produce an invalid warning.
if t.canIssueChildOn(blk.ID()) {
t.Ctx.Log.Verbo("successfully issued new block from the VM")
} else {
t.Ctx.Log.Warn("built block with unissued ancestors")
t.Ctx.Log.Warn("block that was just built is not extendable")
}
}
return nil
Expand All @@ -709,47 +709,46 @@ func (t *Transitive) repoll(ctx context.Context) {
}
}

// issueFromByID attempts to issue the branch ending with a block [blkID] into consensus.
// issueFromByID attempts to issue the branch ending with a block [blkID] into
// consensus.
// If we do not have [blkID], request it.
// Returns true if the block is processing in consensus or is decided.
func (t *Transitive) issueFromByID(
ctx context.Context,
nodeID ids.NodeID,
blkID ids.ID,
issuedMetric prometheus.Counter,
) (bool, error) {
) error {
blk, err := t.getBlock(ctx, blkID)
if err != nil {
t.sendRequest(ctx, nodeID, blkID, issuedMetric)
return false, nil
return nil
}
return t.issueFrom(ctx, nodeID, blk, issuedMetric)
}

// issueFrom attempts to issue the branch ending with block [blkID] to consensus.
// Returns true if the block is processing in consensus or is decided.
// If a dependency is missing, request it from [vdr].
// issueFrom attempts to issue the branch ending with block [blkID] to
// consensus.
// If a dependency is missing, it will be requested it from [nodeID].
func (t *Transitive) issueFrom(
ctx context.Context,
nodeID ids.NodeID,
blk snowman.Block,
issuedMetric prometheus.Counter,
) (bool, error) {
) error {
// issue [blk] and its ancestors to consensus.
blkID := blk.ID()
for !t.wasIssued(blk) {
if err := t.issue(ctx, nodeID, blk, false, issuedMetric); err != nil {
return false, err
err := t.issue(ctx, nodeID, blk, false, issuedMetric)
if err != nil {
return err
}

// If we don't have this ancestor, request it from [nodeID]
blkID = blk.Parent()
var err error
blk, err = t.getBlock(ctx, blkID)

// If we don't have this ancestor, request it from [vdr]
if err != nil || !blk.Status().Fetched() {
if err != nil {
t.sendRequest(ctx, nodeID, blkID, issuedMetric)
return false, nil
return nil
}
}

Expand All @@ -758,55 +757,45 @@ func (t *Transitive) issueFrom(
delete(t.blkReqSourceMetric, req)
}

if !t.isDecided(blk) && !t.Consensus.Processing(blkID) {
return false, nil
// If this block isn't pending, make sure nothing is blocked on it.
if _, isPending := t.pending[blkID]; !isPending {
return t.blocked.Abandon(ctx, blkID)
}

// A dependency should never be waiting on a decided or processing block.
// However, if the block was marked as rejected by the VM, the dependencies
// may still be waiting. Therefore, they should abandoned.
return true, t.blocked.Abandon(ctx, blkID)
return nil
}

// issueWithAncestors attempts to issue the branch ending with [blk] to consensus.
// Returns true if the block is processing in consensus or is decided.
// If a dependency is missing and the dependency hasn't been requested, the issuance will be abandoned.
// issueWithAncestors attempts to issue the branch ending with [blk] to
// consensus.
// If a dependency is missing and the dependency hasn't been requested, the
// issuance will be abandoned.
func (t *Transitive) issueWithAncestors(
ctx context.Context,
blk snowman.Block,
issuedMetric prometheus.Counter,
) (bool, error) {
) error {
blkID := blk.ID()
// issue [blk] and its ancestors into consensus
status := blk.Status()
for status.Fetched() && !t.wasIssued(blk) {
for !t.wasIssued(blk) {
err := t.issue(ctx, t.Ctx.NodeID, blk, true, issuedMetric)
if err != nil {
return false, err
return err
}
blkID = blk.Parent()
blk, err = t.getBlock(ctx, blkID)
if err != nil {
status = choices.Unknown
break
}
status = blk.Status()
}

// The block was issued into consensus. This is the happy path.
if status != choices.Unknown && (t.isDecided(blk) || t.Consensus.Processing(blkID)) {
return true, nil
}

// There's an outstanding request for this block.
// We can just wait for that request to succeed or fail.
// There's an outstanding request for this block. We can wait for that
// request to succeed or fail.
if t.blkReqs.HasValue(blkID) {
return false, nil
return nil
}

// We don't have this block and have no reason to expect that we will get it.
// Abandon the block to avoid a memory leak.
return false, t.blocked.Abandon(ctx, blkID)
// If the block wasn't already issued, we have no reason to expect that it
// will be able to be issued.
return t.blocked.Abandon(ctx, blkID)
}

// If the block has been decided, then it is marked as having been issued.
Expand Down Expand Up @@ -956,12 +945,10 @@ func (t *Transitive) deliver(
) error {
// we are no longer waiting on adding the block to consensus, so it is no
// longer pending
t.removeFromPending(blk)
blkID := blk.ID()
delete(t.pending, blkID)

var (
parentID = blk.Parent()
blkID = blk.ID()
)
parentID := blk.Parent()
if !t.canIssueChildOn(parentID) || t.Consensus.Processing(blkID) {
// If the parent isn't processing or the last accepted block, then this
// block is effectively rejected.
Expand Down Expand Up @@ -1026,7 +1013,7 @@ func (t *Transitive) deliver(
t.sendQuery(ctx, blkID, blk.Bytes(), push)
}

t.removeFromPending(blk)
delete(t.pending, blkID)
if err := t.blocked.Fulfill(ctx, blkID); err != nil {
return err
}
Expand All @@ -1036,7 +1023,7 @@ func (t *Transitive) deliver(
}
for _, blk := range dropped {
blkID := blk.ID()
t.removeFromPending(blk)
delete(t.pending, blkID)
if err := t.blocked.Abandon(ctx, blkID); err != nil {
return err
}
Expand All @@ -1063,10 +1050,6 @@ func (t *Transitive) pendingContains(blkID ids.ID) bool {
return ok
}

func (t *Transitive) removeFromPending(blk snowman.Block) {
delete(t.pending, blk.ID())
}

func (t *Transitive) addToNonVerifieds(blk snowman.Block) {
// don't add this blk if it's decided or processing.
blkID := blk.ID()
Expand Down Expand Up @@ -1176,6 +1159,13 @@ func (t *Transitive) getProcessingAncestor(ctx context.Context, initialVote ids.
}
}

// canDependOn reports true if it is guaranteed for the provided block ID to
// eventually either be fulfilled or abandoned.
func (t *Transitive) canDependOn(blkID ids.ID) bool {
_, isPending := t.pending[blkID]
return isPending || t.blkReqs.HasValue(blkID)
}

// canIssueChildOn reports true if it is valid for a child of parentID to be
// verified and added to consensus.
func (t *Transitive) canIssueChildOn(parentID ids.ID) bool {
Expand Down

0 comments on commit 24e9952

Please sign in to comment.