Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: function to get available block range for chain-sync #242

Merged
merged 1 commit into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions cmd/gouroboros/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ import (
var oConn *ouroboros.Ouroboros

type chainSyncFlags struct {
flagset *flag.FlagSet
startEra string
tip bool
bulk bool
flagset *flag.FlagSet
startEra string
tip bool
bulk bool
blockRange bool
}

func newChainSyncFlags() *chainSyncFlags {
Expand All @@ -29,6 +30,7 @@ func newChainSyncFlags() *chainSyncFlags {
f.flagset.StringVar(&f.startEra, "start-era", "genesis", "era which to start chain-sync at")
f.flagset.BoolVar(&f.tip, "tip", false, "start chain-sync at current chain tip")
f.flagset.BoolVar(&f.bulk, "bulk", false, "use bulk chain-sync mode with NtN")
f.flagset.BoolVar(&f.blockRange, "range", false, "show start/end block of range")
return f
}

Expand All @@ -54,7 +56,7 @@ var eraIntersect = map[string]map[string][]interface{}{
"mainnet": map[string][]interface{}{
"genesis": []interface{}{},
// Chain genesis, but explicit
"byron": []interface{}{0, "89d9b5a5b8ddc8d7e5a6795e9774d97faf1efea59b2caf7eaf9f8c5b32059df4"},
"byron": []interface{}{},
// Last block of epoch 207 (Byron era)
"shelley": []interface{}{4492799, "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457"},
// Last block of epoch 235 (Shelley era)
Expand Down Expand Up @@ -157,18 +159,27 @@ func testChainSync(f *globalFlags) {
} else {
point = common.NewPointOrigin()
}
if !f.ntnProto || !chainSyncFlags.bulk {
if chainSyncFlags.blockRange {
start, end, err := oConn.ChainSync().Client.GetAvailableBlockRange([]common.Point{point})
if err != nil {
fmt.Printf("ERROR: failed to get available block range: %s\n", err)
os.Exit(1)
}
fmt.Printf("Start: slot %d, hash %x\n", start.Slot, start.Hash)
fmt.Printf("End (tip): slot %d, hash %x\n", end.Slot, end.Hash)
return
} else if !f.ntnProto || !chainSyncFlags.bulk {
if err := oConn.ChainSync().Client.Sync([]common.Point{point}); err != nil {
fmt.Printf("ERROR: failed to start chain-sync: %s\n", err)
os.Exit(1)
}
} else {
tip, err := oConn.ChainSync().Client.GetCurrentTip()
start, end, err := oConn.ChainSync().Client.GetAvailableBlockRange([]common.Point{point})
if err != nil {
fmt.Printf("ERROR: failed to get chain tip: %s\n", err)
fmt.Printf("ERROR: failed to get available block range: %s\n", err)
os.Exit(1)
}
if err := oConn.BlockFetch().Client.GetBlockRange(point, tip.Point); err != nil {
if err := oConn.BlockFetch().Client.GetBlockRange(start, end); err != nil {
fmt.Printf("ERROR: failed to request block range: %s\n", err)
os.Exit(1)
}
Expand Down
115 changes: 93 additions & 22 deletions protocol/chainsync/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package chainsync

import (
"encoding/hex"
"fmt"
"sync"

Expand All @@ -18,6 +19,8 @@ type Client struct {
readyForNextBlockChan chan bool
wantCurrentTip bool
currentTipChan chan Tip
wantFirstBlock bool
firstBlockChan chan common.Point
}

// NewClient returns a new ChainSync client object
Expand All @@ -39,6 +42,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
intersectResultChan: make(chan error),
readyForNextBlockChan: make(chan bool),
currentTipChan: make(chan Tip),
firstBlockChan: make(chan common.Point),
}
// Update state map with timeouts
stateMap := StateMap.Copy()
Expand Down Expand Up @@ -72,6 +76,7 @@ func NewClient(protoOptions protocol.ProtocolOptions, cfg *Config) *Client {
close(c.intersectResultChan)
close(c.readyForNextBlockChan)
close(c.currentTipChan)
close(c.firstBlockChan)
}()
return c
}
Expand Down Expand Up @@ -120,6 +125,51 @@ func (c *Client) GetCurrentTip() (*Tip, error) {
return &tip, nil
}

// GetAvailableBlockRange returns the start and end of the range of available blocks given the provided intersect
// point(s).
func (c *Client) GetAvailableBlockRange(intersectPoints []common.Point) (common.Point, common.Point, error) {
c.busyMutex.Lock()
defer c.busyMutex.Unlock()
c.wantCurrentTip = true
c.wantFirstBlock = true
var start, end common.Point
msgFindIntersect := NewMsgFindIntersect(intersectPoints)
if err := c.SendMessage(msgFindIntersect); err != nil {
return start, end, err
}
select {
case tip := <-c.currentTipChan:
end = tip.Point
// Clear out intersect result channel to prevent blocking
<-c.intersectResultChan
case err := <-c.intersectResultChan:
return start, end, err
}
c.wantCurrentTip = false
// Request the next block. This should result in a rollback
msgRequestNext := NewMsgRequestNext()
if err := c.SendMessage(msgRequestNext); err != nil {
return start, end, err
}
for {
select {
case point := <-c.firstBlockChan:
start = point
c.wantFirstBlock = false
case <-c.readyForNextBlockChan:
// Request the next block
msg := NewMsgRequestNext()
if err := c.SendMessage(msg); err != nil {
return start, end, err
}
}
if !c.wantFirstBlock {
break
}
}
return start, end, nil
}

// Sync begins a chain-sync operation using the provided intersect point(s). Incoming blocks will be delivered
// via the RollForward callback function specified in the protocol config
func (c *Client) Sync(intersectPoints []common.Point) error {
Expand Down Expand Up @@ -172,13 +222,13 @@ func (c *Client) handleAwaitReply() error {
}

func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
if c.config.RollForwardFunc == nil {
if c.config.RollForwardFunc == nil && !c.wantFirstBlock {
return fmt.Errorf("received chain-sync RollForward message but no callback function is defined")
}
var callbackErr error
if c.Mode() == protocol.ProtocolModeNodeToNode {
msg := msgGeneric.(*MsgRollForwardNtN)
var blockHeader interface{}
var blockHeader ledger.BlockHeader
var blockType uint
blockEra := msg.WrappedHeader.Era
switch blockEra {
Expand All @@ -205,6 +255,15 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
return err
}
}
if c.wantFirstBlock {
blockHash, err := hex.DecodeString(blockHeader.Hash())
if err != nil {
return err
}
point := common.NewPoint(blockHeader.SlotNumber(), blockHash)
c.firstBlockChan <- point
return nil
}
// Call the user callback function
callbackErr = c.config.RollForwardFunc(blockType, blockHeader, msg.Tip)
} else {
Expand All @@ -213,32 +272,46 @@ func (c *Client) handleRollForward(msgGeneric protocol.Message) error {
if err != nil {
return err
}
if c.wantFirstBlock {
blockHash, err := hex.DecodeString(blk.Hash())
if err != nil {
return err
}
point := common.NewPoint(blk.SlotNumber(), blockHash)
c.firstBlockChan <- point
return nil
}
// Call the user callback function
callbackErr = c.config.RollForwardFunc(msg.BlockType(), blk, msg.Tip)
}
if callbackErr == StopSyncProcessError {
// Signal that we're cancelling the sync
c.readyForNextBlockChan <- false
if callbackErr != nil {
if callbackErr == StopSyncProcessError {
// Signal that we're cancelling the sync
c.readyForNextBlockChan <- false
} else {
return callbackErr
}
}
// Signal that we're ready for the next block
c.readyForNextBlockChan <- true
return nil
}

func (c *Client) handleRollBackward(msgGeneric protocol.Message) error {
if c.config.RollBackwardFunc == nil {
return fmt.Errorf("received chain-sync RollBackward message but no callback function is defined")
}
msg := msgGeneric.(*MsgRollBackward)
// Signal that we're ready for the next block after we finish handling the rollback
defer func() {
c.readyForNextBlockChan <- true
}()
// Call the user callback function
callbackErr := c.config.RollBackwardFunc(msg.Point, msg.Tip)
if callbackErr == StopSyncProcessError {
// Signal that we're cancelling the sync
c.readyForNextBlockChan <- false
if !c.wantFirstBlock {
if c.config.RollBackwardFunc == nil {
return fmt.Errorf("received chain-sync RollBackward message but no callback function is defined")
}
msg := msgGeneric.(*MsgRollBackward)
// Call the user callback function
if callbackErr := c.config.RollBackwardFunc(msg.Point, msg.Tip); callbackErr != nil {
if callbackErr == StopSyncProcessError {
// Signal that we're cancelling the sync
c.readyForNextBlockChan <- false
} else {
return callbackErr
}
}
}
// Signal that we're ready for the next block
c.readyForNextBlockChan <- true
Expand All @@ -249,18 +322,16 @@ func (c *Client) handleIntersectFound(msgGeneric protocol.Message) error {
if c.wantCurrentTip {
msgIntersectFound := msgGeneric.(*MsgIntersectFound)
c.currentTipChan <- msgIntersectFound.Tip
} else {
c.intersectResultChan <- nil
}
c.intersectResultChan <- nil
return nil
}

func (c *Client) handleIntersectNotFound(msgGeneric protocol.Message) error {
if c.wantCurrentTip {
msgIntersectNotFound := msgGeneric.(*MsgIntersectNotFound)
c.currentTipChan <- msgIntersectNotFound.Tip
} else {
c.intersectResultChan <- IntersectNotFoundError{}
}
c.intersectResultChan <- IntersectNotFoundError{}
return nil
}