Skip to content

Commit

Permalink
Persist retrieval ask on disk (#410)
Browse files Browse the repository at this point in the history
* Bugfix: Store datastore key in the storage ask repository

* Add an AskStore interface/impl for retrievalmarket

* Use AskStore in provider impl
  • Loading branch information
ingar authored Sep 23, 2020
1 parent ccb2cbf commit c99bb3c
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 30 deletions.
18 changes: 18 additions & 0 deletions retrievalmarket/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package retrievalmarket

import "github.com/filecoin-project/go-state-types/abi"

// DefaultPricePerByte is the charge per byte retrieved if the miner does
// not specifically set it
var DefaultPricePerByte = abi.NewTokenAmount(2)

// DefaultUnsealPrice is the default charge to unseal a sector for retrieval
var DefaultUnsealPrice = abi.NewTokenAmount(0)

// DefaultPaymentInterval is the baseline interval, set to 1Mb
// if the miner does not explicitly set it otherwise
var DefaultPaymentInterval = uint64(1 << 20)

// DefaultPaymentIntervalIncrease is the amount interval increases on each payment,
// set to to 1Mb if the miner does not explicitly set it otherwise
var DefaultPaymentIntervalIncrease = uint64(1 << 20)
116 changes: 116 additions & 0 deletions retrievalmarket/impl/askstore/askstore_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package askstore

import (
"bytes"
"sync"

"github.com/ipfs/go-datastore"
"golang.org/x/xerrors"

cborutil "github.com/filecoin-project/go-cbor-util"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
)

// AskStoreImpl implements AskStore, persisting a retrieval Ask
// to disk. It also maintains a cache of the current Ask in memory
type AskStoreImpl struct {
lk sync.RWMutex
ask *retrievalmarket.Ask
ds datastore.Batching
key datastore.Key
}

// NewAskStore returns a new instance of AskStoreImpl
// It will initialize a new default ask and store it if one is not set.
// Otherwise it loads the current Ask from disk
func NewAskStore(ds datastore.Batching, key datastore.Key) (*AskStoreImpl, error) {
s := &AskStoreImpl{
ds: ds,
key: key,
}

if err := s.tryLoadAsk(); err != nil {
return nil, err
}

if s.ask == nil {
// for now set a default retrieval ask
defaultAsk := &retrievalmarket.Ask{
PricePerByte: retrievalmarket.DefaultPricePerByte,
UnsealPrice: retrievalmarket.DefaultUnsealPrice,
PaymentInterval: retrievalmarket.DefaultPaymentInterval,
PaymentIntervalIncrease: retrievalmarket.DefaultPaymentIntervalIncrease,
}

if err := s.SetAsk(defaultAsk); err != nil {
return nil, xerrors.Errorf("failed setting a default retrieval ask: %w", err)
}
}
return s, nil
}

// SetAsk stores retrieval provider's ask
func (s *AskStoreImpl) SetAsk(ask *retrievalmarket.Ask) error {
s.lk.Lock()
defer s.lk.Unlock()

return s.saveAsk(ask)
}

// GetAsk returns the current retrieval ask, or nil if one does not exist.
func (s *AskStoreImpl) GetAsk() *retrievalmarket.Ask {
s.lk.RLock()
defer s.lk.RUnlock()
if s.ask == nil {
return nil
}
ask := *s.ask
return &ask
}

func (s *AskStoreImpl) tryLoadAsk() error {
s.lk.Lock()
defer s.lk.Unlock()

err := s.loadAsk()

if err != nil {
if xerrors.Is(err, datastore.ErrNotFound) {
// this is expected
return nil
}
return err
}

return nil
}

func (s *AskStoreImpl) loadAsk() error {
askb, err := s.ds.Get(s.key)
if err != nil {
return xerrors.Errorf("failed to load most recent retrieval ask from disk: %w", err)
}

var ask retrievalmarket.Ask
if err := cborutil.ReadCborRPC(bytes.NewReader(askb), &ask); err != nil {
return err
}

s.ask = &ask
return nil
}

func (s *AskStoreImpl) saveAsk(a *retrievalmarket.Ask) error {
b, err := cborutil.Dump(a)
if err != nil {
return err
}

if err := s.ds.Put(s.key, b); err != nil {
return err
}

s.ask = a
return nil
}
49 changes: 49 additions & 0 deletions retrievalmarket/impl/askstore/askstore_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package askstore_test

import (
"testing"

"github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"

"github.com/filecoin-project/go-state-types/abi"

"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore"
)

func TestAskStoreImpl(t *testing.T) {
ds := dss.MutexWrap(datastore.NewMapDatastore())
store, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask"))
require.NoError(t, err)

// A new store returns the default ask
ask := store.GetAsk()
require.NotNil(t, ask)

require.Equal(t, retrievalmarket.DefaultUnsealPrice, ask.UnsealPrice)
require.Equal(t, retrievalmarket.DefaultPricePerByte, ask.PricePerByte)
require.Equal(t, retrievalmarket.DefaultPaymentInterval, ask.PaymentInterval)
require.Equal(t, retrievalmarket.DefaultPaymentIntervalIncrease, ask.PaymentIntervalIncrease)

// Store a new ask
newAsk := &retrievalmarket.Ask{
PricePerByte: abi.NewTokenAmount(123),
UnsealPrice: abi.NewTokenAmount(456),
PaymentInterval: 789,
PaymentIntervalIncrease: 789,
}
err = store.SetAsk(newAsk)
require.NoError(t, err)

// Fetch new ask
stored := store.GetAsk()
require.Equal(t, newAsk, stored)

// Construct a new AskStore and make sure it returns the previously-stored ask
newStore, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask"))
require.NoError(t, err)
stored = newStore.GetAsk()
require.Equal(t, newAsk, stored)
}
45 changes: 15 additions & 30 deletions retrievalmarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package retrievalimpl
import (
"context"
"errors"
"sync"

"github.com/hannahhoward/go-pubsub"
"github.com/ipfs/go-cid"
Expand All @@ -13,11 +12,11 @@ import (
"github.com/filecoin-project/go-address"
datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-multistore"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-statemachine/fsm"

"github.com/filecoin-project/go-fil-markets/piecestore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/askstore"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/dtutils"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/providerstates"
"github.com/filecoin-project/go-fil-markets/retrievalmarket/impl/requestvalidation"
Expand All @@ -43,9 +42,7 @@ type Provider struct {
subscribers *pubsub.PubSub
stateMachines fsm.Group
dealDecider DealDecider

askLk sync.Mutex
ask *retrievalmarket.Ask
askStore retrievalmarket.AskStore
}

type internalProviderEvent struct {
Expand All @@ -68,18 +65,6 @@ func providerDispatcher(evt pubsub.Event, subscriberFn pubsub.SubscriberFn) erro

var _ retrievalmarket.RetrievalProvider = new(Provider)

// DefaultPricePerByte is the charge per byte retrieved if the miner does
// not specifically set it
var DefaultPricePerByte = abi.NewTokenAmount(2)

// DefaultPaymentInterval is the baseline interval, set to 1Mb
// if the miner does not explicitly set it otherwise
var DefaultPaymentInterval = uint64(1 << 20)

// DefaultPaymentIntervalIncrease is the amount interval increases on each payment,
// set to to 1Mb if the miner does not explicitly set it otherwise
var DefaultPaymentIntervalIncrease = uint64(1 << 20)

// DealDeciderOpt sets a custom protocol
func DealDeciderOpt(dd DealDecider) RetrievalProviderOption {
return func(provider *Provider) {
Expand All @@ -106,13 +91,14 @@ func NewProvider(minerAddress address.Address,
minerAddress: minerAddress,
pieceStore: pieceStore,
subscribers: pubsub.New(providerDispatcher),
ask: &retrievalmarket.Ask{
PricePerByte: DefaultPricePerByte,
PaymentInterval: DefaultPaymentInterval,
PaymentIntervalIncrease: DefaultPaymentIntervalIncrease,
UnsealPrice: abi.NewTokenAmount(0),
},
}

askStore, err := askstore.NewAskStore(ds, datastore.NewKey("retrieval-ask"))
if err != nil {
return nil, err
}
p.askStore = askStore

statemachines, err := fsm.New(ds, fsm.Parameters{
Environment: &providerDealEnvironment{p},
StateType: retrievalmarket.ProviderDealState{},
Expand Down Expand Up @@ -174,17 +160,16 @@ func (p *Provider) SubscribeToEvents(subscriber retrievalmarket.ProviderSubscrib

// GetAsk returns the current deal parameters this provider accepts
func (p *Provider) GetAsk() *retrievalmarket.Ask {
p.askLk.Lock()
defer p.askLk.Unlock()
a := *p.ask
return &a
return p.askStore.GetAsk()
}

// SetAsk sets the deal parameters this provider accepts
func (p *Provider) SetAsk(ask *retrievalmarket.Ask) {
p.askLk.Lock()
defer p.askLk.Unlock()
p.ask = ask
err := p.askStore.SetAsk(ask)

if err != nil {
log.Warnf("Error setting retrieval ask: %w", err)
}
}

// ListDeals lists all known retrieval deals
Expand Down
6 changes: 6 additions & 0 deletions retrievalmarket/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,9 @@ type RetrievalProvider interface {

ListDeals() map[ProviderDealIdentifier]ProviderDealState
}

// AskStore is an interface which provides access to a persisted retrieval Ask
type AskStore interface {
GetAsk() *Ask
SetAsk(ask *Ask) error
}
1 change: 1 addition & 0 deletions storagemarket/impl/storedask/storedask.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func NewStoredAsk(ds datastore.Batching, dsKey datastore.Key, spn storagemarket.
ds: ds,
spn: spn,
actor: actor,
dsKey: dsKey,
}

if err := s.tryLoadAsk(); err != nil {
Expand Down

0 comments on commit c99bb3c

Please sign in to comment.