Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make message wait work, use it for storage miner init #103

Merged
merged 3 commits into from
Jul 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 6 additions & 10 deletions chain/blocksync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"math/rand"
"sync"

exchange "github.com/ipfs/go-ipfs-exchange-interface"
bserv "github.com/ipfs/go-blockservice"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/protocol"

Expand Down Expand Up @@ -81,7 +81,6 @@ func NewBlockSyncService(cs *store.ChainStore) *BlockSyncService {

func (bss *BlockSyncService) HandleStream(s inet.Stream) {
defer s.Close()
log.Info("handling block sync request")

var req BlockSyncRequest
if err := cborrpc.ReadCborRPC(bufio.NewReader(s), &req); err != nil {
Expand Down Expand Up @@ -185,16 +184,16 @@ func (bss *BlockSyncService) gatherMessages(ts *types.TipSet) ([]*types.SignedMe
}

type BlockSync struct {
bswap exchange.Interface
bserv bserv.BlockService
newStream NewStreamFunc

syncPeersLk sync.Mutex
syncPeers map[peer.ID]struct{}
}

func NewBlockSyncClient(bswap exchange.Interface, h host.Host) *BlockSync {
func NewBlockSyncClient(bserv bserv.BlockService, h host.Host) *BlockSync {
return &BlockSync{
bswap: bswap,
bserv: bserv,
newStream: h.NewStream,
syncPeers: make(map[peer.ID]struct{}),
}
Expand Down Expand Up @@ -379,7 +378,7 @@ func cidArrsEqual(a, b []cid.Cid) bool {
}

func (bs *BlockSync) GetBlock(ctx context.Context, c cid.Cid) (*types.BlockHeader, error) {
sb, err := bs.bswap.GetBlock(ctx, c)
sb, err := bs.bserv.GetBlock(ctx, c)
if err != nil {
return nil, err
}
Expand All @@ -396,10 +395,7 @@ func (bs *BlockSync) AddPeer(p peer.ID) {
func (bs *BlockSync) FetchMessagesByCids(cids []cid.Cid) ([]*types.SignedMessage, error) {
out := make([]*types.SignedMessage, len(cids))

resp, err := bs.bswap.GetBlocks(context.TODO(), cids)
if err != nil {
return nil, err
}
resp := bs.bserv.GetBlocks(context.TODO(), cids)

m := make(map[cid.Cid]int)
for i, c := range cids {
Expand Down
34 changes: 30 additions & 4 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,31 @@ type ChainStore struct {
}

func NewChainStore(bs bstore.Blockstore, ds dstore.Batching) *ChainStore {
return &ChainStore{
cs := &ChainStore{
bs: bs,
ds: ds,
bestTips: pubsub.New(64),
}

hcnf := func(rev, app []*types.TipSet) error {
for _, r := range rev {
cs.bestTips.Pub(&HeadChange{
Type: HCRevert,
Val: r,
}, "headchange")
}
for _, r := range app {
cs.bestTips.Pub(&HeadChange{
Type: HCApply,
Val: r,
}, "headchange")
}
return nil
}

cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)

return cs
}

func (cs *ChainStore) Load() error {
Expand Down Expand Up @@ -157,7 +177,9 @@ func (cs *ChainStore) PutTipSet(ts *FullTipSet) error {
}
}

cs.MaybeTakeHeavierTipSet(ts.TipSet())
if err := cs.MaybeTakeHeavierTipSet(ts.TipSet()); err != nil {
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed in PutTipSet")
}
return nil
}

Expand All @@ -170,7 +192,9 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ts *types.TipSet) error {
return err
}
for _, hcf := range cs.headChangeNotifs {
hcf(revert, apply)
if err := hcf(revert, apply); err != nil {
return errors.Wrap(err, "head change func errored (BAD)")
}
}
log.Infof("New heaviest tipset! %s", ts.Cids())
cs.heaviest = ts
Expand Down Expand Up @@ -327,7 +351,9 @@ func (cs *ChainStore) AddBlock(b *types.BlockHeader) error {
}

ts, _ := types.NewTipSet([]*types.BlockHeader{b})
cs.MaybeTakeHeavierTipSet(ts)
if err := cs.MaybeTakeHeavierTipSet(ts); err != nil {
return errors.Wrap(err, "MaybeTakeHeavierTipSet failed")
}

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
for {
msg, err := bsub.Next(ctx)
if err != nil {
fmt.Println("error from block subscription: ", err)
log.Error("error from block subscription: ", err)
continue
}

Expand All @@ -33,7 +33,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
log.Errorf("failed to fetch all messages for block received over pubusb: %s", err)
return
}
fmt.Println("inform new block over pubsub")
log.Info("inform new block over pubsub")
s.InformNewBlock(msg.GetFrom(), &types.FullBlock{
Header: blk.Header,
Messages: msgs,
Expand Down
8 changes: 6 additions & 2 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ func (syncer *Syncer) SyncBootstrap() {

head := blockSet[len(blockSet)-1]
log.Errorf("Finished syncing! new head: %s", head.Cids())
syncer.store.MaybeTakeHeavierTipSet(selectedHead)
if err := syncer.store.MaybeTakeHeavierTipSet(selectedHead); err != nil {
log.Errorf("MaybeTakeHeavierTipSet failed: %s", err)
}
syncer.head = head
syncer.syncMode = CaughtUp
}
Expand Down Expand Up @@ -486,7 +488,9 @@ func (syncer *Syncer) SyncCaughtUp(maybeHead *store.FullTipSet) error {
return errors.Wrap(err, "validate tipset failed")
}

syncer.store.PutTipSet(ts)
if err := syncer.store.PutTipSet(ts); err != nil {
return errors.Wrap(err, "PutTipSet failed in SyncCaughtUp")
}
}

if err := syncer.store.PutTipSet(maybeHead); err != nil {
Expand Down
10 changes: 8 additions & 2 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess
defer span.End()

st := vm.cstate
st.Snapshot()
if err := st.Snapshot(); err != nil {
return nil, xerrors.Errorf("snapshot failed: %w", err)
}

fromActor, err := st.GetActor(msg.From)
if err != nil {
return nil, xerrors.Errorf("from actor not found: %w", err)
Expand Down Expand Up @@ -253,7 +256,10 @@ func (vm *VM) ApplyMessage(ctx context.Context, msg *types.Message) (*types.Mess

if errcode = aerrors.RetCode(err); errcode != 0 {
// revert all state changes since snapshot
st.Revert()
if err := st.Revert(); err != nil {
return nil, xerrors.Errorf("revert state failed: %w", err)
}

gascost := types.BigMul(vmctx.GasUsed(), msg.GasPrice)
if err := DeductFunds(fromActor, gascost); err != nil {
panic("invariant violated: " + err.Error())
Expand Down
14 changes: 12 additions & 2 deletions cmd/lotus-storage-miner/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/filecoin-project/go-lotus/build"
"github.com/filecoin-project/go-lotus/chain/actors"
"github.com/filecoin-project/go-lotus/chain/address"
"github.com/filecoin-project/go-lotus/chain/types"
"github.com/filecoin-project/go-lotus/chain/wallet"
lcli "github.com/filecoin-project/go-lotus/cli"
Expand Down Expand Up @@ -155,9 +156,18 @@ var initCmd = &cli.Command{

log.Infof("Waiting for confirmation")

// TODO: Wait
mw, err := api.ChainWaitMsg(ctx, signed.Cid())
if err != nil {
return err
}

addr, err := address.NewFromBytes(mw.Receipt.Return)
if err != nil {
return err
}

// create actors and stuff
// TODO: persist this address in the storage-miner repo
log.Infof("New storage miners address is: %s", addr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's the worker address from params, isn't it? (The one we generate few lines above - https://github.com/filecoin-project/go-lotus/pull/103/files#diff-9e20bebb13ea36c4a8ae7c31d606791bR104-R113)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its actually going to be the address created for the new miner actor, the worker address gets put inside the actors state, and that actors address gets returned to us.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, right. This feels a bit weird, but I can't think of anything better right now


// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Storage miner successfully created, you can now start it with 'lotus-storage-miner run'")
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/ipfs/go-filestore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"

bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
ipld "github.com/ipfs/go-ipld-format"
Expand Down Expand Up @@ -190,6 +191,7 @@ func Online() Option {
Override(new(blockstore.GCLocker), blockstore.NewGCLocker),
Override(new(blockstore.GCBlockstore), blockstore.NewGCBlockstore),
Override(new(exchange.Interface), modules.Bitswap),
Override(new(bserv.BlockService), bserv.New),
Override(new(ipld.DAGService), testing.MemoryClientDag),

// Filecoin services
Expand Down
13 changes: 12 additions & 1 deletion node/impl/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,15 @@ func (a *FullNodeAPI) ChainGetRandomness(ctx context.Context, pts *types.TipSet)
}

func (a *FullNodeAPI) ChainWaitMsg(ctx context.Context, msg cid.Cid) (*api.MsgWait, error) {
panic("TODO")
blkcid, recpt, err := a.Chain.WaitForMessage(ctx, msg)
if err != nil {
return nil, err
}

return &api.MsgWait{
InBlock: blkcid,
Receipt: *recpt,
}, nil
}

func (a *FullNodeAPI) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
Expand All @@ -88,6 +96,9 @@ func (a *FullNodeAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage)
if err != nil {
return err
}
if err := a.Mpool.Add(smsg); err != nil {
return err
}

return a.PubSub.Publish("/fil/messages", msgb)
}
Expand Down