From 5996325fe54b1e9c8e04415e3847a244747bbb35 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 23 May 2023 11:27:07 -0700 Subject: [PATCH 01/24] WIP --- pkg/storage/cachingtempstore.go | 4 +- pkg/storage/deferredcarwriter.go | 11 +- pkg/storage/duplicateAdder.go | 61 +++++++++ pkg/verifiedcar/verifiedcar.go | 70 +++++++--- pkg/verifiedcar/verifiedcar_test.go | 195 +++++++++++++++++++++------- 5 files changed, 268 insertions(+), 73 deletions(-) create mode 100644 pkg/storage/duplicateAdder.go diff --git a/pkg/storage/cachingtempstore.go b/pkg/storage/cachingtempstore.go index 2f7990d8..30d08654 100644 --- a/pkg/storage/cachingtempstore.go +++ b/pkg/storage/cachingtempstore.go @@ -40,9 +40,9 @@ type CachingTempStore struct { preloadKeys map[string]struct{} } -func NewCachingTempStore(outWriter linking.BlockWriteOpener, tempDir string) *CachingTempStore { +func NewCachingTempStore(outWriter linking.BlockWriteOpener, store *DeferredStorageCar) *CachingTempStore { return &CachingTempStore{ - store: NewDeferredStorageCar(tempDir), + store: store, outWriter: outWriter, preloadKeys: make(map[string]struct{}), } diff --git a/pkg/storage/deferredcarwriter.go b/pkg/storage/deferredcarwriter.go index abe3e49a..bcc396f5 100644 --- a/pkg/storage/deferredcarwriter.go +++ b/pkg/storage/deferredcarwriter.go @@ -38,20 +38,21 @@ type DeferredCarWriter struct { f *os.File w carstorage.WritableCar putCb []putCb + opts []carv2.Option } // NewDeferredCarWriterForPath creates a DeferredCarWriter that will write to a // file designated by the supplied path. The file will only be created on the // first Put() operation. -func NewDeferredCarWriterForPath(root cid.Cid, outPath string) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outPath: outPath} +func NewDeferredCarWriterForPath(root cid.Cid, outPath string, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outPath: outPath, opts: opts} } // NewDeferredCarWriterForStream creates a DeferredCarWriter that will write to // the supplied stream. The stream will only be written to on the first Put() // operation. -func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer) *DeferredCarWriter { - return &DeferredCarWriter{root: root, outStream: outStream} +func NewDeferredCarWriterForStream(root cid.Cid, outStream io.Writer, opts ...carv2.Option) *DeferredCarWriter { + return &DeferredCarWriter{root: root, outStream: outStream, opts: opts} } // OnPut will call a callback when each Put() operation is started. The argument @@ -120,7 +121,7 @@ func (dcw *DeferredCarWriter) writer() (carstorage.WritableCar, error) { dcw.f = openedFile outStream = openedFile } - w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, carv2.WriteAsCarV1(true)) + w, err := carstorage.NewWritable(outStream, []cid.Cid{dcw.root}, append([]carv2.Option{carv2.WriteAsCarV1(true)}, dcw.opts...)...) if err != nil { return nil, err } diff --git a/pkg/storage/duplicateAdder.go b/pkg/storage/duplicateAdder.go new file mode 100644 index 00000000..fc0547f1 --- /dev/null +++ b/pkg/storage/duplicateAdder.go @@ -0,0 +1,61 @@ +package storage + +import ( + "context" + "io" + + "github.com/filecoin-project/lassie/pkg/verifiedcar" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" + "github.com/ipld/go-ipld-prime/traversal/selector" +) + +type DuplicateAdder struct { + ctx context.Context + cancel context.CancelFunc + root cid.Cid + selector ipld.Node + *DeferredCarWriter + outgoing *DeferredCarWriter +} + +func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, selector ipld.Node, store *DeferredStorageCar, outStream io.Writer) { + +} + +func (da *DuplicateAdder) addDupes(ctx context.Context) error { + + sel, err := selector.CompileSelector(da.selector) + if err != nil { + return nil, err + } + + cfg := verifiedcar.Config{ + Root: da.root, + Selector: sel, + } + + blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem) + if err != nil { + return nil, err + } + +} + +// OnPut will call a callback when each Put() operation is started. The argument +// to the callback is the number of bytes being written. If once is true, the +// callback will be removed after the first call. +func (da *DuplicateAdder) OnPut(cb func(int), once bool) { + +} + +// Close closes the underlying file, if one was created. +func (dcw *DeferredCarWriter) Close() error { + +} + +// BlockWriteOpener returns a BlockWriteOpener that operates on this storage. +func (da *DuplicateAdder) BlockWriteOpener() linking.BlockWriteOpener { + return da.incoming.BlockWriteOpener() +} diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 347daef5..29695ad9 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -33,9 +34,11 @@ var ( var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) type Config struct { - Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against - AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 - Selector selector.Selector // The selector to execute, starting at the provided Root, to verify the contents of the CAR + Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against + AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 + Selector selector.Selector // The selector to execute, starting at the provided Root, to verify the contents of the CAR + AllowDuplicatesIn bool // Handles whether the incoming stream has duplicates + WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks } func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } @@ -75,11 +78,11 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy cr := &carReader{ cbr: cbr, } - + bt := &writeTracker{} lsys.TrustedStorage = true // we can rely on the CAR decoder to check CID integrity unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - lsys.StorageReadOpener = nextBlockReadOpener(ctx, cr, lsys) + lsys.StorageReadOpener = cfg.nextBlockReadOpener(ctx, cr, bt, lsys) // run traversal in this goroutine progress := traversal.Progress{ @@ -110,24 +113,44 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy } // wait for parser to finish and provide errors or stats - return cr.blocks, cr.bytes, nil + return bt.blocks, bt.bytes, nil } -func nextBlockReadOpener(ctx context.Context, cr *carReader, lsys linking.LinkSystem) linking.BlockReadOpener { +func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *writeTracker, lsys linking.LinkSystem) linking.BlockReadOpener { seen := make(map[cid.Cid]struct{}) return func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { cid := l.(cidlink.Link).Cid + var data []byte + var err error if _, ok := seen[cid]; ok { - // duplicate block, rely on the supplied LinkSystem to have stored this - return lsys.StorageReadOpener(lc, l) - } - - seen[cid] = struct{}{} - data, err := cr.readNextBlock(ctx, cid) - if err != nil { - return nil, err + if cfg.AllowDuplicatesIn { + data, err = cr.readNextBlock(ctx, cid) + if err != nil { + return nil, err + } + if !cfg.WriteDuplicatesOut { + return bytes.NewReader(data), nil + } + } else { + // duplicate block, rely on the supplied LinkSystem to have stored this + rdr, err := lsys.StorageReadOpener(lc, l) + if !cfg.WriteDuplicatesOut { + return rdr, err + } + data, err = ioutil.ReadAll(rdr) + if err != nil { + return nil, err + } + } + } else { + seen[cid] = struct{}{} + data, err = cr.readNextBlock(ctx, cid) + if err != nil { + return nil, err + } } + bt.recordBlock(data) w, wc, err := lsys.StorageWriteOpener(lc) if err != nil { return nil, err @@ -147,9 +170,7 @@ func nextBlockReadOpener(ctx context.Context, cr *carReader, lsys linking.LinkSy } type carReader struct { - cbr *car.BlockReader - blocks uint64 - bytes uint64 + cbr *car.BlockReader } func (cr *carReader) readNextBlock(ctx context.Context, expected cid.Cid) ([]byte, error) { @@ -164,12 +185,19 @@ func (cr *carReader) readNextBlock(ctx context.Context, expected cid.Cid) ([]byt if blk.Cid() != expected { return nil, fmt.Errorf("%w: %s != %s", ErrUnexpectedBlock, blk.Cid(), expected) } - - cr.bytes += uint64(len(blk.RawData())) - cr.blocks++ return blk.RawData(), nil } +type writeTracker struct { + blocks uint64 + bytes uint64 +} + +func (bt *writeTracker) recordBlock(data []byte) { + bt.blocks++ + bt.bytes += uint64(len(data)) +} + func traversalError(original error) error { err := original for { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 950b5a97..a974fde2 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -64,6 +64,8 @@ func TestVerifiedCar(t *testing.T) { unixfsFile := unixfs.GenerateFile(t, &lsys, rndReader, 4<<20) unixfsFileBlocks := toBlocks(t, lsys, unixfsFile.Root, allSelector) + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, zeroReader{}, 4<<20) + unixfsFileWithDupsBlocks := toBlocks(t, lsys, unixfsFileWithDups.Root, allSelector) var unixfsDir unixfs.DirEntry var unixfsDirBlocks []block for { @@ -106,16 +108,17 @@ func TestVerifiedCar(t *testing.T) { unixfsExclusiveWrappedShardedDirOnlyBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) testCases := []struct { - name string - blocks []block - roots []cid.Cid - carv2 bool - err string - cfg verifiedcar.Config + name string + blocks []expectedBlock + roots []cid.Cid + carv2 bool + err string + cfg verifiedcar.Config + incomingHasDups bool }{ { name: "complete carv1", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, cfg: verifiedcar.Config{ Root: root1, @@ -124,7 +127,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv2 without AllowCARv2 errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, carv2: true, err: "bad CAR version", @@ -135,7 +138,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "complete carv2 with AllowCARv2", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1}, carv2: true, cfg: verifiedcar.Config{ @@ -146,7 +149,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with multiple roots errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{root1, root1}, err: "root CID mismatch", cfg: verifiedcar.Config{ @@ -156,7 +159,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with wrong root errors", - blocks: allBlocks, + blocks: consumedBlocks(allBlocks), roots: []cid.Cid{tbc1.AllBlocks()[1].Cid()}, err: "root CID mismatch", cfg: verifiedcar.Config{ @@ -166,7 +169,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous trailing block errors", - blocks: append(append([]block{}, allBlocks...), block{extraneousLnk.(cidlink.Link).Cid, extraneousByts}), + blocks: append(consumedBlocks(append([]block{}, allBlocks...)), expectedBlock{block{extraneousLnk.(cidlink.Link).Cid, extraneousByts}, true}), roots: []cid.Cid{root1}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -176,7 +179,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous leading block errors", - blocks: append([]block{{extraneousLnk.(cidlink.Link).Cid, extraneousByts}}, allBlocks...), + blocks: append(consumedBlocks([]block{{extraneousLnk.(cidlink.Link).Cid, extraneousByts}}), consumedBlocks(allBlocks)...), roots: []cid.Cid{root1}, err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].cid.String(), cfg: verifiedcar.Config{ @@ -186,7 +189,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with out-of-order blocks errors", - blocks: append(append([]block{}, allBlocks[50:]...), allBlocks[0:50]...), + blocks: consumedBlocks(append(append([]block{}, allBlocks[50:]...), allBlocks[0:50]...)), roots: []cid.Cid{root1}, err: "unexpected block in CAR: " + allBlocks[50].cid.String() + " != " + allBlocks[0].cid.String(), cfg: verifiedcar.Config{ @@ -196,7 +199,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with mismatching CID errors", - blocks: append(append([]block{}, allBlocks[0:99]...), block{allBlocks[99].cid, extraneousByts}), + blocks: consumedBlocks(append(append([]block{}, allBlocks[0:99]...), block{allBlocks[99].cid, extraneousByts})), roots: []cid.Cid{root1}, err: "mismatch in content integrity", cfg: verifiedcar.Config{ @@ -206,7 +209,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file", - blocks: unixfsFileBlocks, + blocks: consumedBlocks(unixfsFileBlocks), roots: []cid.Cid{unixfsFile.Root}, cfg: verifiedcar.Config{ Root: unixfsFile.Root, @@ -215,7 +218,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large directory", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, cfg: verifiedcar.Config{ Root: unixfsDir.Root, @@ -224,7 +227,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded directory", - blocks: unixfsShardedDirBlocks, + blocks: consumedBlocks(unixfsShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, @@ -233,7 +236,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file with file scope", - blocks: unixfsFileBlocks, + blocks: consumedBlocks(unixfsFileBlocks), roots: []cid.Cid{unixfsFile.Root}, cfg: verifiedcar.Config{ Root: unixfsFile.Root, @@ -248,7 +251,7 @@ func TestVerifiedCar(t *testing.T) { // traversal, why is unixfs-preload making a difference for just matching a // directory.UnixFSBasicDir. name: "unixfs: all of large directory with file scope, errors", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -258,7 +261,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large sharded directory with file scope, errors", - blocks: unixfsShardedDirBlocks, + blocks: consumedBlocks(unixfsShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -268,7 +271,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large directory with file scope", - blocks: unixfsPreloadDirBlocks, + blocks: consumedBlocks(unixfsPreloadDirBlocks), roots: []cid.Cid{unixfsDir.Root}, cfg: verifiedcar.Config{ Root: unixfsDir.Root, @@ -277,7 +280,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: all of large sharded directory with file scope", - blocks: unixfsPreloadShardedDirBlocks, + blocks: consumedBlocks(unixfsPreloadShardedDirBlocks), roots: []cid.Cid{unixfsShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsShardedDir.Root, @@ -286,7 +289,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: pathed subset inside large directory with file scope, errors", - blocks: unixfsDirBlocks, + blocks: consumedBlocks(unixfsDirBlocks), roots: []cid.Cid{unixfsDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -296,7 +299,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories", - blocks: unixfsWrappedFileBlocks, + blocks: consumedBlocks(unixfsWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, @@ -306,7 +309,7 @@ func TestVerifiedCar(t *testing.T) { { // our wrapped file has additional in the nested directories name: "unixfs: large sharded file wrapped in directories, pathed, errors", - blocks: unixfsWrappedFileBlocks, + blocks: consumedBlocks(unixfsWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -316,7 +319,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, trimmed, pathed", - blocks: unixfsTrimmedWrappedFileBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedFile.Root, @@ -325,7 +328,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, trimmed, all, errors", - blocks: unixfsTrimmedWrappedFileBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedFileBlocks), roots: []cid.Cid{unixfsWrappedFile.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -335,7 +338,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded file wrapped in directories, exclusive, pathed", - blocks: unixfsExclusiveWrappedFileBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedFileBlocks), roots: []cid.Cid{unixfsExclusiveWrappedFile.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedFile.Root, @@ -344,7 +347,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories", - blocks: unixfsWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -354,7 +357,7 @@ func TestVerifiedCar(t *testing.T) { { // our wrapped dir has additional in the nested directories name: "unixfs: large sharded dir wrapped in directories, pathed, errors", - blocks: unixfsWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -364,7 +367,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, pathed", - blocks: unixfsTrimmedWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -373,7 +376,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, preload, pathed", - blocks: unixfsTrimmedWrappedShardedDirOnlyBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirOnlyBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsWrappedShardedDir.Root, @@ -382,7 +385,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, trimmed, all, errors", - blocks: unixfsTrimmedWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsTrimmedWrappedShardedDirBlocks), roots: []cid.Cid{unixfsWrappedShardedDir.Root}, err: "unexpected block in CAR", cfg: verifiedcar.Config{ @@ -392,7 +395,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, exclusive, pathed", - blocks: unixfsExclusiveWrappedShardedDirBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedShardedDirBlocks), roots: []cid.Cid{unixfsExclusiveWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedShardedDir.Root, @@ -401,13 +404,66 @@ func TestVerifiedCar(t *testing.T) { }, { name: "unixfs: large sharded dir wrapped in directories, exclusive, preload, pathed", - blocks: unixfsExclusiveWrappedShardedDirOnlyBlocks, + blocks: consumedBlocks(unixfsExclusiveWrappedShardedDirOnlyBlocks), roots: []cid.Cid{unixfsExclusiveWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedShardedDir.Root, Selector: unixfsWrappedPreloadPathSelector, }, }, + { + name: "unixfs: file with dups", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + }, + }, + { + name: "unixfs: file with dups, incoming has dups, not allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].cid.String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].cid.String(), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + }, + incomingHasDups: true, + }, + { + name: "unixfs: file with dups, incoming has dups, allowed", + blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + AllowDuplicatesIn: true, + }, + incomingHasDups: true, + }, + { + name: "unixfs: file with dups, duplicate writes on", + blocks: consumedBlocks(unixfsFileWithDupsBlocks), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + WriteDuplicatesOut: true, + }, + }, + { + name: "unixfs: file with dups, duplicate writes on, incoming dups", + blocks: consumedBlocks(unixfsFileWithDupsBlocks), + roots: []cid.Cid{unixfsFileWithDups.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFileWithDups.Root, + Selector: allSelector, + WriteDuplicatesOut: true, + AllowDuplicatesIn: true, + }, + incomingHasDups: true, + }, } for _, testCase := range testCases { @@ -428,11 +484,15 @@ func TestVerifiedCar(t *testing.T) { lsys.SetWriteStorage(store) bwo := lsys.StorageWriteOpener var writeCounter int + var skipped int lsys.StorageWriteOpener = func(lc linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { var buf bytes.Buffer return &buf, func(l datamodel.Link) error { - req.Equal(testCase.blocks[writeCounter].cid, l.(cidlink.Link).Cid, "block %d", writeCounter) - req.Equal(testCase.blocks[writeCounter].data, buf.Bytes(), "block %d", writeCounter) + for testCase.blocks[writeCounter+skipped].skipped { + skipped++ + } + req.Equal(testCase.blocks[writeCounter+skipped].cid, l.(cidlink.Link).Cid, "block %d", writeCounter) + req.Equal(testCase.blocks[writeCounter+skipped].data, buf.Bytes(), "block %d", writeCounter) writeCounter++ w, wc, err := bwo(lc) if err != nil { @@ -443,18 +503,21 @@ func TestVerifiedCar(t *testing.T) { }, nil } - carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "") + carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "", testCase.incomingHasDups) blockCount, byteCount, err := testCase.cfg.Verify(ctx, carStream, lsys) + // read the rest of data + io.ReadAll(carStream) + if testCase.err != "" { req.ErrorContains(err, testCase.err) req.Equal(uint64(0), blockCount) req.Equal(uint64(0), byteCount) } else { req.NoError(err) - req.Equal(uint64(len(testCase.blocks)), blockCount) + req.Equal(count(testCase.blocks), blockCount) req.Equal(sizeOf(testCase.blocks), byteCount) - req.Equal(len(testCase.blocks), writeCounter) + req.Equal(int(count(testCase.blocks)), writeCounter) } }) } @@ -464,9 +527,10 @@ func makeCarStream( t *testing.T, ctx context.Context, roots []cid.Cid, - blocks []block, + blocks []expectedBlock, carv2 bool, expectErrors bool, + allowDuplicatePuts bool, ) io.Reader { r, w := io.Pipe() @@ -490,7 +554,7 @@ func makeCarStream( carW = v2f } - carWriter, err := storage.NewWritable(carW, roots, car.WriteAsCarV1(!carv2), car.AllowDuplicatePuts(false)) + carWriter, err := storage.NewWritable(carW, roots, car.WriteAsCarV1(!carv2), car.AllowDuplicatePuts(allowDuplicatePuts)) req.NoError(err) if err != nil { return @@ -530,11 +594,43 @@ type block struct { cid cid.Cid data []byte } +type expectedBlock struct { + block + skipped bool +} + +func consumedBlocks(blocks []block) []expectedBlock { + expectedBlocks := make([]expectedBlock, 0, len(blocks)) + for _, block := range blocks { + expectedBlocks = append(expectedBlocks, expectedBlock{block, false}) + } + return expectedBlocks +} + +func skippedBlocks(blocks []block) []expectedBlock { + expectedBlocks := make([]expectedBlock, 0, len(blocks)) + for _, block := range blocks { + expectedBlocks = append(expectedBlocks, expectedBlock{block, true}) + } + return expectedBlocks +} + +func count(blocks []expectedBlock) uint64 { + total := uint64(0) + for _, block := range blocks { + if !block.skipped { + total++ + } + } + return total +} -func sizeOf(blocks []block) uint64 { +func sizeOf(blocks []expectedBlock) uint64 { total := uint64(0) for _, block := range blocks { - total += uint64(len(block.data)) + if !block.skipped { + total += uint64(len(block.data)) + } } return total } @@ -602,3 +698,12 @@ func mustCompile(selNode datamodel.Node) selector.Selector { } return sel } + +type zeroReader struct{} + +func (zeroReader) Read(b []byte) (n int, err error) { + for i := range b { + b[i] = 0 + } + return len(b), nil +} From 8e75735541eaed1f6f0a51d2d1dc6c7a0b39af95 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Tue, 23 May 2023 22:44:21 -0700 Subject: [PATCH 02/24] feat(http): support IPFS 412 parameters --- cmd/lassie/fetch.go | 3 +- pkg/internal/testutil/correctedmemstore.go | 30 ++++ pkg/internal/testutil/gen.go | 9 ++ pkg/internal/testutil/toblocks.go | 58 ++++++++ pkg/retriever/bitswapretriever_test.go | 26 +--- pkg/retriever/httpretriever.go | 12 +- pkg/retriever/httpretriever_test.go | 2 +- pkg/server/http/ipfs.go | 52 ++++++- pkg/storage/cachingtempstore_test.go | 2 +- pkg/storage/deferredcarwriter.go | 7 + pkg/storage/duplicateAdder.go | 61 -------- pkg/storage/duplicateaddercar.go | 126 ++++++++++++++++ pkg/storage/duplicateaddercar_test.go | 64 ++++++++ pkg/storage/storage_test.go | 4 +- pkg/types/request.go | 8 +- pkg/types/types.go | 7 +- pkg/verifiedcar/verifiedcar.go | 16 +- pkg/verifiedcar/verifiedcar_test.go | 165 ++++++--------------- 18 files changed, 414 insertions(+), 238 deletions(-) create mode 100644 pkg/internal/testutil/correctedmemstore.go create mode 100644 pkg/internal/testutil/toblocks.go delete mode 100644 pkg/storage/duplicateAdder.go create mode 100644 pkg/storage/duplicateaddercar.go create mode 100644 pkg/storage/duplicateaddercar_test.go diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index 0538a742..5bf7cc64 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -215,7 +215,8 @@ func Fetch(cctx *cli.Context) error { } else { carWriter = storage.NewDeferredCarWriterForPath(rootCid, outfile) } - carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempDir) + tempStore := storage.NewDeferredStorageCar(tempDir) + carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer carStore.Close() var blockCount int diff --git a/pkg/internal/testutil/correctedmemstore.go b/pkg/internal/testutil/correctedmemstore.go new file mode 100644 index 00000000..4d41a67f --- /dev/null +++ b/pkg/internal/testutil/correctedmemstore.go @@ -0,0 +1,30 @@ +package testutil + +import ( + "context" + "io" + + format "github.com/ipfs/go-ipld-format" + "github.com/ipld/go-ipld-prime/storage/memstore" +) + +// TODO: remove when this is fixed in IPLD prime +type CorrectedMemStore struct { + *memstore.Store +} + +func (cms *CorrectedMemStore) Get(ctx context.Context, key string) ([]byte, error) { + data, err := cms.Store.Get(ctx, key) + if err != nil && err.Error() == "404" { + err = format.ErrNotFound{} + } + return data, err +} + +func (cms *CorrectedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { + rc, err := cms.Store.GetStream(ctx, key) + if err != nil && err.Error() == "404" { + err = format.ErrNotFound{} + } + return rc, err +} diff --git a/pkg/internal/testutil/gen.go b/pkg/internal/testutil/gen.go index a2992fae..dcb4e9be 100644 --- a/pkg/internal/testutil/gen.go +++ b/pkg/internal/testutil/gen.go @@ -139,3 +139,12 @@ func GenerateRetrievalIDs(t *testing.T, n int) []types.RetrievalID { } return retrievalIDs } + +type ZeroReader struct{} + +func (ZeroReader) Read(b []byte) (n int, err error) { + for i := range b { + b[i] = 0 + } + return len(b), nil +} diff --git a/pkg/internal/testutil/toblocks.go b/pkg/internal/testutil/toblocks.go new file mode 100644 index 00000000..4173fa57 --- /dev/null +++ b/pkg/internal/testutil/toblocks.go @@ -0,0 +1,58 @@ +package testutil + +import ( + "bytes" + "io" + "testing" + + "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" + "github.com/ipfs/go-unixfsnode" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/stretchr/testify/require" +) + +// ToBlocks makes a block array from ordered blocks in a traversal +func ToBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, selNode datamodel.Node) []blocks.Block { + sel, err := selector.CompileSelector(selNode) + require.NoError(t, err) + traversedBlocks := make([]blocks.Block, 0) + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + osro := lsys.StorageReadOpener + lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + r, err := osro(lc, l) + if err != nil { + return nil, err + } + byts, err := io.ReadAll(r) + if err != nil { + return nil, err + } + blk, err := blocks.NewBlockWithCid(byts, l.(cidlink.Link).Cid) + traversedBlocks = append(traversedBlocks, blk) + return bytes.NewReader(byts), nil + } + var proto datamodel.NodePrototype = basicnode.Prototype.Any + if root.Prefix().Codec == cid.DagProtobuf { + proto = dagpb.Type.PBNode + } + rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto) + require.NoError(t, err) + prog := traversal.Progress{ + Cfg: &traversal.Config{ + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser), + }, + } + vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } + err = prog.WalkAdv(rootNode, sel, vf) + require.NoError(t, err) + + return traversedBlocks +} diff --git a/pkg/retriever/bitswapretriever_test.go b/pkg/retriever/bitswapretriever_test.go index d95cf949..42bd5490 100644 --- a/pkg/retriever/bitswapretriever_test.go +++ b/pkg/retriever/bitswapretriever_test.go @@ -17,7 +17,6 @@ import ( "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" exchange "github.com/ipfs/go-ipfs-exchange-interface" - format "github.com/ipfs/go-ipld-format" "github.com/ipfs/go-libipfs/blocks" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/linking" @@ -32,7 +31,7 @@ import ( func TestBitswapRetriever(t *testing.T) { ctx := context.Background() - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{&memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -514,7 +513,7 @@ func makeLsys(blocks []blocks.Block) *linking.LinkSystem { bag[cidlink.Link{Cid: block.Cid()}.Binary()] = block.RawData() } lsys := cidlink.DefaultLinkSystem() - store := &correctedMemStore{&memstore.Store{Bag: bag}} + store := &testutil.CorrectedMemStore{&memstore.Store{Bag: bag}} lsys.SetReadStorage(store) lsys.SetWriteStorage(store) return &lsys @@ -528,27 +527,6 @@ func sizeOf(blocks []blocks.Block) uint64 { return total } -// TODO: remove when this is fixed in IPLD prime -type correctedMemStore struct { - *memstore.Store -} - -func (cms *correctedMemStore) Get(ctx context.Context, key string) ([]byte, error) { - data, err := cms.Store.Get(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return data, err -} - -func (cms *correctedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { - rc, err := cms.Store.GetStream(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return rc, err -} - type mockInProgressCids struct { incremented []cid.Cid decremented []cid.Cid diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index c3fe7235..19dd0fc6 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -15,7 +15,6 @@ import ( "github.com/filecoin-project/lassie/pkg/types" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/traversal/selector" "github.com/ipni/go-libipni/metadata" "github.com/multiformats/go-multicodec" ) @@ -114,15 +113,10 @@ func (ph *ProtocolHttp) Retrieve( ttfb = retrieval.Clock.Since(phaseStartTime) shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate)) }) - - sel, err := selector.CompileSelector(retrieval.request.GetSelector()) - if err != nil { - return nil, err - } - cfg := verifiedcar.Config{ - Root: retrieval.request.Cid, - Selector: sel, + Root: retrieval.request.Cid, + Selector: retrieval.request.GetSelector(), + AllowDuplicatesIn: true, } blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem) diff --git a/pkg/retriever/httpretriever_test.go b/pkg/retriever/httpretriever_test.go index 1089195d..3a7f71bf 100644 --- a/pkg/retriever/httpretriever_test.go +++ b/pkg/retriever/httpretriever_test.go @@ -30,7 +30,7 @@ import ( func TestHTTPRetriever(t *testing.T) { ctx := context.Background() - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{&memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index d49ba37b..8311078a 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -50,13 +50,50 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response hasAccept := req.Header.Get("Accept") != "" acceptTypes := strings.Split(req.Header.Get("Accept"), ",") validAccept := false + includeDupes := true for _, acceptType := range acceptTypes { typeParts := strings.Split(acceptType, ";") if typeParts[0] == "*/*" || typeParts[0] == "application/*" || typeParts[0] == "application/vnd.ipld.car" { validAccept = true - break + if typeParts[0] == "application/vnd.ipld.car" { + // parse 412 car attributes + for _, nextPart := range typeParts[1:] { + pair := strings.Split(nextPart, "=") + if len(pair) == 2 { + attr := strings.TrimSpace(pair[0]) + value := strings.TrimSpace(pair[1]) + switch attr { + case "dups": + switch value { + case "y": + case "n": + includeDupes = false + default: + // don't accept un expected values + validAccept = false + } + case "version": + switch value { + case "1": + default: + // don't accept any version but 1 + validAccept = false + } + case "order": + // for now, dfs traversal satisfies all known orders + default: + // ignore others + } + } + } + } + // only break if further validation didn't fail + if validAccept { + break + } } } + if hasAccept && !validAccept { statusLogger.logStatus(http.StatusBadRequest, "No acceptable content type") res.WriteHeader(http.StatusBadRequest) @@ -188,9 +225,18 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response // the response writer. Once closed, no other content should be written. bytesWritten := make(chan struct{}, 1) - carWriter := storage.NewDeferredCarWriterForStream(rootCid, res) - carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), cfg.TempDir) + tempStore := storage.NewDeferredStorageCar(cfg.TempDir) + var carWriter storage.DeferredWriter + if includeDupes { + carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), rootCid, path.String(), dagScope, tempStore, res) + } else { + carWriter = storage.NewDeferredCarWriterForStream(rootCid, res) + } + carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer func() { + if err := carWriter.Close(); err != nil { + logger.Errorf("error closing car writer: %s", err) + } if err := carStore.Close(); err != nil { logger.Errorf("error closing temp store: %s", err) } diff --git a/pkg/storage/cachingtempstore_test.go b/pkg/storage/cachingtempstore_test.go index 66466ceb..a69efbf9 100644 --- a/pkg/storage/cachingtempstore_test.go +++ b/pkg/storage/cachingtempstore_test.go @@ -49,7 +49,7 @@ func TestDeferredCarWriterWritesCARv1(t *testing.T) { var buf bytes.Buffer cw := NewDeferredCarWriterForStream(testCid1, &buf) - ss := NewCachingTempStore(cw.BlockWriteOpener(), "") + ss := NewCachingTempStore(cw.BlockWriteOpener(), NewDeferredStorageCar("")) t.Cleanup(func() { ss.Close() }) if tt.readBeforeWrite { diff --git a/pkg/storage/deferredcarwriter.go b/pkg/storage/deferredcarwriter.go index bcc396f5..276b3ef1 100644 --- a/pkg/storage/deferredcarwriter.go +++ b/pkg/storage/deferredcarwriter.go @@ -22,6 +22,13 @@ type putCb struct { var _ ipldstorage.WritableStorage = (*DeferredCarWriter)(nil) var _ io.Closer = (*DeferredCarWriter)(nil) +type DeferredWriter interface { + ipldstorage.WritableStorage + io.Closer + BlockWriteOpener() linking.BlockWriteOpener + OnPut(cb func(int), once bool) +} + // DeferredCarWriter creates a write-only CARv1 either to an existing stream or // to a file designated by a supplied path. CARv1 content (including header) // only begins when the first Put() operation is performed. If the output is a diff --git a/pkg/storage/duplicateAdder.go b/pkg/storage/duplicateAdder.go deleted file mode 100644 index fc0547f1..00000000 --- a/pkg/storage/duplicateAdder.go +++ /dev/null @@ -1,61 +0,0 @@ -package storage - -import ( - "context" - "io" - - "github.com/filecoin-project/lassie/pkg/verifiedcar" - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/linking" - "github.com/ipld/go-ipld-prime/traversal/selector" -) - -type DuplicateAdder struct { - ctx context.Context - cancel context.CancelFunc - root cid.Cid - selector ipld.Node - *DeferredCarWriter - outgoing *DeferredCarWriter -} - -func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, selector ipld.Node, store *DeferredStorageCar, outStream io.Writer) { - -} - -func (da *DuplicateAdder) addDupes(ctx context.Context) error { - - sel, err := selector.CompileSelector(da.selector) - if err != nil { - return nil, err - } - - cfg := verifiedcar.Config{ - Root: da.root, - Selector: sel, - } - - blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem) - if err != nil { - return nil, err - } - -} - -// OnPut will call a callback when each Put() operation is started. The argument -// to the callback is the number of bytes being written. If once is true, the -// callback will be removed after the first call. -func (da *DuplicateAdder) OnPut(cb func(int), once bool) { - -} - -// Close closes the underlying file, if one was created. -func (dcw *DeferredCarWriter) Close() error { - -} - -// BlockWriteOpener returns a BlockWriteOpener that operates on this storage. -func (da *DuplicateAdder) BlockWriteOpener() linking.BlockWriteOpener { - return da.incoming.BlockWriteOpener() -} diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go new file mode 100644 index 00000000..6761004c --- /dev/null +++ b/pkg/storage/duplicateaddercar.go @@ -0,0 +1,126 @@ +package storage + +import ( + "context" + "io" + "sync" + + "github.com/filecoin-project/lassie/pkg/types" + "github.com/filecoin-project/lassie/pkg/verifiedcar" + "github.com/ipfs/go-cid" + log "github.com/ipfs/go-log/v2" + "github.com/ipfs/go-unixfsnode" + carv2 "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" +) + +var logger = log.Logger("lassie/storage") + +type DuplicateAdderCar struct { + *DeferredCarWriter + bufferStream io.WriteCloser + ctx context.Context + streamCompletion chan error + streamCompletionLk sync.Mutex +} + +func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outStream io.Writer) *DuplicateAdderCar { + // create an io pipe from incoming data in a regular, dedupped car file + // to outgoing data piped to a verification that inserts dups + incomingStream, bufferStream := io.Pipe() + + // create the dedupped car writer we'll write to first + buffer := NewDeferredCarWriterForStream(root, bufferStream) + + // create the car writer for the final stream + outgoing := NewDeferredCarWriterForStream(root, outStream, carv2.AllowDuplicatePuts(true)) + da := &DuplicateAdderCar{ + DeferredCarWriter: buffer, + bufferStream: bufferStream, + ctx: ctx, + } + // on first write to the dedupped writer, start the process of verifying and inserting dups + buffer.OnPut(func(int) { + da.addDupes(root, path, scope, store, outgoing, incomingStream) + }, true) + return da +} + +func (da *DuplicateAdderCar) addDupes(root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outgoing *DeferredCarWriter, incomingStream io.Reader) { + // first, check if we have a stream completion channel, and abort if this is called twice + da.streamCompletionLk.Lock() + if da.streamCompletion != nil { + da.streamCompletionLk.Unlock() + logger.Warnf("attempted to start duplicate streaming for the second time") + return + } + da.streamCompletion = make(chan error, 1) + da.streamCompletionLk.Unlock() + // start a go routine to verify the outgoing data and insert dups + go func() { + var err error + defer func() { + select { + case <-da.ctx.Done(): + case da.streamCompletion <- err: + } + }() + sel := types.PathScopeSelector(path, scope) + + // we're going to do a verified car where we add dupes back in + cfg := verifiedcar.Config{ + Root: root, + Selector: sel, + WriteDuplicatesOut: true, + } + + lsys := cidlink.DefaultLinkSystem() + // use the final car writer to write blocks + lsys.SetWriteStorage(outgoing) + // use the deferred storage car to read in any dups we need + // to serve + // TODO: we access the readWrite method interface directly to avoid + // locking the store, which can deadlock with a simultaneous call to + // Put because of the io.Pipe. We should find a better way to do this. + var rw ReadableWritableStorage + if rw, err = store.readWrite(); err != nil { + return + } + lsys.SetReadStorage(rw) + lsys.TrustedStorage = true + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + + // run the verification + _, _, err = cfg.Verify(da.ctx, incomingStream, lsys) + return + }() +} + +// Close closes the dup stream, verifying completion, if one was created. +func (da *DuplicateAdderCar) Close() error { + // close the buffer writer + err := da.DeferredCarWriter.Close() + if err != nil { + return err + } + // close the write side of the io.Pipe -- this should trigger finishing + // the dupe verifcation + err = da.bufferStream.Close() + if err != nil { + return err + } + + // wait for the dupe stream to complete + da.streamCompletionLk.Lock() + streamCompletion := da.streamCompletion + da.streamCompletionLk.Unlock() + if streamCompletion == nil { + return nil + } + select { + case <-da.ctx.Done(): + return da.ctx.Err() + case err := <-streamCompletion: + return err + } +} diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go new file mode 100644 index 00000000..df916770 --- /dev/null +++ b/pkg/storage/duplicateaddercar_test.go @@ -0,0 +1,64 @@ +package storage_test + +import ( + "bytes" + "context" + "io" + "testing" + + "github.com/filecoin-project/lassie/pkg/internal/testutil" + "github.com/filecoin-project/lassie/pkg/storage" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-libipfs/blocks" + unixfs "github.com/ipfs/go-unixfsnode/testutil" + carv2 "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/memstore" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + "github.com/stretchr/testify/require" +) + +func TestDuplicateAdderCar(t *testing.T) { + + setupStore := &testutil.CorrectedMemStore{&memstore.Store{ + Bag: make(map[string][]byte), + }} + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + lsys.SetReadStorage(setupStore) + lsys.SetWriteStorage(setupStore) + + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, testutil.ZeroReader{}, 4<<20) + unixfsFileWithDupsBlocks := testutil.ToBlocks(t, lsys, unixfsFileWithDups.Root, selectorparse.CommonSelector_ExploreAllRecursively) + buf := new(bytes.Buffer) + + store := storage.NewDeferredStorageCar("") + ctx := context.Background() + carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", types.DagScopeAll, store, buf) + cachingTempStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), store) + + // write the root block + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[0].Cid().KeyString(), unixfsFileWithDupsBlocks[0].RawData()) + // write the duped block + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[1].Cid().KeyString(), unixfsFileWithDupsBlocks[1].RawData()) + // write the last block + cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().KeyString(), unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].RawData()) + err := carWriter.Close() + require.NoError(t, err) + err = cachingTempStore.Close() + require.NoError(t, err) + + // now, verify the traversal output a whole car with dups. + reader, err := carv2.NewBlockReader(buf) + require.NoError(t, err) + receivedBlocks := make([]blocks.Block, 0, len(unixfsFileWithDupsBlocks)) + for { + blk, err := reader.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + receivedBlocks = append(receivedBlocks, blk) + } + require.Equal(t, unixfsFileWithDupsBlocks, receivedBlocks) +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 1b16da10..7b8a558e 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -40,7 +40,7 @@ func TestTempCarStorage(t *testing.T) { return nil }, nil } - cw = NewCachingTempStore(bwo, tempDir) + cw = NewCachingTempStore(bwo, NewDeferredStorageCar(tempDir)) } else { cw = NewDeferredStorageCar(tempDir) } @@ -154,7 +154,7 @@ func TestPreloadStore(t *testing.T) { return nil }, nil } - mainStore := NewCachingTempStore(bwo, t.TempDir()) + mainStore := NewCachingTempStore(bwo, NewDeferredStorageCar(t.TempDir())) t.Cleanup(func() { require.NoError(t, mainStore.Close()) }) diff --git a/pkg/types/request.go b/pkg/types/request.go index 02918f01..5f235a1d 100644 --- a/pkg/types/request.go +++ b/pkg/types/request.go @@ -88,14 +88,18 @@ func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path stri }, nil } +func PathScopeSelector(path string, scope DagScope) ipld.Node { + // Turn the path / scope into a selector + return unixfsnode.UnixFSPathSelectorBuilder(path, scope.TerminalSelectorSpec(), false) +} + // GetSelector will safely return a selector for this request. If none has been // set, it will generate one for the path & scope. func (r RetrievalRequest) GetSelector() ipld.Node { if r.Selector != nil { // custom selector return r.Selector } - // Turn the path / scope into a selector - return unixfsnode.UnixFSPathSelectorBuilder(r.Path, r.Scope.TerminalSelectorSpec(), false) + return PathScopeSelector(r.Path, r.Scope) } // GetUrlPath returns a URL path and query string valid with the Trusted HTTP diff --git a/pkg/types/types.go b/pkg/types/types.go index 6433aed5..a6472068 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -310,10 +310,5 @@ func (ds DagScope) TerminalSelectorSpec() builder.SelectorSpec { } func (ds DagScope) AcceptHeader() string { - switch ds { - case DagScopeBlock: - return "application/vnd.ipld.block" - default: - return "application/vnd.ipld.car" - } + return "application/vnd.ipld.car;version=1;order=dfs;dups=y" } diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 29695ad9..55dd6c7b 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -34,11 +34,11 @@ var ( var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) type Config struct { - Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against - AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 - Selector selector.Selector // The selector to execute, starting at the provided Root, to verify the contents of the CAR - AllowDuplicatesIn bool // Handles whether the incoming stream has duplicates - WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks + Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against + AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 + Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR + AllowDuplicatesIn bool // Handles whether the incoming stream has duplicates + WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks } func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } @@ -55,6 +55,10 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // // * https://specs.ipfs.tech/http-gateways/path-gateway/ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { + sel, err := selector.CompileSelector(cfg.Selector) + if err != nil { + return 0, 0, err + } cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) if err != nil { @@ -102,7 +106,7 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy if err != nil { return 0, 0, err } - if err := progress.WalkAdv(rootNode, cfg.Selector, visitNoop); err != nil { + if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil { return 0, 0, traversalError(err) } diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index a974fde2..ef6a158b 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -9,22 +9,20 @@ import ( "testing" "time" + "github.com/filecoin-project/lassie/pkg/internal/testutil" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" gstestutil "github.com/ipfs/go-graphsync/testutil" - format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-libipfs/blocks" "github.com/ipfs/go-unixfsnode" unixfs "github.com/ipfs/go-unixfsnode/testutil" "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/storage" - dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/storage/memstore" - "github.com/ipld/go-ipld-prime/traversal" - "github.com/ipld/go-ipld-prime/traversal/selector" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/stretchr/testify/require" ) @@ -38,7 +36,7 @@ func TestVerifiedCar(t *testing.T) { t.Logf("random seed: %d", rndSeed) var rndReader io.Reader = rand.New(rand.NewSource(rndSeed)) - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{&memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -48,65 +46,65 @@ func TestVerifiedCar(t *testing.T) { tbc1 := gstestutil.SetupBlockChain(ctx, t, lsys, 1000, 100) root1 := tbc1.TipLink.(cidlink.Link).Cid - allBlocks := make([]block, 0, 100) - for _, b := range tbc1.AllBlocks() { - allBlocks = append(allBlocks, block{b.Cid(), b.RawData()}) - } + allBlocks := tbc1.AllBlocks() extraneousLnk, err := lsys.Store(linking.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.Prefix{Version: 1, Codec: 0x71, MhType: 0x12, MhLength: 32}}, basicnode.NewString("borp")) req.NoError(err) extraneousByts, err := lsys.LoadRaw(linking.LinkContext{}, extraneousLnk) req.NoError(err) + extraneousBlk, err := blocks.NewBlockWithCid(extraneousByts, extraneousLnk.(cidlink.Link).Cid) + req.NoError(err) - allSelector := mustCompile(selectorparse.CommonSelector_ExploreAllRecursively) + allSelector := selectorparse.CommonSelector_ExploreAllRecursively wrapPath := "/some/path/to/content" unixfsFile := unixfs.GenerateFile(t, &lsys, rndReader, 4<<20) - unixfsFileBlocks := toBlocks(t, lsys, unixfsFile.Root, allSelector) + unixfsFileBlocks := testutil.ToBlocks(t, lsys, unixfsFile.Root, allSelector) - unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, zeroReader{}, 4<<20) - unixfsFileWithDupsBlocks := toBlocks(t, lsys, unixfsFileWithDups.Root, allSelector) + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, testutil.ZeroReader{}, 4<<20) + unixfsFileWithDupsBlocks := testutil.ToBlocks(t, lsys, unixfsFileWithDups.Root, allSelector) var unixfsDir unixfs.DirEntry - var unixfsDirBlocks []block + var unixfsDirBlocks []blocks.Block for { unixfsDir = unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, false) - unixfsDirBlocks = toBlocks(t, lsys, unixfsDir.Root, allSelector) + unixfsDirBlocks = testutil.ToBlocks(t, lsys, unixfsDir.Root, allSelector) if len(unixfsDir.Children) > 2 { // we want at least 3 children to test the path subset selector break } } unixfsShardedDir := unixfs.GenerateDirectory(t, &lsys, rndReader, 8<<20, true) - unixfsShardedDirBlocks := toBlocks(t, lsys, unixfsShardedDir.Root, allSelector) + unixfsShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, allSelector) - unixfsPreloadSelector := mustCompile(unixfsnode.MatchUnixFSPreloadSelector.Node()) + unixfsPreloadSelector := unixfsnode.MatchUnixFSPreloadSelector.Node() - unixfsPreloadDirBlocks := toBlocks(t, lsys, unixfsDir.Root, unixfsPreloadSelector) - unixfsPreloadShardedDirBlocks := toBlocks(t, lsys, unixfsShardedDir.Root, unixfsPreloadSelector) + unixfsPreloadDirBlocks := testutil.ToBlocks(t, lsys, unixfsDir.Root, unixfsPreloadSelector) + unixfsPreloadShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, unixfsPreloadSelector) - unixfsDirSubsetSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSPreloadSelector, false)) + unixfsDirSubsetSelector := unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSPreloadSelector, false) - unixfsWrappedPathSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.ExploreAllRecursivelySelector, false)) - unixfsWrappedPreloadPathSelector := mustCompile(unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSPreloadSelector, false)) + unixfsWrappedPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.ExploreAllRecursivelySelector, false) + unixfsWrappedPreloadPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSPreloadSelector, false) unixfsWrappedFile := unixfs.WrapContent(t, rndReader, &lsys, unixfsFile, wrapPath, false) - unixfsWrappedFileBlocks := toBlocks(t, lsys, unixfsWrappedFile.Root, allSelector) + unixfsWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedFile.Root, allSelector) // "trimmed" is similar to "exclusive" except that "trimmed" is a subset // of a larger DAG, whereas "exclusive" is a complete DAG. - unixfsTrimmedWrappedFileBlocks := toBlocks(t, lsys, unixfsWrappedFile.Root, unixfsWrappedPathSelector) + unixfsTrimmedWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedFile.Root, unixfsWrappedPathSelector) unixfsExclusiveWrappedFile := unixfs.WrapContent(t, rndReader, &lsys, unixfsFile, wrapPath, true) - unixfsExclusiveWrappedFileBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedFile.Root, allSelector) + unixfsExclusiveWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedFile.Root, allSelector) unixfsWrappedShardedDir := unixfs.WrapContent(t, rndReader, &lsys, unixfsShardedDir, wrapPath, false) - unixfsWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, allSelector) + unixfsWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, allSelector) // "trimmed" is similar to "exclusive" except that "trimmed" is a subset // of a larger DAG, whereas "exclusive" is a complete DAG. - unixfsTrimmedWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPathSelector) - unixfsTrimmedWrappedShardedDirOnlyBlocks := toBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + unixfsTrimmedWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPathSelector) + unixfsTrimmedWrappedShardedDirOnlyBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) unixfsExclusiveWrappedShardedDir := unixfs.WrapContent(t, rndReader, &lsys, unixfsShardedDir, wrapPath, true) - unixfsExclusiveWrappedShardedDirBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, allSelector) - unixfsExclusiveWrappedShardedDirOnlyBlocks := toBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + unixfsExclusiveWrappedShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, allSelector) + unixfsExclusiveWrappedShardedDirOnlyBlocks := testutil.ToBlocks(t, lsys, unixfsExclusiveWrappedShardedDir.Root, unixfsWrappedPreloadPathSelector) + mismatchedCidBlk, _ := blocks.NewBlockWithCid(extraneousByts, allBlocks[99].Cid()) testCases := []struct { name string blocks []expectedBlock @@ -169,7 +167,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous trailing block errors", - blocks: append(consumedBlocks(append([]block{}, allBlocks...)), expectedBlock{block{extraneousLnk.(cidlink.Link).Cid, extraneousByts}, true}), + blocks: append(consumedBlocks(append([]blocks.Block{}, allBlocks...)), expectedBlock{extraneousBlk, true}), roots: []cid.Cid{root1}, err: "extraneous block in CAR", cfg: verifiedcar.Config{ @@ -179,9 +177,9 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with extraneous leading block errors", - blocks: append(consumedBlocks([]block{{extraneousLnk.(cidlink.Link).Cid, extraneousByts}}), consumedBlocks(allBlocks)...), + blocks: append(consumedBlocks([]blocks.Block{extraneousBlk}), consumedBlocks(allBlocks)...), roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].cid.String(), + err: "unexpected block in CAR: " + extraneousLnk.(cidlink.Link).Cid.String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -189,9 +187,9 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with out-of-order blocks errors", - blocks: consumedBlocks(append(append([]block{}, allBlocks[50:]...), allBlocks[0:50]...)), + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[50:]...), allBlocks[0:50]...)), roots: []cid.Cid{root1}, - err: "unexpected block in CAR: " + allBlocks[50].cid.String() + " != " + allBlocks[0].cid.String(), + err: "unexpected block in CAR: " + allBlocks[50].Cid().String() + " != " + allBlocks[0].Cid().String(), cfg: verifiedcar.Config{ Root: root1, Selector: allSelector, @@ -199,7 +197,7 @@ func TestVerifiedCar(t *testing.T) { }, { name: "carv1 with mismatching CID errors", - blocks: consumedBlocks(append(append([]block{}, allBlocks[0:99]...), block{allBlocks[99].cid, extraneousByts})), + blocks: consumedBlocks(append(append([]blocks.Block{}, allBlocks[0:99]...), mismatchedCidBlk)), roots: []cid.Cid{root1}, err: "mismatch in content integrity", cfg: verifiedcar.Config{ @@ -423,7 +421,7 @@ func TestVerifiedCar(t *testing.T) { { name: "unixfs: file with dups, incoming has dups, not allowed", blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), - err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].cid.String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].cid.String(), + err: "unexpected block in CAR: " + unixfsFileWithDupsBlocks[2].Cid().String() + " != " + unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().String(), roots: []cid.Cid{unixfsFileWithDups.Root}, cfg: verifiedcar.Config{ Root: unixfsFileWithDups.Root, @@ -476,7 +474,7 @@ func TestVerifiedCar(t *testing.T) { req := require.New(t) - store := &correctedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{&memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -491,8 +489,8 @@ func TestVerifiedCar(t *testing.T) { for testCase.blocks[writeCounter+skipped].skipped { skipped++ } - req.Equal(testCase.blocks[writeCounter+skipped].cid, l.(cidlink.Link).Cid, "block %d", writeCounter) - req.Equal(testCase.blocks[writeCounter+skipped].data, buf.Bytes(), "block %d", writeCounter) + req.Equal(testCase.blocks[writeCounter+skipped].Cid(), l.(cidlink.Link).Cid, "block %d", writeCounter) + req.Equal(testCase.blocks[writeCounter+skipped].RawData(), buf.Bytes(), "block %d", writeCounter) writeCounter++ w, wc, err := bwo(lc) if err != nil { @@ -560,7 +558,7 @@ func makeCarStream( return } for _, block := range blocks { - err := carWriter.Put(ctx, block.cid.KeyString(), block.data) + err := carWriter.Put(ctx, block.Cid().KeyString(), block.RawData()) if !expectErrors { req.NoError(err) } @@ -590,16 +588,12 @@ func makeCarStream( return r } -type block struct { - cid cid.Cid - data []byte -} type expectedBlock struct { - block + blocks.Block skipped bool } -func consumedBlocks(blocks []block) []expectedBlock { +func consumedBlocks(blocks []blocks.Block) []expectedBlock { expectedBlocks := make([]expectedBlock, 0, len(blocks)) for _, block := range blocks { expectedBlocks = append(expectedBlocks, expectedBlock{block, false}) @@ -607,7 +601,7 @@ func consumedBlocks(blocks []block) []expectedBlock { return expectedBlocks } -func skippedBlocks(blocks []block) []expectedBlock { +func skippedBlocks(blocks []blocks.Block) []expectedBlock { expectedBlocks := make([]expectedBlock, 0, len(blocks)) for _, block := range blocks { expectedBlocks = append(expectedBlocks, expectedBlock{block, true}) @@ -629,81 +623,8 @@ func sizeOf(blocks []expectedBlock) uint64 { total := uint64(0) for _, block := range blocks { if !block.skipped { - total += uint64(len(block.data)) + total += uint64(len(block.RawData())) } } return total } - -func toBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, sel selector.Selector) []block { - blocks := make([]block, 0) - unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) - osro := lsys.StorageReadOpener - lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { - r, err := osro(lc, l) - if err != nil { - return nil, err - } - byts, err := io.ReadAll(r) - if err != nil { - return nil, err - } - blocks = append(blocks, block{l.(cidlink.Link).Cid, byts}) - return bytes.NewReader(byts), nil - } - var proto datamodel.NodePrototype = basicnode.Prototype.Any - if root.Prefix().Codec == cid.DagProtobuf { - proto = dagpb.Type.PBNode - } - rootNode, err := lsys.Load(linking.LinkContext{}, cidlink.Link{Cid: root}, proto) - require.NoError(t, err) - prog := traversal.Progress{ - Cfg: &traversal.Config{ - LinkSystem: lsys, - LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser), - }, - } - vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } - err = prog.WalkAdv(rootNode, sel, vf) - require.NoError(t, err) - - return blocks -} - -// TODO: remove when this is fixed in IPLD prime -type correctedMemStore struct { - *memstore.Store -} - -func (cms *correctedMemStore) Get(ctx context.Context, key string) ([]byte, error) { - data, err := cms.Store.Get(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return data, err -} - -func (cms *correctedMemStore) GetStream(ctx context.Context, key string) (io.ReadCloser, error) { - rc, err := cms.Store.GetStream(ctx, key) - if err != nil && err.Error() == "404" { - err = format.ErrNotFound{} - } - return rc, err -} - -func mustCompile(selNode datamodel.Node) selector.Selector { - sel, err := selector.CompileSelector(selNode) - if err != nil { - panic(err) - } - return sel -} - -type zeroReader struct{} - -func (zeroReader) Read(b []byte) (n int, err error) { - for i := range b { - b[i] = 0 - } - return len(b), nil -} From baff9da9d2f11f2fd067781adc9dc0d541195971 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 15:08:05 -0700 Subject: [PATCH 03/24] refactor(duplicateadder): use more flexible block stream to handle traversal --- pkg/internal/itest/http_fetch_test.go | 53 ++++++- pkg/internal/itest/testpeer/generator.go | 20 ++- pkg/retriever/httpretriever.go | 2 +- pkg/storage/duplicateaddercar.go | 185 ++++++++++++++--------- pkg/verifiedcar/verifiedcar.go | 24 ++- pkg/verifiedcar/verifiedcar_test.go | 2 +- 6 files changed, 204 insertions(+), 82 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 437727b1..fadb8ec6 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -19,8 +19,10 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer/v2" "github.com/filecoin-project/lassie/pkg/internal/itest/mocknet" "github.com/filecoin-project/lassie/pkg/internal/itest/testpeer" + "github.com/filecoin-project/lassie/pkg/internal/testutil" "github.com/filecoin-project/lassie/pkg/lassie" httpserver "github.com/filecoin-project/lassie/pkg/server/http" + "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/google/uuid" "github.com/ipfs/go-cid" unixfs "github.com/ipfs/go-unixfsnode/testutil" @@ -30,6 +32,8 @@ import ( "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/storage/memstore" + selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" @@ -49,7 +53,10 @@ func TestHttpFetch(t *testing.T) { blockQuery := func(q url.Values, _ []testpeer.TestPeer) { q.Set("dag-scope", "block") } - + noDups := func(header http.Header) { + header.Set("Accept", "application/vnd.ipld.car;order=dfs;version=1;dups=n;") + } + type headerSetter func(http.Header) type queryModifier func(url.Values, []testpeer.TestPeer) type bodyValidator func(*testing.T, unixfs.DirEntry, []byte) @@ -66,6 +73,7 @@ func TestHttpFetch(t *testing.T) { modifyHttpConfig func(httpserver.HttpServerConfig) httpserver.HttpServerConfig generate func(*testing.T, io.Reader, []testpeer.TestPeer) []unixfs.DirEntry paths []string + setHeader headerSetter modifyQueries []queryModifier validateBodies []bodyValidator }{ @@ -646,6 +654,43 @@ func TestHttpFetch(t *testing.T) { v.Set("providers", strings.Join(maStrings, ",")) }}, }, + { + name: "http large sharded file with dups", + httpRemotes: 1, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + store := &testutil.CorrectedMemStore{&memstore.Store{ + Bag: make(map[string][]byte), + }} + lsys := cidlink.DefaultLinkSystem() + lsys.SetReadStorage(store) + lsys.SetWriteStorage(store) + _, _, err := verifiedcar.Config{ + Root: srcData.Root, + Selector: selectorparse.CommonSelector_ExploreAllRecursively, + AllowDuplicatesIn: true, + }.VerifyCar(context.Background(), bytes.NewReader(body), lsys) + require.NoError(t, err) + }}, + }, + { + name: "http large sharded file with dups, no dups response requested", + httpRemotes: 1, + setHeader: noDups, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + wantCids := []cid.Cid{ + srcData.Root, // "/"" + srcData.SelfCids[1], + srcData.SelfCids[len(srcData.SelfCids)-1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, + }, } for _, testCase := range testCases { @@ -712,7 +757,11 @@ func TestHttpFetch(t *testing.T) { addr := fmt.Sprintf("http://%s/ipfs/%s%s", httpServer.Addr(), srcData[i].Root.String(), path) getReq, err := http.NewRequest("GET", addr, nil) req.NoError(err) - getReq.Header.Add("Accept", "application/vnd.ipld.car") + if testCase.setHeader == nil { + getReq.Header.Add("Accept", "application/vnd.ipld.car") + } else { + testCase.setHeader(getReq.Header) + } if testCase.modifyQueries != nil && testCase.modifyQueries[i] != nil { q := getReq.URL.Query() testCase.modifyQueries[i](q, mrn.Remotes) diff --git a/pkg/internal/itest/testpeer/generator.go b/pkg/internal/itest/testpeer/generator.go index b478519e..eef3cc24 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -355,6 +355,24 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res unixfsPath = "/" + strings.Join(urlPath[2:], "/") } + acceptTypes := strings.Split(req.Header.Get("Accept"), ",") + includeDupes := false + for _, acceptType := range acceptTypes { + typeParts := strings.Split(acceptType, ";") + if typeParts[0] == "application/vnd.ipld.car" { + for _, nextPart := range typeParts[1:] { + pair := strings.Split(nextPart, "=") + if len(pair) == 2 { + attr := strings.TrimSpace(pair[0]) + value := strings.TrimSpace(pair[1]) + if attr == "dups" && value == "y" { + includeDupes = true + } + } + } + } + } + // We're always providing the dag-scope parameter, so add a failure case if we stop // providing it in the future if !req.URL.Query().Has("dag-scope") { @@ -384,7 +402,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res } // Write to response writer - carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(false)) + carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(includeDupes)) if err != nil { http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError) return diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index 19dd0fc6..e1113a37 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -119,7 +119,7 @@ func (ph *ProtocolHttp) Retrieve( AllowDuplicatesIn: true, } - blockCount, byteCount, err := cfg.Verify(ctx, rdr, retrieval.request.LinkSystem) + blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem) if err != nil { return nil, err } diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index 6761004c..bcfe4f40 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -1,16 +1,22 @@ package storage import ( + "bytes" + "container/list" "context" + "fmt" "io" "sync" "github.com/filecoin-project/lassie/pkg/types" "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" + "github.com/ipfs/go-libipfs/blocks" log "github.com/ipfs/go-log/v2" "github.com/ipfs/go-unixfsnode" carv2 "github.com/ipld/go-car/v2" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) @@ -18,97 +24,94 @@ var logger = log.Logger("lassie/storage") type DuplicateAdderCar struct { *DeferredCarWriter - bufferStream io.WriteCloser ctx context.Context + root cid.Cid + path string + scope types.DagScope + store *DeferredStorageCar + blockStream *blockStream streamCompletion chan error streamCompletionLk sync.Mutex } func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outStream io.Writer) *DuplicateAdderCar { - // create an io pipe from incoming data in a regular, dedupped car file - // to outgoing data piped to a verification that inserts dups - incomingStream, bufferStream := io.Pipe() - - // create the dedupped car writer we'll write to first - buffer := NewDeferredCarWriterForStream(root, bufferStream) + blockStream := &blockStream{} + blockStream.blockBuffer = list.New() + blockStream.cond = sync.NewCond(&blockStream.mu) // create the car writer for the final stream outgoing := NewDeferredCarWriterForStream(root, outStream, carv2.AllowDuplicatePuts(true)) - da := &DuplicateAdderCar{ - DeferredCarWriter: buffer, - bufferStream: bufferStream, + return &DuplicateAdderCar{ + DeferredCarWriter: outgoing, ctx: ctx, + root: root, + path: path, + scope: scope, + store: store, + blockStream: blockStream, } - // on first write to the dedupped writer, start the process of verifying and inserting dups - buffer.OnPut(func(int) { - da.addDupes(root, path, scope, store, outgoing, incomingStream) - }, true) - return da } -func (da *DuplicateAdderCar) addDupes(root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outgoing *DeferredCarWriter, incomingStream io.Reader) { - // first, check if we have a stream completion channel, and abort if this is called twice - da.streamCompletionLk.Lock() - if da.streamCompletion != nil { - da.streamCompletionLk.Unlock() - logger.Warnf("attempted to start duplicate streaming for the second time") - return - } - da.streamCompletion = make(chan error, 1) - da.streamCompletionLk.Unlock() - // start a go routine to verify the outgoing data and insert dups - go func() { - var err error - defer func() { - select { - case <-da.ctx.Done(): - case da.streamCompletion <- err: - } - }() - sel := types.PathScopeSelector(path, scope) - - // we're going to do a verified car where we add dupes back in - cfg := verifiedcar.Config{ - Root: root, - Selector: sel, - WriteDuplicatesOut: true, +func (da *DuplicateAdderCar) addDupes() { + var err error + defer func() { + select { + case <-da.ctx.Done(): + case da.streamCompletion <- err: } + }() + sel := types.PathScopeSelector(da.path, da.scope) - lsys := cidlink.DefaultLinkSystem() - // use the final car writer to write blocks - lsys.SetWriteStorage(outgoing) - // use the deferred storage car to read in any dups we need - // to serve - // TODO: we access the readWrite method interface directly to avoid - // locking the store, which can deadlock with a simultaneous call to - // Put because of the io.Pipe. We should find a better way to do this. - var rw ReadableWritableStorage - if rw, err = store.readWrite(); err != nil { - return - } - lsys.SetReadStorage(rw) - lsys.TrustedStorage = true - unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + // we're going to do a verified car where we add dupes back in + cfg := verifiedcar.Config{ + Root: da.root, + Selector: sel, + WriteDuplicatesOut: true, + } - // run the verification - _, _, err = cfg.Verify(da.ctx, incomingStream, lsys) - return - }() + lsys := cidlink.DefaultLinkSystem() + // use the final car writer to write blocks + lsys.SetWriteStorage(da) + // use the deferred storage car to read in any dups we need + // to serve + lsys.SetReadStorage(da.store) + lsys.TrustedStorage = true + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + + // run the verification + _, _, err = cfg.VerifyBlockStream(da.ctx, da.blockStream, lsys) + return +} + +func (da *DuplicateAdderCar) BlockWriteOpener() linking.BlockWriteOpener { + return func(lctx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + // first, check if we have a stream completion channel, and abort if this is called twice + da.streamCompletionLk.Lock() + if da.streamCompletion == nil { + da.streamCompletion = make(chan error, 1) + go da.addDupes() + } + da.streamCompletionLk.Unlock() + var buf bytes.Buffer + var written bool + return &buf, func(lnk ipld.Link) error { + if written { + return fmt.Errorf("WriteCommitter already used") + } + written = true + blk, err := blocks.NewBlockWithCid(buf.Bytes(), lnk.(cidlink.Link).Cid) + if err != nil { + return err + } + return da.blockStream.WriteBlock(blk) + }, nil + } } // Close closes the dup stream, verifying completion, if one was created. func (da *DuplicateAdderCar) Close() error { - // close the buffer writer - err := da.DeferredCarWriter.Close() - if err != nil { - return err - } - // close the write side of the io.Pipe -- this should trigger finishing - // the dupe verifcation - err = da.bufferStream.Close() - if err != nil { - return err - } + // close the block stream + da.blockStream.Close() // wait for the dupe stream to complete da.streamCompletionLk.Lock() @@ -124,3 +127,43 @@ func (da *DuplicateAdderCar) Close() error { return err } } + +type blockStream struct { + done bool + mu sync.Mutex + cond *sync.Cond + blockBuffer *list.List +} + +func (bs *blockStream) Close() { + bs.mu.Lock() + bs.done = true + bs.mu.Unlock() + bs.cond.Signal() +} + +func (bs *blockStream) WriteBlock(blk blocks.Block) error { + bs.mu.Lock() + defer bs.mu.Unlock() + if bs.done { + return errClosed + } + bs.blockBuffer.PushBack(blk) + bs.cond.Signal() + return nil +} + +func (bs *blockStream) Next() (blocks.Block, error) { + bs.mu.Lock() + defer bs.mu.Unlock() + + for { + if e := bs.blockBuffer.Front(); e != nil { + return bs.blockBuffer.Remove(e).(blocks.Block), nil + } + if bs.done { + return nil, io.EOF + } + bs.cond.Wait() + } +} diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 55dd6c7b..abf80d02 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -10,6 +10,7 @@ import ( "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" + "github.com/ipfs/go-libipfs/blocks" "github.com/ipfs/go-unixfsnode" "github.com/ipld/go-car/v2" dagpb "github.com/ipld/go-codec-dagpb" @@ -54,11 +55,7 @@ func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) // * https://specs.ipfs.tech/http-gateways/trustless-gateway/ // // * https://specs.ipfs.tech/http-gateways/path-gateway/ -func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { - sel, err := selector.CompileSelector(cfg.Selector) - if err != nil { - return 0, 0, err - } +func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.LinkSystem) (uint64, uint64, error) { cbr, err := car.NewBlockReader(rdr, car.WithTrustedCAR(false)) if err != nil { @@ -79,6 +76,20 @@ func (cfg Config) Verify(ctx context.Context, rdr io.Reader, lsys linking.LinkSy if len(cbr.Roots) != 1 || cbr.Roots[0] != cfg.Root { return 0, 0, ErrBadRoots } + return cfg.VerifyBlockStream(ctx, cbr, lsys) +} + +type BlockReader interface { + Next() (blocks.Block, error) +} + +func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) { + + sel, err := selector.CompileSelector(cfg.Selector) + if err != nil { + return 0, 0, err + } + cr := &carReader{ cbr: cbr, } @@ -129,6 +140,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w var err error if _, ok := seen[cid]; ok { if cfg.AllowDuplicatesIn { + // duplicate block, but in this case we are expecting the stream to have it data, err = cr.readNextBlock(ctx, cid) if err != nil { return nil, err @@ -174,7 +186,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w } type carReader struct { - cbr *car.BlockReader + cbr BlockReader } func (cr *carReader) readNextBlock(ctx context.Context, expected cid.Cid) ([]byte, error) { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index ef6a158b..d375dc16 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -502,7 +502,7 @@ func TestVerifiedCar(t *testing.T) { } carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.err != "", testCase.incomingHasDups) - blockCount, byteCount, err := testCase.cfg.Verify(ctx, carStream, lsys) + blockCount, byteCount, err := testCase.cfg.VerifyCar(ctx, carStream, lsys) // read the rest of data io.ReadAll(carStream) From 91c64825e330ae4f84c5a2e689c6214ad7eb9366 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 15:43:05 -0700 Subject: [PATCH 04/24] fix(storage): close underlying writer --- pkg/storage/duplicateaddercar.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index bcfe4f40..f0d5bf3e 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -124,7 +124,11 @@ func (da *DuplicateAdderCar) Close() error { case <-da.ctx.Done(): return da.ctx.Err() case err := <-streamCompletion: - return err + if err != nil { + return err + } + da.DeferredCarWriter.Close() + return nil } } From 80c25a8c8c7457ab3b7660b18d25c1305759f128 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 15:49:03 -0700 Subject: [PATCH 05/24] style(lint): fix lint issues --- pkg/internal/testutil/toblocks.go | 3 +++ pkg/storage/duplicateaddercar.go | 4 ---- pkg/verifiedcar/verifiedcar.go | 3 +-- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/pkg/internal/testutil/toblocks.go b/pkg/internal/testutil/toblocks.go index 4173fa57..729d5f37 100644 --- a/pkg/internal/testutil/toblocks.go +++ b/pkg/internal/testutil/toblocks.go @@ -35,6 +35,9 @@ func ToBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, selNode datam return nil, err } blk, err := blocks.NewBlockWithCid(byts, l.(cidlink.Link).Cid) + if err != nil { + return nil, err + } traversedBlocks = append(traversedBlocks, blk) return bytes.NewReader(byts), nil } diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index f0d5bf3e..d89da776 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" "github.com/ipfs/go-libipfs/blocks" - log "github.com/ipfs/go-log/v2" "github.com/ipfs/go-unixfsnode" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-ipld-prime" @@ -20,8 +19,6 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" ) -var logger = log.Logger("lassie/storage") - type DuplicateAdderCar struct { *DeferredCarWriter ctx context.Context @@ -80,7 +77,6 @@ func (da *DuplicateAdderCar) addDupes() { // run the verification _, _, err = cfg.VerifyBlockStream(da.ctx, da.blockStream, lsys) - return } func (da *DuplicateAdderCar) BlockWriteOpener() linking.BlockWriteOpener { diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index abf80d02..1c002959 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -154,7 +153,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w if !cfg.WriteDuplicatesOut { return rdr, err } - data, err = ioutil.ReadAll(rdr) + data, err = io.ReadAll(rdr) if err != nil { return nil, err } From fbfd04d94873cb59206ed6b1bd01795147e76186 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 15:51:31 -0700 Subject: [PATCH 06/24] fix(storage): abort duplicate adder on context error --- pkg/storage/duplicateaddercar.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index d89da776..ec1d175e 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -32,7 +32,7 @@ type DuplicateAdderCar struct { } func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outStream io.Writer) *DuplicateAdderCar { - blockStream := &blockStream{} + blockStream := &blockStream{ctx: ctx} blockStream.blockBuffer = list.New() blockStream.cond = sync.NewCond(&blockStream.mu) @@ -120,16 +120,13 @@ func (da *DuplicateAdderCar) Close() error { case <-da.ctx.Done(): return da.ctx.Err() case err := <-streamCompletion: - if err != nil { - return err - } - da.DeferredCarWriter.Close() - return nil + return err } } type blockStream struct { done bool + ctx context.Context mu sync.Mutex cond *sync.Cond blockBuffer *list.List @@ -158,6 +155,11 @@ func (bs *blockStream) Next() (blocks.Block, error) { defer bs.mu.Unlock() for { + select { + case <-bs.ctx.Done(): + return nil, bs.ctx.Err() + default: + } if e := bs.blockBuffer.Front(); e != nil { return bs.blockBuffer.Remove(e).(blocks.Block), nil } From 7b1bb6e33f453a6efd13b298dd55ade63d01acdd Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 16:15:57 -0700 Subject: [PATCH 07/24] style(lint): fix lint issues --- pkg/internal/itest/http_fetch_test.go | 2 +- pkg/retriever/bitswapretriever_test.go | 4 ++-- pkg/retriever/httpretriever_test.go | 2 +- pkg/storage/duplicateaddercar_test.go | 2 +- pkg/verifiedcar/verifiedcar_test.go | 4 ++-- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index fadb8ec6..f6b9316d 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -661,7 +661,7 @@ func TestHttpFetch(t *testing.T) { return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, testutil.ZeroReader{}, 4<<20)} }, validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { - store := &testutil.CorrectedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() diff --git a/pkg/retriever/bitswapretriever_test.go b/pkg/retriever/bitswapretriever_test.go index 42bd5490..1d7b5c42 100644 --- a/pkg/retriever/bitswapretriever_test.go +++ b/pkg/retriever/bitswapretriever_test.go @@ -31,7 +31,7 @@ import ( func TestBitswapRetriever(t *testing.T) { ctx := context.Background() - store := &testutil.CorrectedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -513,7 +513,7 @@ func makeLsys(blocks []blocks.Block) *linking.LinkSystem { bag[cidlink.Link{Cid: block.Cid()}.Binary()] = block.RawData() } lsys := cidlink.DefaultLinkSystem() - store := &testutil.CorrectedMemStore{&memstore.Store{Bag: bag}} + store := &testutil.CorrectedMemStore{Store: &memstore.Store{Bag: bag}} lsys.SetReadStorage(store) lsys.SetWriteStorage(store) return &lsys diff --git a/pkg/retriever/httpretriever_test.go b/pkg/retriever/httpretriever_test.go index 3a7f71bf..7b766dde 100644 --- a/pkg/retriever/httpretriever_test.go +++ b/pkg/retriever/httpretriever_test.go @@ -30,7 +30,7 @@ import ( func TestHTTPRetriever(t *testing.T) { ctx := context.Background() - store := &testutil.CorrectedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index df916770..3518a306 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -20,7 +20,7 @@ import ( func TestDuplicateAdderCar(t *testing.T) { - setupStore := &testutil.CorrectedMemStore{&memstore.Store{ + setupStore := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index d375dc16..3fe79dfd 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -36,7 +36,7 @@ func TestVerifiedCar(t *testing.T) { t.Logf("random seed: %d", rndSeed) var rndReader io.Reader = rand.New(rand.NewSource(rndSeed)) - store := &testutil.CorrectedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() @@ -474,7 +474,7 @@ func TestVerifiedCar(t *testing.T) { req := require.New(t) - store := &testutil.CorrectedMemStore{&memstore.Store{ + store := &testutil.CorrectedMemStore{Store: &memstore.Store{ Bag: make(map[string][]byte), }} lsys := cidlink.DefaultLinkSystem() From 7ae1acd4e8ad6eb24afd6d674e9817abb69387c8 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 18:30:45 -0700 Subject: [PATCH 08/24] fix(verifier): use max blocks --- pkg/internal/itest/http_fetch_test.go | 20 ++++++++++++++++++++ pkg/retriever/httpretriever.go | 1 + pkg/verifiedcar/verifiedcar.go | 8 ++++++++ pkg/verifiedcar/verifiedcar_test.go | 16 ++++++++++++++++ 4 files changed, 45 insertions(+) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index f6b9316d..d5014a08 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -204,6 +204,26 @@ func TestHttpFetch(t *testing.T) { validateCarBody(t, body, srcData.Root, wantCids, true) }}, }, + { + name: "http max block limit", + httpRemotes: 1, + modifyHttpConfig: func(cfg httpserver.HttpServerConfig) httpserver.HttpServerConfig { + cfg.MaxBlocksPerRequest = 3 + return cfg + }, + generate: func(t *testing.T, rndReader io.Reader, remotes []testpeer.TestPeer) []unixfs.DirEntry { + return []unixfs.DirEntry{unixfs.GenerateFile(t, remotes[0].LinkSystem, rndReader, 4<<20)} + }, + validateBodies: []bodyValidator{func(t *testing.T, srcData unixfs.DirEntry, body []byte) { + // 3 blocks max, start at the root and then two blocks into the sharded data + wantCids := []cid.Cid{ + srcData.Root, + srcData.SelfCids[0], + srcData.SelfCids[1], + } + validateCarBody(t, body, srcData.Root, wantCids, true) + }}, + }, { // dag-scope entity fetch should get the same DAG as full for a plain file name: "graphsync large sharded file, dag-scope entity", diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index e1113a37..f942a576 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -117,6 +117,7 @@ func (ph *ProtocolHttp) Retrieve( Root: retrieval.request.Cid, Selector: retrieval.request.GetSelector(), AllowDuplicatesIn: true, + MaxBlocks: retrieval.request.MaxBlocks, } blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem) diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 1c002959..cd1a89bf 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "io" + "math" "github.com/ipfs/go-cid" format "github.com/ipfs/go-ipld-format" @@ -39,6 +40,7 @@ type Config struct { Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR AllowDuplicatesIn bool // Handles whether the incoming stream has duplicates WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks + MaxBlocks uint64 // set a budget for the traversal } func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } @@ -106,6 +108,12 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l LinkTargetNodePrototypeChooser: protoChooser, }, } + if cfg.MaxBlocks > 0 { + progress.Budget = &traversal.Budget{ + LinkBudget: int64(cfg.MaxBlocks) - 1, // first block is already loaded + NodeBudget: math.MaxInt64, + } + } lc := linking.LinkContext{Ctx: ctx} lnk := cidlink.Link{Cid: cfg.Root} proto, err := protoChooser(lnk, lc) diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 3fe79dfd..45cf973d 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -23,6 +23,7 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/storage/memstore" + "github.com/ipld/go-ipld-prime/traversal" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" "github.com/stretchr/testify/require" ) @@ -205,6 +206,21 @@ func TestVerifiedCar(t *testing.T) { Selector: allSelector, }, }, + { + name: "carv1 over budget errors", + blocks: consumedBlocks(allBlocks), + roots: []cid.Cid{root1}, + err: (&traversal.ErrBudgetExceeded{ + BudgetKind: "link", + Path: datamodel.ParsePath("Parents/0/Parents/0/Parents/0"), + Link: tbc1.LinkTipIndex(3), + }).Error(), + cfg: verifiedcar.Config{ + Root: root1, + Selector: allSelector, + MaxBlocks: 3, + }, + }, { name: "unixfs: large sharded file", blocks: consumedBlocks(unixfsFileBlocks), From 05af04b68682bf0dde506957fc420b965e59705c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Wed, 24 May 2023 18:34:00 -0700 Subject: [PATCH 09/24] fix(http): lower log output to info --- pkg/server/http/ipfs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 8311078a..1e318818 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -235,7 +235,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer func() { if err := carWriter.Close(); err != nil { - logger.Errorf("error closing car writer: %s", err) + logger.Infof("error closing car writer: %s", err) } if err := carStore.Close(); err != nil { logger.Errorf("error closing temp store: %s", err) From 2aace5b312fda0d35c39a12c905988d9af876046 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 06:56:17 -0700 Subject: [PATCH 10/24] refactor(http): move car closing to after http --- pkg/server/http/ipfs.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 1e318818..1103218a 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -234,9 +234,6 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response } carStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), tempStore) defer func() { - if err := carWriter.Close(); err != nil { - logger.Infof("error closing car writer: %s", err) - } if err := carStore.Close(); err != nil { logger.Errorf("error closing temp store: %s", err) } @@ -318,6 +315,9 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response metric := header.NewMetric(string(re.Phase())) metric.Duration = re.Time().Sub(re.PhaseStartTime()) }) + if cerr := carWriter.Close(); cerr != nil { + logger.Infof("error closing car writer: %s", cerr) + } if err != nil { select { case <-bytesWritten: From 6e8c1e48f7d2724a3651bc062fbee755ebc7eeeb Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 07:17:24 -0700 Subject: [PATCH 11/24] refactor(itest): add log --- pkg/internal/itest/http_fetch_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index d5014a08..cf441d0d 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -847,6 +847,7 @@ func TestHttpFetch(t *testing.T) { req.NoError(err) if DEBUG_DATA { + t.Logf("Creating CAR %s in temp dir", fmt.Sprintf("%s_received%d.car", testCase.name, i)) dstf, err := os.CreateTemp("", fmt.Sprintf("%s_received%d.car", testCase.name, i)) req.NoError(err) t.Logf("Writing received data to CAR @ %s", dstf.Name()) From 6f4e2d8835d2de2976d5ebd49ed9569aa5c0d139 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 07:54:31 -0700 Subject: [PATCH 12/24] fix(testpeer): remove superfluous write --- pkg/internal/itest/testpeer/generator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/internal/itest/testpeer/generator.go b/pkg/internal/itest/testpeer/generator.go index eef3cc24..ef878296 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -451,7 +451,7 @@ func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.Res err = progress.WalkAdv(rootNode, sel, visitNoop) if err != nil { - http.Error(res, fmt.Sprintf("Failed to traverse from root node: %v", err), http.StatusInternalServerError) + // if we loaded the first block, we can't write headers any more return } } From 8984b60785a97821090683b2a538a74cfabc97c8 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 08:30:54 -0700 Subject: [PATCH 13/24] style(http): add comment --- pkg/server/http/ipfs.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 1103218a..a1d08ede 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -315,9 +315,12 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response metric := header.NewMetric(string(re.Phase())) metric.Duration = re.Time().Sub(re.PhaseStartTime()) }) + + // force all blocks to flush if cerr := carWriter.Close(); cerr != nil { logger.Infof("error closing car writer: %s", cerr) } + if err != nil { select { case <-bytesWritten: From 9d20893883811340c5c4e2d38d99236f1abfef44 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 08:32:29 -0700 Subject: [PATCH 14/24] fix(itest): fix merged diff --- pkg/internal/itest/http_fetch_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index cf441d0d..51f72f80 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -205,8 +205,9 @@ func TestHttpFetch(t *testing.T) { }}, }, { - name: "http max block limit", - httpRemotes: 1, + name: "http max block limit", + httpRemotes: 1, + expectUncleanEnd: true, modifyHttpConfig: func(cfg httpserver.HttpServerConfig) httpserver.HttpServerConfig { cfg.MaxBlocksPerRequest = 3 return cfg From 041c644de5abe5a2c10357c10fc92ef1f94d7ec9 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:36:01 -0700 Subject: [PATCH 15/24] Update pkg/server/http/ipfs.go Co-authored-by: Rod Vagg --- pkg/server/http/ipfs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index a1d08ede..ddfc593c 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -56,7 +56,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response if typeParts[0] == "*/*" || typeParts[0] == "application/*" || typeParts[0] == "application/vnd.ipld.car" { validAccept = true if typeParts[0] == "application/vnd.ipld.car" { - // parse 412 car attributes + // parse https://github.com/ipfs/specs/pull/412 car attributes for _, nextPart := range typeParts[1:] { pair := strings.Split(nextPart, "=") if len(pair) == 2 { From 69d1b08da473ddc3742c4805aa6147c0f2dd804c Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:36:53 -0700 Subject: [PATCH 16/24] Update pkg/server/http/ipfs.go Co-authored-by: Rod Vagg --- pkg/server/http/ipfs.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index ddfc593c..3b7a7a26 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -80,7 +80,13 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response validAccept = false } case "order": - // for now, dfs traversal satisfies all known orders + switch value { + case "dfs": + case "unk": + default: + // we only do dfs, which also satisfies unk, future extensions are not yet supported + validAccept = false + } default: // ignore others } From cf2585a09d30aba0ee64950b72481c8f3d97e931 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:37:05 -0700 Subject: [PATCH 17/24] Update pkg/storage/duplicateaddercar.go Co-authored-by: Rod Vagg --- pkg/storage/duplicateaddercar.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index ec1d175e..212f4297 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -73,7 +73,6 @@ func (da *DuplicateAdderCar) addDupes() { // to serve lsys.SetReadStorage(da.store) lsys.TrustedStorage = true - unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) // run the verification _, _, err = cfg.VerifyBlockStream(da.ctx, da.blockStream, lsys) From af14b93337f7f46cd4d22bffda5846666d5e6264 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:37:16 -0700 Subject: [PATCH 18/24] Update pkg/storage/duplicateaddercar_test.go Co-authored-by: Rod Vagg --- pkg/storage/duplicateaddercar_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index 3518a306..45c2c880 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -41,7 +41,7 @@ func TestDuplicateAdderCar(t *testing.T) { cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[0].Cid().KeyString(), unixfsFileWithDupsBlocks[0].RawData()) // write the duped block cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[1].Cid().KeyString(), unixfsFileWithDupsBlocks[1].RawData()) - // write the last block + // write the last block, which will be unique because of a different length cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().KeyString(), unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].RawData()) err := carWriter.Close() require.NoError(t, err) From d15734d3d37bcd0ba3fc70d8c5fbe5a7fc887aab Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:37:26 -0700 Subject: [PATCH 19/24] Update pkg/storage/duplicateaddercar_test.go Co-authored-by: Rod Vagg --- pkg/storage/duplicateaddercar_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index 45c2c880..83a8a22c 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -37,7 +37,7 @@ func TestDuplicateAdderCar(t *testing.T) { carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", types.DagScopeAll, store, buf) cachingTempStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), store) - // write the root block + // write the root block, containing sharding metadata cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[0].Cid().KeyString(), unixfsFileWithDupsBlocks[0].RawData()) // write the duped block cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[1].Cid().KeyString(), unixfsFileWithDupsBlocks[1].RawData()) From ab14be65ca97f6cee6d2df80c38ef0799d336808 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 25 May 2023 08:37:35 -0700 Subject: [PATCH 20/24] Update pkg/storage/duplicateaddercar_test.go Co-authored-by: Rod Vagg --- pkg/storage/duplicateaddercar_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index 83a8a22c..ed213729 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -39,7 +39,7 @@ func TestDuplicateAdderCar(t *testing.T) { // write the root block, containing sharding metadata cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[0].Cid().KeyString(), unixfsFileWithDupsBlocks[0].RawData()) - // write the duped block + // write the duped block that the root points to for all but the last block cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[1].Cid().KeyString(), unixfsFileWithDupsBlocks[1].RawData()) // write the last block, which will be unique because of a different length cachingTempStore.Put(ctx, unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].Cid().KeyString(), unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1].RawData()) From 2a33b36f3badf6987a2f8bd96f0a48c1d1f660b2 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 08:56:39 -0700 Subject: [PATCH 21/24] refactor(verifiedcar): rename allow dupes to expect dupes --- pkg/internal/itest/http_fetch_test.go | 6 +++--- pkg/retriever/httpretriever.go | 8 ++++---- pkg/storage/duplicateaddercar.go | 1 - pkg/verifiedcar/verifiedcar.go | 4 ++-- pkg/verifiedcar/verifiedcar_test.go | 8 ++++---- 5 files changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/internal/itest/http_fetch_test.go b/pkg/internal/itest/http_fetch_test.go index 51f72f80..3786f246 100644 --- a/pkg/internal/itest/http_fetch_test.go +++ b/pkg/internal/itest/http_fetch_test.go @@ -689,9 +689,9 @@ func TestHttpFetch(t *testing.T) { lsys.SetReadStorage(store) lsys.SetWriteStorage(store) _, _, err := verifiedcar.Config{ - Root: srcData.Root, - Selector: selectorparse.CommonSelector_ExploreAllRecursively, - AllowDuplicatesIn: true, + Root: srcData.Root, + Selector: selectorparse.CommonSelector_ExploreAllRecursively, + ExpectDuplicatesIn: true, }.VerifyCar(context.Background(), bytes.NewReader(body), lsys) require.NoError(t, err) }}, diff --git a/pkg/retriever/httpretriever.go b/pkg/retriever/httpretriever.go index f942a576..ed716bed 100644 --- a/pkg/retriever/httpretriever.go +++ b/pkg/retriever/httpretriever.go @@ -114,10 +114,10 @@ func (ph *ProtocolHttp) Retrieve( shared.sendEvent(events.FirstByte(retrieval.Clock.Now(), retrieval.request.RetrievalID, phaseStartTime, candidate)) }) cfg := verifiedcar.Config{ - Root: retrieval.request.Cid, - Selector: retrieval.request.GetSelector(), - AllowDuplicatesIn: true, - MaxBlocks: retrieval.request.MaxBlocks, + Root: retrieval.request.Cid, + Selector: retrieval.request.GetSelector(), + ExpectDuplicatesIn: true, + MaxBlocks: retrieval.request.MaxBlocks, } blockCount, byteCount, err := cfg.VerifyCar(ctx, rdr, retrieval.request.LinkSystem) diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index 212f4297..2a0f0dbd 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -12,7 +12,6 @@ import ( "github.com/filecoin-project/lassie/pkg/verifiedcar" "github.com/ipfs/go-cid" "github.com/ipfs/go-libipfs/blocks" - "github.com/ipfs/go-unixfsnode" carv2 "github.com/ipld/go-car/v2" "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/linking" diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index cd1a89bf..f50dbc6d 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -38,7 +38,7 @@ type Config struct { Root cid.Cid // The single root we expect to appear in the CAR and that we use to run our traversal against AllowCARv2 bool // If true, allow CARv2 files to be received, otherwise strictly only allow CARv1 Selector datamodel.Node // The selector to execute, starting at the provided Root, to verify the contents of the CAR - AllowDuplicatesIn bool // Handles whether the incoming stream has duplicates + ExpectDuplicatesIn bool // Handles whether the incoming stream has duplicates WriteDuplicatesOut bool // Handles whether duplicates should be written a second time as blocks MaxBlocks uint64 // set a budget for the traversal } @@ -146,7 +146,7 @@ func (cfg *Config) nextBlockReadOpener(ctx context.Context, cr *carReader, bt *w var data []byte var err error if _, ok := seen[cid]; ok { - if cfg.AllowDuplicatesIn { + if cfg.ExpectDuplicatesIn { // duplicate block, but in this case we are expecting the stream to have it data, err = cr.readNextBlock(ctx, cid) if err != nil { diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 45cf973d..b1027683 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -450,9 +450,9 @@ func TestVerifiedCar(t *testing.T) { blocks: append(append(consumedBlocks(unixfsFileWithDupsBlocks[:2]), skippedBlocks(unixfsFileWithDupsBlocks[2:len(unixfsFileWithDupsBlocks)-1])...), consumedBlocks(unixfsFileWithDupsBlocks[len(unixfsFileWithDupsBlocks)-1:])...), roots: []cid.Cid{unixfsFileWithDups.Root}, cfg: verifiedcar.Config{ - Root: unixfsFileWithDups.Root, - Selector: allSelector, - AllowDuplicatesIn: true, + Root: unixfsFileWithDups.Root, + Selector: allSelector, + ExpectDuplicatesIn: true, }, incomingHasDups: true, }, @@ -474,7 +474,7 @@ func TestVerifiedCar(t *testing.T) { Root: unixfsFileWithDups.Root, Selector: allSelector, WriteDuplicatesOut: true, - AllowDuplicatesIn: true, + ExpectDuplicatesIn: true, }, incomingHasDups: true, }, From 325e9c21f46d6b31f39b0578da6baaad6a47b24d Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 09:09:37 -0700 Subject: [PATCH 22/24] style(lint): fix lint errors --- pkg/server/http/ipfs.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index 3b7a7a26..2fab1014 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -81,12 +81,12 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response } case "order": switch value { - case "dfs": - case "unk": - default: - // we only do dfs, which also satisfies unk, future extensions are not yet supported - validAccept = false - } + case "dfs": + case "unk": + default: + // we only do dfs, which also satisfies unk, future extensions are not yet supported + validAccept = false + } default: // ignore others } From a4974af86a20c20ef66cb09d374d0cbee0667932 Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 09:13:56 -0700 Subject: [PATCH 23/24] fix(verifiedcar): move interface to top --- pkg/verifiedcar/verifiedcar.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index f50dbc6d..067edde4 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -32,6 +32,10 @@ var ( ErrMissingBlock = errors.New("missing block in CAR") ) +type BlockReader interface { + Next() (blocks.Block, error) +} + var protoChooser = dagpb.AddSupportToChooser(basicnode.Chooser) type Config struct { @@ -80,10 +84,6 @@ func (cfg Config) VerifyCar(ctx context.Context, rdr io.Reader, lsys linking.Lin return cfg.VerifyBlockStream(ctx, cbr, lsys) } -type BlockReader interface { - Next() (blocks.Block, error) -} - func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys linking.LinkSystem) (uint64, uint64, error) { sel, err := selector.CompileSelector(cfg.Selector) From c7fd62befbc9ae3fc1b285591ea9907965f15f5c Mon Sep 17 00:00:00 2001 From: hannahhoward Date: Thu, 25 May 2023 09:28:14 -0700 Subject: [PATCH 24/24] fix(storage): remove uncertainty on car close --- pkg/storage/duplicateaddercar.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index 2a0f0dbd..8c58a337 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -51,10 +51,7 @@ func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path strin func (da *DuplicateAdderCar) addDupes() { var err error defer func() { - select { - case <-da.ctx.Done(): - case da.streamCompletion <- err: - } + da.streamCompletion <- err }() sel := types.PathScopeSelector(da.path, da.scope) @@ -114,12 +111,7 @@ func (da *DuplicateAdderCar) Close() error { if streamCompletion == nil { return nil } - select { - case <-da.ctx.Done(): - return da.ctx.Err() - case err := <-streamCompletion: - return err - } + return <-streamCompletion } type blockStream struct {