Skip to content

Commit

Permalink
feat: use traversal link budget instead of limitstore
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg authored and hannahhoward committed Apr 22, 2023
1 parent 7f2423d commit 6c69598
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 138 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2
github.com/benbjohnson/clock v1.3.0
github.com/dustin/go-humanize v1.0.1
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96
github.com/filecoin-project/go-state-types v0.10.0
github.com/google/uuid v1.3.0
github.com/hannahhoward/go-pubsub v1.0.0
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.4.0
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-graphsync v0.14.4
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0
github.com/ipfs/go-ipfs-blockstore v1.3.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-delay v0.0.1
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ github.com/filecoin-project/go-commp-utils v0.1.3/go.mod h1:3ENlD1pZySaUout0p9AN
github.com/filecoin-project/go-commp-utils/nonffi v0.0.0-20220905160352-62059082a837/go.mod h1:e2YBjSblNVoBckkbv3PPqsq71q98oFkFqL7s1etViGo=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMXdBnCiXjfCYx/hLqFxccPoqsSveQFxVLvNxy9bus=
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5 h1:lqjkVplfTRt5GV7Pxjo+H+Jhnh7tgUIhKFFLxGvjv1Y=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5/go.mod h1:YY4onZJ9LoSP19kdJWD7PZ0ZDJSQnbcEXYfjezMLTog=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96 h1:D6zEwN3s1pWLGAGjPHDu8b2Bjl31MXqTFFdJ3avBihw=
github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc5.0.20230406122351-99ba9d323b96/go.mod h1:j2J58y4HLzDc6sXLJzuM+ONZm59SbJDEw4u1XA5JCfM=
github.com/filecoin-project/go-ds-versioning v0.1.2 h1:to4pTadv3IeV1wvgbCbN6Vqd+fu+7tveXgv/rCEZy6w=
github.com/filecoin-project/go-ds-versioning v0.1.2/go.mod h1:C9/l9PnB1+mwPa26BBVpCjG/XQCB0yj/q5CK2J8X1I4=
github.com/filecoin-project/go-fil-commcid v0.0.0-20200716160307-8f644712406f/go.mod h1:Eaox7Hvus1JgPrL5+M3+h7aSPHc0cVqpSxA+TxIEpZQ=
Expand Down Expand Up @@ -350,8 +350,8 @@ github.com/ipfs/go-datastore v0.6.0 h1:JKyz+Gvz1QEZw0LsX1IBn+JFCJQH4SJVFtM4uWU0M
github.com/ipfs/go-datastore v0.6.0/go.mod h1:rt5M3nNbSO/8q1t4LNkLyUwRs8HupMeN/8O4Vn9YAT8=
github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk=
github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps=
github.com/ipfs/go-graphsync v0.14.4 h1:ysazATpwsIjYtYEZH5CdD/HRaonCJd4pASUtnzESewk=
github.com/ipfs/go-graphsync v0.14.4/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0 h1:2VHJORBuN9wL2TmySBgHvlpZLM2Sq7BEW66Imho0lbA=
github.com/ipfs/go-graphsync v0.14.5-0.20230406121333-d40f493cf2d0/go.mod h1:yT0AfjFgicOoWdAlUJ96tQ5AkuGI4r1taIQX/aHbBQo=
github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk=
github.com/ipfs/go-ipfs-blockstore v1.3.0 h1:m2EXaWgwTzAfsmt5UdJ7Is6l4gJcaM/A12XwJyvYvMM=
github.com/ipfs/go-ipfs-blockstore v1.3.0/go.mod h1:KgtZyc9fq+P2xJUiCAzbRdhhqJHvsw8u2Dlqy2MyRTE=
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/itest/client_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func runRetrieval(t *testing.T, ctx context.Context, mrn *mocknet.MockRetrievalN
mrn.Remotes[0].Host.ID(),
proposal,
selectorparse.CommonSelector_ExploreAllRecursively,
0,
subscriberLocal,
shutdown,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/internal/testutil/mockclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (mc *MockClient) RetrieveFromPeer(
peerID peer.ID,
proposal *retrievaltypes.DealProposal,
selector ipld.Node,
maxBlocks uint64,
eventsCallback datatransfer.Subscriber,
gracefulShutdownRequested <-chan struct{},
) (*types.RetrievalStats, error) {
Expand Down
6 changes: 5 additions & 1 deletion pkg/net/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func (rc *RetrievalClient) RetrieveFromPeer(
peerID peer.ID,
proposal *retrievaltypes.DealProposal,
sel ipld.Node,
maxBlocks uint64,
eventsCallback datatransfer.Subscriber,
gracefulShutdownRequested <-chan struct{},
) (*types.RetrievalStats, error) {
Expand Down Expand Up @@ -309,7 +310,10 @@ func (rc *RetrievalClient) RetrieveFromPeer(
proposal.PayloadCID,
sel,
datatransfer.WithSubscriber(eventsCb),
datatransfer.WithTransportOptions(dttransport.UseStore(linkSystem)),
datatransfer.WithTransportOptions(
dttransport.UseStore(linkSystem),
dttransport.MaxLinks(maxBlocks),
),
)
if err != nil {
// We could fail before a successful proposal
Expand Down
2 changes: 2 additions & 0 deletions pkg/net/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestClient(t *testing.T) {
p,
proposal,
selector,
0,
eventsCb,
gracefulShutdownRequested,
)
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestClient_BadSelector(t *testing.T) {
p,
proposal,
selector,
0,
eventsCb,
gracefulShutdownRequested,
)
Expand Down
17 changes: 16 additions & 1 deletion pkg/retriever/bitswapretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -229,7 +230,14 @@ func (br *bitswapRetrieval) RetrieveFromAsyncCandidates(ayncCandidates types.Inb
)

// run the retrieval
err = easyTraverse(ctx, cidlink.Link{Cid: br.request.Cid}, selector, storage.TraversalLinkSystem, storage.Preloader)
err = easyTraverse(
ctx,
cidlink.Link{Cid: br.request.Cid},
selector,
storage.TraversalLinkSystem,
storage.Preloader,
br.request.MaxBlocks,
)
storage.Stop()
cancel()

Expand Down Expand Up @@ -301,6 +309,7 @@ func easyTraverse(
traverseSelector datamodel.Node,
lsys *linking.LinkSystem,
preloader preload.Loader,
maxBlocks uint64,
) error {

protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser)
Expand All @@ -323,6 +332,12 @@ func easyTraverse(
Preloader: preloader,
},
}
if maxBlocks > 0 {
progress.Budget = &traversal.Budget{
LinkBudget: int64(maxBlocks) - 1, // first block is already loaded
NodeBudget: math.MaxInt64,
}
}
progress.LastBlock.Link = root
compiledSelector, err := selector.ParseSelector(traverseSelector)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions pkg/retriever/graphsyncretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func runRetrievalCandidate(
timeout,
candidate,
req.GetSelector(),
req.MaxBlocks,
eventsCallback,
)

Expand Down Expand Up @@ -418,6 +419,7 @@ func retrievalPhase(
timeout time.Duration,
candidate types.RetrievalCandidate,
selector ipld.Node,
maxBlocks uint64,
eventsCallback datatransfer.Subscriber,
) (*types.RetrievalStats, error) {

Expand Down Expand Up @@ -492,6 +494,7 @@ func retrievalPhase(
candidate.MinerPeer.ID,
proposal,
selector,
uint64(maxBlocks),
eventsSubscriber,
gracefulShutdownChan,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/retriever/retrievalclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type RetrievalClient interface {
peerID peer.ID,
proposal *retrievaltypes.DealProposal,
selector ipld.Node,
maxLinks uint64,
eventsCallback datatransfer.Subscriber,
gracefulShutdownRequested <-chan struct{},
) (*types.RetrievalStats, error)
Expand Down
33 changes: 16 additions & 17 deletions pkg/server/http/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
lassie "github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/retriever"
"github.com/filecoin-project/lassie/pkg/storage"
"github.com/filecoin-project/lassie/pkg/storage/limitstore"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
Expand Down Expand Up @@ -176,21 +175,6 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
}()
var store types.ReadableWritableStorage = carStore

// extract block limit from query param as needed
var blockLimit uint64
if req.URL.Query().Has("blockLimit") {
if parsedBlockLimit, err := strconv.ParseUint(req.URL.Query().Get("blockLimit"), 10, 64); err == nil {
blockLimit = parsedBlockLimit
}
}
if cfg.MaxBlocksPerRequest > 0 || blockLimit > 0 {
// use the lowest non-zero value for block limit
if blockLimit == 0 || (cfg.MaxBlocksPerRequest > 0 && blockLimit > cfg.MaxBlocksPerRequest) {
blockLimit = cfg.MaxBlocksPerRequest
}
store = limitstore.NewLimitStore(carStore, blockLimit)
}

carWriter.OnPut(func(int) {
// called once we start writing blocks into the CAR (on the first Put())
res.Header().Set("Content-Disposition", "attachment; filename="+filename)
Expand Down Expand Up @@ -224,7 +208,22 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response
request.PreloadLinkSystem.SetWriteStorage(preloadStore)
request.PreloadLinkSystem.TrustedStorage = true

log.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", unixfsPath, "carScope", carScope)
// extract block limit from query param as needed
var blockLimit uint64
if req.URL.Query().Has("blockLimit") {
if parsedBlockLimit, err := strconv.ParseUint(req.URL.Query().Get("blockLimit"), 10, 64); err == nil {
blockLimit = parsedBlockLimit
}
}
if cfg.MaxBlocksPerRequest > 0 || blockLimit > 0 {
// use the lowest non-zero value for block limit
if blockLimit == 0 || (cfg.MaxBlocksPerRequest > 0 && blockLimit > cfg.MaxBlocksPerRequest) {
blockLimit = cfg.MaxBlocksPerRequest
}
request.MaxBlocks = blockLimit
}

log.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", unixfsPath, "carScope", carScope)
stats, err := lassie.Fetch(req.Context(), request, func(re types.RetrievalEvent) {
header := servertiming.FromContext(req.Context())
if header == nil {
Expand Down
54 changes: 0 additions & 54 deletions pkg/storage/limitstore/limitstore.go

This file was deleted.

59 changes: 0 additions & 59 deletions pkg/storage/limitstore/limitstore_test.go

This file was deleted.

1 change: 1 addition & 0 deletions pkg/types/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type RetrievalRequest struct {
Selector ipld.Node
Protocols []multicodec.Code
PreloadLinkSystem ipld.LinkSystem
MaxBlocks uint64
}

// NewRequestForPath creates a new RetrievalRequest from the provided parameters
Expand Down

0 comments on commit 6c69598

Please sign in to comment.