Skip to content

Commit

Permalink
Feat/support custom metadata (#759)
Browse files Browse the repository at this point in the history
* feat(provider): support using custom metadata

* fix(provider): remove staged from sealing states

StorageDealStaged is pre-announcement, and shouldn't be included in AnnounceAllDealsToIndexer
  • Loading branch information
hannahhoward authored Oct 13, 2022
1 parent 7175401 commit b57e701
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
40 changes: 28 additions & 12 deletions storagemarket/impl/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package storageimpl

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -59,6 +60,16 @@ type MeshCreator interface {
Connect(context.Context) error
}

type MetadataFunc func(storagemarket.MinerDeal) metadata.Metadata

func defaultMetadataFunc(deal storagemarket.MinerDeal) metadata.Metadata {
return metadata.New(&metadata.GraphsyncFilecoinV1{
PieceCID: deal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.Proposal.VerifiedDeal,
})
}

// Provider is the production implementation of the StorageProvider interface
type Provider struct {
net network.StorageMarketNetwork
Expand All @@ -80,9 +91,10 @@ type Provider struct {

unsubDataTransfer datatransfer.Unsubscribe

dagStore stores.DAGStoreWrapper
indexProvider provider.Interface
stores *stores.ReadWriteBlockstores
dagStore stores.DAGStoreWrapper
indexProvider provider.Interface
metadataForDeal MetadataFunc
stores *stores.ReadWriteBlockstores
}

// StorageProviderOption allows custom configuration of a storage provider
Expand Down Expand Up @@ -113,6 +125,12 @@ func AwaitTransferRestartTimeout(waitTime time.Duration) StorageProviderOption {
}
}

func CustomMetadataGenerator(metadataFunc MetadataFunc) StorageProviderOption {
return func(p *Provider) {
p.metadataForDeal = metadataFunc
}
}

// NewProvider returns a new storage provider
func NewProvider(net network.StorageMarketNetwork,
ds datastore.Batching,
Expand Down Expand Up @@ -143,6 +161,7 @@ func NewProvider(net network.StorageMarketNetwork,
stores: stores.NewReadWriteBlockstores(),
awaitTransferRestartTimeout: defaultAwaitRestartTimeout,
indexProvider: indexer,
metadataForDeal: defaultMetadataFunc,
}
storageMigrations, err := migrations.ProviderMigrations.Build()
if err != nil {
Expand Down Expand Up @@ -541,17 +560,11 @@ func (p *Provider) AnnounceDealToIndexer(ctx context.Context, proposalCid cid.Ci
return xerrors.Errorf("failed getting deal %s: %w", proposalCid, err)
}

mt := metadata.New(&metadata.GraphsyncFilecoinV1{
PieceCID: deal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.Proposal.VerifiedDeal,
})

if err := p.meshCreator.Connect(ctx); err != nil {
return fmt.Errorf("cannot publish index record as indexer host failed to connect to the full node: %w", err)
}

annCid, err := p.indexProvider.NotifyPut(ctx, deal.ProposalCid.Bytes(), mt)
annCid, err := p.indexProvider.NotifyPut(ctx, deal.ProposalCid.Bytes(), p.metadataForDeal(deal))
if err == nil {
log.Infow("deal announcement sent to index provider", "advertisementCid", annCid, "shard-key", deal.Proposal.PieceCID,
"proposalCid", deal.ProposalCid)
Expand Down Expand Up @@ -591,8 +604,11 @@ func (p *Provider) AnnounceAllDealsToIndexer(ctx context.Context) error {
}

if err := p.AnnounceDealToIndexer(ctx, d.ProposalCid); err != nil {
merr = multierror.Append(merr, err)
log.Errorw("failed to announce deal to Index provider", "proposalCid", d.ProposalCid, "err", err)
// don't log already advertised errors as errors - just skip them
if !errors.Is(err, provider.ErrAlreadyAdvertised) {
merr = multierror.Append(merr, err)
log.Errorw("failed to announce deal to Index provider", "proposalCid", d.ProposalCid, "err", err)
}
continue
}
shards[d.Proposal.PieceCID.String()] = struct{}{}
Expand Down
7 changes: 1 addition & 6 deletions storagemarket/impl/provider_environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/index-provider/metadata"

"github.com/filecoin-project/go-fil-markets/commp"
"github.com/filecoin-project/go-fil-markets/filestore"
Expand All @@ -40,11 +39,7 @@ func (p *providerDealEnvironment) RegisterShard(ctx context.Context, pieceCid ci
// AnnounceIndex informs indexer nodes that a new deal was received,
// so they can download its index
func (p *providerDealEnvironment) AnnounceIndex(ctx context.Context, deal storagemarket.MinerDeal) (advertCid cid.Cid, err error) {
mt := metadata.New(&metadata.GraphsyncFilecoinV1{
PieceCID: deal.Proposal.PieceCID,
FastRetrieval: deal.FastRetrieval,
VerifiedDeal: deal.Proposal.VerifiedDeal,
})
mt := p.p.metadataForDeal(deal)

// ensure we have a connection with the full node host so that the index provider gossip sub announcements make their
// way to the filecoin bootstrapper network
Expand Down
1 change: 0 additions & 1 deletion storagemarket/impl/providerstates/provider_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ var ProviderFinalityStates = []fsm.StateKey{
// StatesKnownBySealingSubsystem are the states on the happy path after hand-off to
// the sealing subsystem
var StatesKnownBySealingSubsystem = []fsm.StateKey{
storagemarket.StorageDealStaged,
storagemarket.StorageDealAwaitingPreCommit,
storagemarket.StorageDealSealing,
storagemarket.StorageDealFinalizing,
Expand Down

0 comments on commit b57e701

Please sign in to comment.