Skip to content

Commit

Permalink
feat(sharedcounter): persist counter to disk (#125)
Browse files Browse the repository at this point in the history
make a shared counter util for persisting a unique deal id counter to disk
  • Loading branch information
hannahhoward authored Feb 27, 2020
1 parent 5ee324c commit 9b5564b
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 43 deletions.
25 changes: 13 additions & 12 deletions retrievalmarket/impl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/blockio"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/clientstates"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/filecoin-project/go-statemachine/fsm"
"github.com/filecoin-project/specs-actors/actors/abi"
)

var log = logging.Logger("retrieval")

type client struct {
network rmnet.RetrievalMarketNetwork
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
// The parameters should be replaced by RetrievalClientNode

nextDealLk sync.RWMutex
nextDealID retrievalmarket.DealID
network rmnet.RetrievalMarketNetwork
bs blockstore.Blockstore
node retrievalmarket.RetrievalClientNode
storedCounter *storedcounter.StoredCounter

subscribersLk sync.RWMutex
subscribers []retrievalmarket.ClientSubscriber
Expand All @@ -55,12 +53,14 @@ func NewClient(
node retrievalmarket.RetrievalClientNode,
resolver retrievalmarket.PeerResolver,
ds datastore.Batching,
storedCounter *storedcounter.StoredCounter,
) (retrievalmarket.RetrievalClient, error) {
c := &client{
network: network,
bs: bs,
node: node,
resolver: resolver,
storedCounter: storedCounter,
dealStreams: make(map[retrievalmarket.DealID]rmnet.RetrievalDealStream),
blockVerifiers: make(map[retrievalmarket.DealID]blockio.BlockVerifier),
}
Expand Down Expand Up @@ -111,10 +111,11 @@ func (c *client) Query(ctx context.Context, p retrievalmarket.RetrievalPeer, pay

// Retrieve begins the process of requesting the data referred to by payloadCID, after a deal is accepted
func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrievalmarket.Params, totalFunds abi.TokenAmount, miner peer.ID, clientWallet address.Address, minerWallet address.Address) (retrievalmarket.DealID, error) {
c.nextDealLk.Lock()
c.nextDealID++
dealID := c.nextDealID
c.nextDealLk.Unlock()
next, err := c.storedCounter.Next()
if err != nil {
return 0, err
}
dealID := retrievalmarket.DealID(next)

dealState := retrievalmarket.ClientDealState{
DealProposal: retrievalmarket.DealProposal{
Expand All @@ -135,7 +136,7 @@ func (c *client) Retrieve(ctx context.Context, payloadCID cid.Cid, params retrie
}

// start the deal processing
err := c.stateMachines.Begin(dealState.ID, &dealState)
err = c.stateMachines.Begin(dealState.ID, &dealState)
if err != nil {
return 0, err
}
Expand Down
38 changes: 29 additions & 9 deletions retrievalmarket/impl/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@ import (
"github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
rmnet "github.com/filecoin-project/go-fil-markets/retrievalmarket/network"
tut "github.com/filecoin-project/go-fil-markets/shared_testutil"
"github.com/filecoin-project/go-fil-markets/storedcounter"
"github.com/filecoin-project/specs-actors/actors/abi"
)

func TestClient_Query(t *testing.T) {
ctx := context.Background()

ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
bs := bstore.NewBlockstore(ds)

pcid := tut.GenerateCids(1)[0]
Expand Down Expand Up @@ -59,7 +61,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.ExpectPeerOnQueryStreamBuilder(t, expectedPeer, qsb, "Peers should match"),
})
c, err := retrievalimpl.NewClient(net, bs, testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
c, err := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

resp, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
Expand All @@ -72,8 +80,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: tut.FailNewQueryStream,
})
c, err := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
c, err := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

_, err = c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
Expand All @@ -93,8 +106,13 @@ func TestClient_Query(t *testing.T) {
net := tut.NewTestRetrievalMarketNetwork(tut.TestNetworkParams{
QueryStreamBuilder: qsbuilder,
})
c, err := retrievalimpl.NewClient(net, bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}), &testPeerResolver{}, ds)
c, err := retrievalimpl.NewClient(
net,
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{},
ds,
storedCounter)
require.NoError(t, err)

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
Expand All @@ -118,7 +136,8 @@ func TestClient_Query(t *testing.T) {
bs,
testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{}),
&testPeerResolver{},
ds)
ds,
storedCounter)
require.NoError(t, err)

statusCode, err := c.Query(ctx, rpeer, pcid, retrievalmarket.QueryParams{})
Expand All @@ -129,6 +148,7 @@ func TestClient_Query(t *testing.T) {

func TestClient_FindProviders(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
storedCounter := storedcounter.New(ds, datastore.NewKey("nextDealID"))
bs := bstore.NewBlockstore(ds)
expectedPeer := peer.ID("somevalue")

Expand All @@ -146,7 +166,7 @@ func TestClient_FindProviders(t *testing.T) {
peers := tut.RequireGenerateRetrievalPeers(t, 3)
testResolver := testPeerResolver{peers: peers}

c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
Expand All @@ -155,7 +175,7 @@ func TestClient_FindProviders(t *testing.T) {

t.Run("when there is an error, returns empty provider list", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}, resolverError: errors.New("boom")}
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

badCid := tut.GenerateCids(1)[0]
Expand All @@ -164,7 +184,7 @@ func TestClient_FindProviders(t *testing.T) {

t.Run("when there are no providers", func(t *testing.T) {
testResolver := testPeerResolver{peers: []retrievalmarket.RetrievalPeer{}}
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds)
c, err := retrievalimpl.NewClient(net, bs, &testnodes.TestRetrievalClientNode{}, &testResolver, ds, storedCounter)
require.NoError(t, err)

testCid := tut.GenerateCids(1)[0]
Expand Down
6 changes: 3 additions & 3 deletions retrievalmarket/impl/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func requireSetupTestClientAndProvider(bgCtx context.Context, t *testing.T, payC
testData := tut.NewLibp2pTestData(bgCtx, t)
nw1 := rmnet.NewFromLibp2pHost(testData.Host1)
rcNode1 := testnodes.NewTestRetrievalClientNode(testnodes.TestRetrievalClientNodeParams{PayCh: payChAddr})
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, rcNode1, &testPeerResolver{}, testData.Ds1)
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, rcNode1, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1)
require.NoError(t, err)
nw2 := rmnet.NewFromLibp2pHost(testData.Host2)
providerNode := testnodes.NewTestRetrievalProviderNode()
Expand Down Expand Up @@ -293,7 +293,7 @@ CurrentInterval: %d

// *** Retrieve the piece
did, err := client.Retrieve(bgCtx, payloadCID, rmParams, expectedTotal, retrievalPeer.ID, clientPaymentChannel, retrievalPeer.Address)
assert.Equal(t, did, retrievalmarket.DealID(1))
assert.Equal(t, did, retrievalmarket.DealID(0))
require.NoError(t, err)

ctx, cancel := context.WithTimeout(bgCtx, 10*time.Second)
Expand Down Expand Up @@ -365,7 +365,7 @@ func setupClient(
AllocateLaneRecorder: laneRecorder,
PaymentVoucherRecorder: paymentVoucherRecorder,
})
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, clientNode, &testPeerResolver{}, testData.Ds1)
client, err := retrievalimpl.NewClient(nw1, testData.Bs1, clientNode, &testPeerResolver{}, testData.Ds1, testData.StoredCounter1)
return &createdChan, &newLaneAddr, &createdVoucher, client, err
}

Expand Down
45 changes: 26 additions & 19 deletions shared_testutil/mocknet.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"path/filepath"
"testing"

"github.com/filecoin-project/go-fil-markets/storedcounter"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -38,25 +39,27 @@ import (
)

type Libp2pTestData struct {
Ctx context.Context
Ds1 datastore.Batching
Ds2 datastore.Batching
Bs1 bstore.Blockstore
Bs2 bstore.Blockstore
DagService1 ipldformat.DAGService
DagService2 ipldformat.DAGService
GraphSync1 graphsync.GraphExchange
GraphSync2 graphsync.GraphExchange
Loader1 ipld.Loader
Loader2 ipld.Loader
Storer1 ipld.Storer
Storer2 ipld.Storer
Host1 host.Host
Host2 host.Host
Bridge1 ipldbridge.IPLDBridge
Bridge2 ipldbridge.IPLDBridge
AllSelector ipld.Node
OrigBytes []byte
Ctx context.Context
Ds1 datastore.Batching
Ds2 datastore.Batching
StoredCounter1 *storedcounter.StoredCounter
StoredCounter2 *storedcounter.StoredCounter
Bs1 bstore.Blockstore
Bs2 bstore.Blockstore
DagService1 ipldformat.DAGService
DagService2 ipldformat.DAGService
GraphSync1 graphsync.GraphExchange
GraphSync2 graphsync.GraphExchange
Loader1 ipld.Loader
Loader2 ipld.Loader
Storer1 ipld.Storer
Storer2 ipld.Storer
Host1 host.Host
Host2 host.Host
Bridge1 ipldbridge.IPLDBridge
Bridge2 ipldbridge.IPLDBridge
AllSelector ipld.Node
OrigBytes []byte
}

func NewLibp2pTestData(ctx context.Context, t *testing.T) *Libp2pTestData {
Expand Down Expand Up @@ -96,6 +99,10 @@ func NewLibp2pTestData(ctx context.Context, t *testing.T) *Libp2pTestData {
}
testData.Ds1 = dss.MutexWrap(datastore.NewMapDatastore())
testData.Ds2 = dss.MutexWrap(datastore.NewMapDatastore())

testData.StoredCounter1 = storedcounter.New(testData.Ds1, datastore.NewKey("nextDealID"))
testData.StoredCounter2 = storedcounter.New(testData.Ds2, datastore.NewKey("nextDealID"))

// make a bstore and dag service
testData.Bs1 = bstore.NewBlockstore(testData.Ds1)
testData.Bs2 = bstore.NewBlockstore(testData.Ds2)
Expand Down
41 changes: 41 additions & 0 deletions storedcounter/storedcounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package storedcounter

import (
"encoding/binary"

"github.com/ipfs/go-datastore"
)

// StoredCounter is a counter that persists to a datastore as it increments
type StoredCounter struct {
ds datastore.Datastore
name datastore.Key
}

// New returns a new StoredCounter for the given datastore and key
func New(ds datastore.Datastore, name datastore.Key) *StoredCounter {
return &StoredCounter{ds, name}
}

// Next returns the next counter value, updating it on disk in the process
// if no counter is present, it creates one and returns a 0 value
func (sc *StoredCounter) Next() (uint64, error) {
has, err := sc.ds.Has(sc.name)
if err != nil {
return 0, err
}

var next uint64 = 0
if has {
curBytes, err := sc.ds.Get(sc.name)
if err != nil {
return 0, err
}
cur, _ := binary.Uvarint(curBytes)
next = cur + 1
}
buf := make([]byte, binary.MaxVarintLen64)
size := binary.PutUvarint(buf, next)

return next, sc.ds.Put(sc.name, buf[:size])
}
61 changes: 61 additions & 0 deletions storedcounter/storedcounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package storedcounter_test

import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-fil-markets/storedcounter"
)


func TestStoredCounter(t * testing.T) {
ds := datastore.NewMapDatastore()

t.Run("test two instances with same data store and key count together", func(t *testing.T) {
key := datastore.NewKey("counter")
sc1 := storedcounter.New(ds, key)
next, err := sc1.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(0))

sc2 := storedcounter.New(ds, key)
next, err = sc2.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(1))

next, err = sc1.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(2))

next, err = sc2.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(3))
})


t.Run("test two instances with same data store but different keys count seperate", func(t *testing.T) {

key1 := datastore.NewKey("counter 1")
key2 := datastore.NewKey("counter 2")

sc1 := storedcounter.New(ds, key1)
next, err := sc1.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(0))

sc2 := storedcounter.New(ds, key2)
next, err = sc2.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(0))

next, err = sc1.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(1))

next, err = sc2.Next()
require.NoError(t, err)
require.Equal(t, next, uint64(1))
})
}

0 comments on commit 9b5564b

Please sign in to comment.