Skip to content

Commit

Permalink
feat: batch publish deal messages
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 14, 2021
1 parent 858994a commit b4dd2ac
Show file tree
Hide file tree
Showing 14 changed files with 983 additions and 341 deletions.
300 changes: 156 additions & 144 deletions api/test/deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,103 +8,45 @@ import (
"math/rand"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"github.com/filecoin-project/go-state-types/abi"

"github.com/stretchr/testify/require"

"github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipld/go-car"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storagemarket"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/markets/storageadapter"
"github.com/filecoin-project/lotus/node"
"github.com/filecoin-project/lotus/node/config"
"github.com/filecoin-project/lotus/node/impl"
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
dstest "github.com/ipfs/go-merkledag/test"
unixfile "github.com/ipfs/go-unixfs/file"

"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/node/impl"
ipld "github.com/ipfs/go-ipld-format"
)

func TestDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]

addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, MineNext); err != nil {
t.Error(err)
}
}
}()

MakeDeal(t, ctx, 6, client, miner, carExport, fastRet, startEpoch)

atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
MakeDeal(t, s.ctx, 6, s.client, s.miner, carExport, fastRet, startEpoch)
}

func TestDoubleDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]

addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

mine := int64(1)
done := make(chan struct{})

go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, MineNext); err != nil {
t.Error(err)
}
}
}()

MakeDeal(t, ctx, 6, client, miner, false, false, startEpoch)
MakeDeal(t, ctx, 7, client, miner, false, false, startEpoch)

atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
MakeDeal(t, s.ctx, 6, s.client, s.miner, false, false, startEpoch)
MakeDeal(t, s.ctx, 7, s.client, s.miner, false, false, startEpoch)
}

func MakeDeal(t *testing.T, ctx context.Context, rseed int, client api.FullNode, miner TestStorageNode, carExport, fastRet bool, startEpoch abi.ChainEpoch) {
Expand Down Expand Up @@ -151,96 +93,133 @@ func CreateClientFile(ctx context.Context, client api.FullNode, rseed int) (*api
return res, data, nil
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
publishPeriod := 10 * time.Second
maxDealsPerMsg := uint64(2)

// Set max deals per publish deals message to 2
minerDef := []StorageMiner{{
Full: 0,
Opts: node.Override(
new(*storageadapter.DealPublisher),
storageadapter.NewDealPublisher(nil, &config.PublishMsgConfig{
PublishPeriod: config.Duration(publishPeriod),
MaxDealsPerMsg: maxDealsPerMsg,
})),
Preseal: PresealGenesis,
}}

// Create a connect client and miner node
n, sn := b(t, OneFull, minerDef)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
s := connectAndStartMining(t, b, blocktime, client, miner)
defer s.blockMiner.Stop()

addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
// Starts a deal and waits until it's published
runDealTillPublish := func(rseed int) {
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
require.NoError(t, err)

upds, err := client.ClientGetDealUpdates(s.ctx)
require.NoError(t, err)

startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)

done := make(chan struct{})
go func() {
for upd := range upds {
if upd.DataRef.Root == res.Root && upd.State == storagemarket.StorageDealAwaitingPreCommit {
done <- struct{}{}
}
}
}()
<-done
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
// Run three deals in parallel
done := make(chan struct{}, maxDealsPerMsg+1)
for rseed := 1; rseed <= 3; rseed++ {
rseed := rseed
go func() {
runDealTillPublish(rseed)
done <- struct{}{}
}()
}
time.Sleep(time.Second)

mine := int64(1)
done := make(chan struct{})
go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, MineNext); err != nil {
t.Error(err)
}
// Wait for two of the deals to be published
for i := 0; i < int(maxDealsPerMsg); i++ {
<-done
}

// Expect a single PublishStorageDeals message that includes the first two deals
msgCids, err := s.client.StateListMessages(s.ctx, &api.MessageMatch{To: market.Address}, types.EmptyTSK, 1)
require.NoError(t, err)
count := 0
for _, msgCid := range msgCids {
msg, err := s.client.ChainGetMessage(s.ctx, msgCid)
require.NoError(t, err)

if msg.Method == market.Methods.PublishStorageDeals {
count++
var pubDealsParams market2.PublishStorageDealsParams
err = pubDealsParams.UnmarshalCBOR(bytes.NewReader(msg.Params))
require.NoError(t, err)
require.Len(t, pubDealsParams.Deals, int(maxDealsPerMsg))
}
}()
}
require.Equal(t, 1, count)

// The third deal should be published once the publish period expires.
// Allow a little padding as it takes a moment for the state change to
// be noticed by the client.
padding := 10 * time.Second
select {
case <-time.After(publishPeriod + padding):
require.Fail(t, "Expected 3rd deal to be published once publish period elapsed")
case <-done: // Success
}
}

func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()

data := make([]byte, 1600)
rand.New(rand.NewSource(int64(8))).Read(data)

r := bytes.NewReader(data)
fcid, err := client.ClientImportLocal(ctx, r)
fcid, err := s.client.ClientImportLocal(s.ctx, r)
if err != nil {
t.Fatal(err)
}

fmt.Println("FILE CID: ", fcid)

deal := startDeal(t, ctx, miner, client, fcid, true, startEpoch)
deal := startDeal(t, s.ctx, s.miner, s.client, fcid, true, startEpoch)

waitDealPublished(t, ctx, miner, deal)
waitDealPublished(t, s.ctx, s.miner, deal)
fmt.Println("deal published, retrieving")
// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal)
info, err := s.client.ClientGetDealInfo(s.ctx, *deal)
require.NoError(t, err)

testRetrieval(t, ctx, client, fcid, &info.PieceCID, false, data)
atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
testRetrieval(t, s.ctx, s.client, fcid, &info.PieceCID, false, data)
}

func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration) {

ctx := context.Background()
n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]

addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

mine := int64(1)
done := make(chan struct{})

go func() {
defer close(done)
for atomic.LoadInt64(&mine) == 1 {
time.Sleep(blocktime)
if err := sn[0].MineOne(ctx, MineNext); err != nil {
t.Error(err)
}
}
}()
s := setupOneClientOneMiner(t, b, blocktime)
defer s.blockMiner.Stop()

{
data1 := make([]byte, 800)
rand.New(rand.NewSource(int64(3))).Read(data1)
r := bytes.NewReader(data1)

fcid1, err := client.ClientImportLocal(ctx, r)
fcid1, err := s.client.ClientImportLocal(s.ctx, r)
if err != nil {
t.Fatal(err)
}
Expand All @@ -249,35 +228,31 @@ func TestSecondDealRetrieval(t *testing.T, b APIBuilder, blocktime time.Duration
rand.New(rand.NewSource(int64(9))).Read(data2)
r2 := bytes.NewReader(data2)

fcid2, err := client.ClientImportLocal(ctx, r2)
fcid2, err := s.client.ClientImportLocal(s.ctx, r2)
if err != nil {
t.Fatal(err)
}

deal1 := startDeal(t, ctx, miner, client, fcid1, true, 0)
deal1 := startDeal(t, s.ctx, s.miner, s.client, fcid1, true, 0)

// TODO: this sleep is only necessary because deals don't immediately get logged in the dealstore, we should fix this
time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal1, true)
waitDealSealed(t, s.ctx, s.miner, s.client, deal1, true)

deal2 := startDeal(t, ctx, miner, client, fcid2, true, 0)
deal2 := startDeal(t, s.ctx, s.miner, s.client, fcid2, true, 0)

time.Sleep(time.Second)
waitDealSealed(t, ctx, miner, client, deal2, false)
waitDealSealed(t, s.ctx, s.miner, s.client, deal2, false)

// Retrieval
info, err := client.ClientGetDealInfo(ctx, *deal2)
info, err := s.client.ClientGetDealInfo(s.ctx, *deal2)
require.NoError(t, err)

rf, _ := miner.SectorsRefs(ctx)
rf, _ := s.miner.SectorsRefs(s.ctx)
fmt.Printf("refs: %+v\n", rf)

testRetrieval(t, ctx, client, fcid2, &info.PieceCID, false, data2)
testRetrieval(t, s.ctx, s.client, fcid2, &info.PieceCID, false, data2)
}

atomic.AddInt64(&mine, -1)
fmt.Println("shutting down mining")
<-done
}

func startDeal(t *testing.T, ctx context.Context, miner TestStorageNode, client api.FullNode, fcid cid.Cid, fastRet bool, startEpoch abi.ChainEpoch) *cid.Cid {
Expand Down Expand Up @@ -459,3 +434,40 @@ func extractCarData(t *testing.T, ctx context.Context, rdata []byte, rpath strin
}
return rdata
}

type dealsScaffold struct {
ctx context.Context
client *impl.FullNodeAPI
miner TestStorageNode
blockMiner *BlockMiner
}

func setupOneClientOneMiner(t *testing.T, b APIBuilder, blocktime time.Duration) *dealsScaffold {
n, sn := b(t, OneFull, OneMiner)
client := n[0].FullNode.(*impl.FullNodeAPI)
miner := sn[0]
return connectAndStartMining(t, b, blocktime, client, miner)
}

func connectAndStartMining(t *testing.T, b APIBuilder, blocktime time.Duration, client *impl.FullNodeAPI, miner TestStorageNode) *dealsScaffold {
ctx := context.Background()
addrinfo, err := client.NetAddrsListen(ctx)
if err != nil {
t.Fatal(err)
}

if err := miner.NetConnect(ctx, addrinfo); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)

blockMiner := NewBlockMiner(ctx, t, miner, blocktime)
blockMiner.MineBlocks()

return &dealsScaffold{
ctx: ctx,
client: client,
miner: miner,
blockMiner: blockMiner,
}
}
Loading

0 comments on commit b4dd2ac

Please sign in to comment.