Skip to content

Commit

Permalink
Merge pull request #93 from filecoin-project/fix/catch-up-many
Browse files Browse the repository at this point in the history
Single mode sync
  • Loading branch information
whyrusleeping authored Jul 31, 2019
2 parents e3538af + a8b434a commit 785199c
Show file tree
Hide file tree
Showing 20 changed files with 598 additions and 358 deletions.
4 changes: 1 addition & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ type FullNode interface {

MpoolPending(context.Context, *types.TipSet) ([]*types.SignedMessage, error)
MpoolPush(context.Context, *types.SignedMessage) error
MpoolGetNonce(context.Context, address.Address) (uint64, error)

// FullNodeStruct

Expand All @@ -99,9 +100,6 @@ type FullNode interface {
WalletSign(context.Context, address.Address, []byte) (*types.Signature, error)
WalletDefaultAddress(context.Context) (address.Address, error)

// Really not sure where this belongs. It could go on the wallet, or the message pool, or the chain...
MpoolGetNonce(context.Context, address.Address) (uint64, error)

// Other

// ClientImport imports file under the specified path into filestore
Expand Down
53 changes: 32 additions & 21 deletions chain/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
Expand Down Expand Up @@ -87,7 +88,7 @@ func (bss *BlockSyncService) HandleStream(s inet.Stream) {
log.Errorf("failed to read block sync request: %s", err)
return
}
log.Errorf("block sync request for: %s %d", req.Start, req.RequestLength)
log.Infof("block sync request for: %s %d", req.Start, req.RequestLength)

resp, err := bss.processRequest(&req)
if err != nil {
Expand Down Expand Up @@ -128,19 +129,16 @@ func (bss *BlockSyncService) collectChainSegment(start []cid.Cid, length uint64,
}

if opts.IncludeMessages {
log.Error("INCLUDING MESSAGES IN SYNC RESPONSE")
msgs, mincl, err := bss.gatherMessages(ts)
if err != nil {
return nil, err
}
log.Errorf("messages: ", msgs)

bst.Messages = msgs
bst.MsgIncludes = mincl
}

if opts.IncludeBlocks {
log.Error("INCLUDING BLOCKS IN SYNC RESPONSE")
bst.Blocks = ts.Blocks()
}

Expand All @@ -164,7 +162,7 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
if err != nil {
return nil, nil, err
}
log.Errorf("MESSAGES FOR BLOCK: %d", len(msgs))
log.Infof("MESSAGES FOR BLOCK: %d", len(msgs))

msgindexes := make([]int, 0, len(msgs))
for _, m := range msgs {
Expand Down Expand Up @@ -209,22 +207,7 @@ func (bs *BlockSync) getPeers() []peer.ID {
return out
}

func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error

req := &BlockSyncRequest{
Start: tipset,
RequestLength: uint64(count),
Options: BSOptBlocks,
}

res, err := bs.sendRequestToPeer(ctx, peers[perm[0]], req)
if err != nil {
return nil, err
}

func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse) ([]*types.TipSet, error) {
switch res.Status {
case 0: // Success
return bs.processBlocksResponse(req, res)
Expand All @@ -241,6 +224,34 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int)
}
}

func (bs *BlockSync) GetBlocks(ctx context.Context, tipset []cid.Cid, count int) ([]*types.TipSet, error) {
peers := bs.getPeers()
perm := rand.Perm(len(peers))
// TODO: round robin through these peers on error

req := &BlockSyncRequest{
Start: tipset,
RequestLength: uint64(count),
Options: BSOptBlocks,
}

var err error
var res *BlockSyncResponse
for _, p := range perm {
res, err = bs.sendRequestToPeer(ctx, peers[p], req)
if err != nil {
log.Warnf("BlockSync request failed for peer %s: %s", peers[p].String(), err)
continue
}

ts, err := bs.processStatus(req, res)
if err == nil {
return ts, nil
}
}
return nil, xerrors.Errorf("GetBlocks failed with all peers: %w", err)
}

func (bs *BlockSync) GetFullTipSet(ctx context.Context, p peer.ID, h []cid.Cid) (*store.FullTipSet, error) {
// TODO: round robin through these peers on error

Expand Down
123 changes: 110 additions & 13 deletions chain/gen/gen.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
package gen

import (
"bytes"
"context"
"sync/atomic"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-car"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-merkledag"

"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
Expand All @@ -17,6 +24,8 @@ import (

var log = logging.Logger("gen")

const msgsPerBlock = 20

type ChainGen struct {
accounts []address.Address

Expand All @@ -26,11 +35,18 @@ type ChainGen struct {

cs *store.ChainStore

genesis *types.BlockHeader

genesis *types.BlockHeader
curBlock *types.FullBlock

miner address.Address
w *wallet.Wallet

miner address.Address
receivers []address.Address
banker address.Address
bankerNonce uint64

r repo.Repo
lr repo.LockedRepo
}

type mybs struct {
Expand All @@ -40,26 +56,32 @@ type mybs struct {
func (m mybs) Get(c cid.Cid) (block.Block, error) {
b, err := m.Blockstore.Get(c)
if err != nil {
log.Errorf("Get failed: %s %s", c, err)
// change to error for stacktraces, don't commit with that pls
log.Warnf("Get failed: %s %s", c, err)
return nil, err
}

return b, nil
}

func NewGenerator() (*ChainGen, error) {

mr := repo.NewMemory(nil)
lr, err := mr.Lock()
if err != nil {
return nil, err
}

ds, err := lr.Datastore("/blocks")
ds, err := lr.Datastore("/metadata")
if err != nil {
return nil, err
}

bds, err := lr.Datastore("/blocks")
if err != nil {
return nil, err
}
bs := mybs{blockstore.NewBlockstore(ds)}

bs := mybs{blockstore.NewIdStore(blockstore.NewBlockstore(bds))}

ks, err := lr.KeyStore()
if err != nil {
Expand All @@ -76,12 +98,22 @@ func NewGenerator() (*ChainGen, error) {
return nil, err
}

banker, err := w.GenerateKey(types.KTBLS)
// KTBLS doesn't support signature verification or something like that yet
banker, err := w.GenerateKey(types.KTSecp256k1)
if err != nil {
return nil, err
}

receievers := make([]address.Address, msgsPerBlock)
for r := range receievers {
receievers[r], err = w.GenerateKey(types.KTBLS)
if err != nil {
return nil, err
}
}

genb, err := MakeGenesisBlock(bs, map[address.Address]types.BigInt{
miner: types.NewInt(5),
banker: types.NewInt(90000000),
})
if err != nil {
Expand All @@ -90,8 +122,6 @@ func NewGenerator() (*ChainGen, error) {

cs := store.NewChainStore(bs, ds)

msgsPerBlock := 10

genfb := &types.FullBlock{Header: genb.Genesis}

if err := cs.SetGenesis(genb.Genesis); err != nil {
Expand All @@ -103,8 +133,16 @@ func NewGenerator() (*ChainGen, error) {
cs: cs,
msgsPerBlock: msgsPerBlock,
genesis: genb.Genesis,
miner: miner,
curBlock: genfb,
w: w,

miner: miner,
banker: banker,
receivers: receievers,

curBlock: genfb,

r: mr,
lr: lr,
}

return gen, nil
Expand All @@ -114,6 +152,20 @@ func (cg *ChainGen) Genesis() *types.BlockHeader {
return cg.genesis
}

func (cg *ChainGen) GenesisCar() ([]byte, error) {
offl := offline.Exchange(cg.bs)
blkserv := blockservice.New(cg.bs, offl)
dserv := merkledag.NewDAGService(blkserv)

out := new(bytes.Buffer)

if err := car.WriteCar(context.TODO(), dserv, []cid.Cid{cg.Genesis().Cid()}, out); err != nil {
return nil, err
}

return out.Bytes(), nil
}

func (cg *ChainGen) nextBlockProof() (address.Address, types.ElectionProof, []types.Ticket, error) {
return cg.miner, []byte("cat in a box"), []types.Ticket{types.Ticket("im a ticket, promise")}, nil
}
Expand All @@ -124,7 +176,45 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) {
return nil, err
}

var msgs []*types.SignedMessage
// make some transfers from banker

msgs := make([]*types.SignedMessage, cg.msgsPerBlock)
for m := range msgs {
msg := types.Message{
To: cg.receivers[m],
From: cg.banker,

Nonce: atomic.AddUint64(&cg.bankerNonce, 1) - 1,

Value: types.NewInt(uint64(m + 1)),

Method: 0,

GasLimit: types.NewInt(10000),
GasPrice: types.NewInt(0),
}

unsigned, err := msg.Serialize()
if err != nil {
return nil, err
}

sig, err := cg.w.Sign(cg.banker, unsigned)
if err != nil {
return &types.FullBlock{}, err
}

msgs[m] = &types.SignedMessage{
Message: msg,
Signature: *sig,
}

if _, err := cg.cs.PutMessage(msgs[m]); err != nil {
return nil, err
}
}

// create block

parents, err := types.NewTipSet([]*types.BlockHeader{cg.curBlock.Header})
if err != nil {
Expand All @@ -144,3 +234,10 @@ func (cg *ChainGen) NextBlock() (*types.FullBlock, error) {

return fblk, nil
}

func (cg *ChainGen) YieldRepo() (repo.Repo, error) {
if err := cg.lr.Close(); err != nil {
return nil, err
}
return cg.r, nil
}
5 changes: 3 additions & 2 deletions chain/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/store"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/pkg/errors"
)

type MessagePool struct {
Expand Down Expand Up @@ -128,7 +129,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b)
if err != nil {
return err
return errors.Wrapf(err, "failed to get messages for revert block %s(height %d)", b.Cid(), b.Height)
}
for _, msg := range msgs {
if err := mp.Add(msg); err != nil {
Expand All @@ -142,7 +143,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
for _, b := range ts.Blocks() {
msgs, err := mp.cs.MessagesForBlock(b)
if err != nil {
return err
return errors.Wrapf(err, "failed to get messages for apply block %s(height %d) (msgroot = %s)", b.Cid(), b.Height, b.Messages)
}
for _, msg := range msgs {
mp.Remove(msg)
Expand Down
2 changes: 1 addition & 1 deletion chain/state/statetree.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func NewStateTree(cst *hamt.CborIpldStore) (*StateTree, error) {
func LoadStateTree(cst *hamt.CborIpldStore, c cid.Cid) (*StateTree, error) {
nd, err := hamt.LoadNode(context.Background(), cst, c)
if err != nil {
log.Errorf("loading hamt node failed: %s", err)
log.Errorf("loading hamt node %s failed: %s", c, err)
return nil, err
}

Expand Down
Loading

0 comments on commit 785199c

Please sign in to comment.