From 7c07e113d644a1efc32b7fd0c268f5f892256ce9 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Wed, 21 Feb 2024 18:19:44 +0000 Subject: [PATCH] feat: stream car file bytes from @helia/car (#444) To better support streaming CAR files with a less confusing API, add a method to `@helia/car` that takes root CIDs and returns an AsyncGenerator that yields CAR file bytes. --- packages/car/src/index.ts | 42 +++++++++++++++++++++-- packages/car/test/fixtures/dag-walkers.ts | 27 +++++++++++++++ packages/car/test/index.spec.ts | 30 ++-------------- packages/car/test/stream.spec.ts | 37 ++++++++++++++++++++ 4 files changed, 106 insertions(+), 30 deletions(-) create mode 100644 packages/car/test/fixtures/dag-walkers.ts create mode 100644 packages/car/test/stream.spec.ts diff --git a/packages/car/src/index.ts b/packages/car/src/index.ts index e2dd8b18..b97acff8 100644 --- a/packages/car/src/index.ts +++ b/packages/car/src/index.ts @@ -58,13 +58,14 @@ * ``` */ +import { CarWriter } from '@ipld/car' import drain from 'it-drain' import map from 'it-map' import defer from 'p-defer' import PQueue from 'p-queue' import type { DAGWalker } from '@helia/interface' import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks' -import type { CarReader, CarWriter } from '@ipld/car' +import type { CarReader } from '@ipld/car' import type { AbortOptions } from '@libp2p/interfaces' import type { Blockstore } from 'interface-blockstore' import type { CID } from 'multiformats/cid' @@ -129,6 +130,28 @@ export interface Car { * ``` */ export(root: CID | CID[], writer: Pick, options?: AbortOptions & ProgressOptions): Promise + + /** + * Returns an AsyncGenerator that yields CAR file bytes. + * + * @example + * + * ```typescript + * import { createHelia } from 'helia' + * import { car } from '@helia/car + * import { CID } from 'multiformats/cid' + * + * const helia = await createHelia() + * const cid = CID.parse('QmFoo...') + * + * const c = car(helia) + * + * for (const buf of c.stream(cid)) { + * // store or send `buf` somewhere + * } + * ``` + */ + stream(root: CID | CID[], options?: AbortOptions & ProgressOptions): AsyncGenerator } const DAG_WALK_QUEUE_CONCURRENCY = 1 @@ -148,7 +171,7 @@ class DefaultCar implements Car { } async export (root: CID | CID[], writer: Pick, options?: AbortOptions & ProgressOptions): Promise { - const deferred = defer() + const deferred = defer() const roots = Array.isArray(root) ? root : [root] // use a queue to walk the DAG instead of recursion so we can traverse very large DAGs @@ -159,6 +182,7 @@ class DefaultCar implements Car { deferred.resolve() }) queue.on('error', (err) => { + queue.clear() deferred.reject(err) }) @@ -168,6 +192,7 @@ class DefaultCar implements Car { await writer.put({ cid, bytes }) }, options) }) + .catch(() => {}) } // wait for the writer to end @@ -178,6 +203,19 @@ class DefaultCar implements Car { } } + async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions): AsyncGenerator { + const { writer, out } = CarWriter.create(root) + + // has to be done async so we write to `writer` and read from `out` at the + // same time + this.export(root, writer, options) + .catch(() => {}) + + for await (const buf of out) { + yield buf + } + } + /** * Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore * and update the pin count for them diff --git a/packages/car/test/fixtures/dag-walkers.ts b/packages/car/test/fixtures/dag-walkers.ts new file mode 100644 index 00000000..74faf7dc --- /dev/null +++ b/packages/car/test/fixtures/dag-walkers.ts @@ -0,0 +1,27 @@ +import * as dagPb from '@ipld/dag-pb' +import * as raw from 'multiformats/codecs/raw' +import type { DAGWalker } from '@helia/interface' + +/** + * Dag walker for dag-pb CIDs + */ +const dagPbWalker: DAGWalker = { + codec: dagPb.code, + * walk (block) { + const node = dagPb.decode(block) + + yield * node.Links.map(l => l.Hash) + } +} + +const rawWalker: DAGWalker = { + codec: raw.code, + * walk () { + // no embedded CIDs in a raw block + } +} + +export const dagWalkers = { + [dagPb.code]: dagPbWalker, + [raw.code]: rawWalker +} diff --git a/packages/car/test/index.spec.ts b/packages/car/test/index.spec.ts index 3f93e200..3b371fe2 100644 --- a/packages/car/test/index.spec.ts +++ b/packages/car/test/index.spec.ts @@ -2,49 +2,23 @@ import { type UnixFS, unixfs } from '@helia/unixfs' import { CarReader } from '@ipld/car' -import * as dagPb from '@ipld/dag-pb' import { expect } from 'aegir/chai' import { MemoryBlockstore } from 'blockstore-core' import { fixedSize } from 'ipfs-unixfs-importer/chunker' import toBuffer from 'it-to-buffer' -import * as raw from 'multiformats/codecs/raw' import { car, type Car } from '../src/index.js' +import { dagWalkers } from './fixtures/dag-walkers.js' import { largeFile, smallFile } from './fixtures/files.js' import { memoryCarWriter } from './fixtures/memory-car.js' -import type { DAGWalker } from '@helia/interface' import type { Blockstore } from 'interface-blockstore' -/** - * Dag walker for dag-pb CIDs - */ -const dagPbWalker: DAGWalker = { - codec: dagPb.code, - * walk (block) { - const node = dagPb.decode(block) - - yield * node.Links.map(l => l.Hash) - } -} - -const rawWalker: DAGWalker = { - codec: raw.code, - * walk () { - // no embedded CIDs in a raw block - } -} - -describe('import', () => { +describe('import/export car file', () => { let blockstore: Blockstore let c: Car let u: UnixFS - let dagWalkers: Record beforeEach(async () => { blockstore = new MemoryBlockstore() - dagWalkers = { - [dagPb.code]: dagPbWalker, - [raw.code]: rawWalker - } c = car({ blockstore, dagWalkers }) u = unixfs({ blockstore }) diff --git a/packages/car/test/stream.spec.ts b/packages/car/test/stream.spec.ts new file mode 100644 index 00000000..51563817 --- /dev/null +++ b/packages/car/test/stream.spec.ts @@ -0,0 +1,37 @@ +/* eslint-env mocha */ + +import { type UnixFS, unixfs } from '@helia/unixfs' +import { expect } from 'aegir/chai' +import { MemoryBlockstore } from 'blockstore-core' +import toBuffer from 'it-to-buffer' +import { car, type Car } from '../src/index.js' +import { dagWalkers } from './fixtures/dag-walkers.js' +import { smallFile } from './fixtures/files.js' +import { memoryCarWriter } from './fixtures/memory-car.js' +import type { Blockstore } from 'interface-blockstore' + +describe('stream car file', () => { + let blockstore: Blockstore + let c: Car + let u: UnixFS + + beforeEach(async () => { + blockstore = new MemoryBlockstore() + + c = car({ blockstore, dagWalkers }) + u = unixfs({ blockstore }) + }) + + it('streams car file', async () => { + const cid = await u.addBytes(smallFile) + + const writer = memoryCarWriter(cid) + await c.export(cid, writer) + + const bytes = await writer.bytes() + + const streamed = await toBuffer(c.stream(cid)) + + expect(bytes).to.equalBytes(streamed) + }) +})