Skip to content

Commit

Permalink
feat: stream car file bytes from @helia/car (#444)
Browse files Browse the repository at this point in the history
To better support streaming CAR files with a less confusing API,
add a method to `@helia/car` that takes root CIDs and returns an
AsyncGenerator that yields CAR file bytes.
  • Loading branch information
achingbrain committed Feb 21, 2024
1 parent 8db7792 commit 7c07e11
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 30 deletions.
42 changes: 40 additions & 2 deletions packages/car/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@
* ```
*/

import { CarWriter } from '@ipld/car'
import drain from 'it-drain'
import map from 'it-map'
import defer from 'p-defer'
import PQueue from 'p-queue'
import type { DAGWalker } from '@helia/interface'
import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
import type { CarReader, CarWriter } from '@ipld/car'
import type { CarReader } from '@ipld/car'
import type { AbortOptions } from '@libp2p/interfaces'
import type { Blockstore } from 'interface-blockstore'
import type { CID } from 'multiformats/cid'
Expand Down Expand Up @@ -129,6 +130,28 @@ export interface Car {
* ```
*/
export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void>

/**
* Returns an AsyncGenerator that yields CAR file bytes.
*
* @example
*
* ```typescript
* import { createHelia } from 'helia'
* import { car } from '@helia/car
* import { CID } from 'multiformats/cid'
*
* const helia = await createHelia()
* const cid = CID.parse('QmFoo...')
*
* const c = car(helia)
*
* for (const buf of c.stream(cid)) {
* // store or send `buf` somewhere
* }
* ```
*/
stream(root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array>
}

const DAG_WALK_QUEUE_CONCURRENCY = 1
Expand All @@ -148,7 +171,7 @@ class DefaultCar implements Car {
}

async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): Promise<void> {
const deferred = defer()
const deferred = defer<Error | undefined>()
const roots = Array.isArray(root) ? root : [root]

// use a queue to walk the DAG instead of recursion so we can traverse very large DAGs
Expand All @@ -159,6 +182,7 @@ class DefaultCar implements Car {
deferred.resolve()
})
queue.on('error', (err) => {
queue.clear()
deferred.reject(err)
})

Expand All @@ -168,6 +192,7 @@ class DefaultCar implements Car {
await writer.put({ cid, bytes })
}, options)
})
.catch(() => {})
}

// wait for the writer to end
Expand All @@ -178,6 +203,19 @@ class DefaultCar implements Car {
}
}

async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
const { writer, out } = CarWriter.create(root)

// has to be done async so we write to `writer` and read from `out` at the
// same time
this.export(root, writer, options)
.catch(() => {})

for await (const buf of out) {
yield buf
}
}

/**
* Walk the DAG behind the passed CID, ensure all blocks are present in the blockstore
* and update the pin count for them
Expand Down
27 changes: 27 additions & 0 deletions packages/car/test/fixtures/dag-walkers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import * as dagPb from '@ipld/dag-pb'
import * as raw from 'multiformats/codecs/raw'
import type { DAGWalker } from '@helia/interface'

/**
* Dag walker for dag-pb CIDs
*/
const dagPbWalker: DAGWalker = {
codec: dagPb.code,
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
}
}

const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

export const dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}
30 changes: 2 additions & 28 deletions packages/car/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,23 @@

import { type UnixFS, unixfs } from '@helia/unixfs'
import { CarReader } from '@ipld/car'
import * as dagPb from '@ipld/dag-pb'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import { fixedSize } from 'ipfs-unixfs-importer/chunker'
import toBuffer from 'it-to-buffer'
import * as raw from 'multiformats/codecs/raw'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { largeFile, smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { DAGWalker } from '@helia/interface'
import type { Blockstore } from 'interface-blockstore'

/**
* Dag walker for dag-pb CIDs
*/
const dagPbWalker: DAGWalker = {
codec: dagPb.code,
* walk (block) {
const node = dagPb.decode(block)

yield * node.Links.map(l => l.Hash)
}
}

const rawWalker: DAGWalker = {
codec: raw.code,
* walk () {
// no embedded CIDs in a raw block
}
}

describe('import', () => {
describe('import/export car file', () => {
let blockstore: Blockstore
let c: Car
let u: UnixFS
let dagWalkers: Record<number, DAGWalker>

beforeEach(async () => {
blockstore = new MemoryBlockstore()
dagWalkers = {
[dagPb.code]: dagPbWalker,
[raw.code]: rawWalker
}

c = car({ blockstore, dagWalkers })
u = unixfs({ blockstore })
Expand Down
37 changes: 37 additions & 0 deletions packages/car/test/stream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* eslint-env mocha */

import { type UnixFS, unixfs } from '@helia/unixfs'
import { expect } from 'aegir/chai'
import { MemoryBlockstore } from 'blockstore-core'
import toBuffer from 'it-to-buffer'
import { car, type Car } from '../src/index.js'
import { dagWalkers } from './fixtures/dag-walkers.js'
import { smallFile } from './fixtures/files.js'
import { memoryCarWriter } from './fixtures/memory-car.js'
import type { Blockstore } from 'interface-blockstore'

describe('stream car file', () => {
let blockstore: Blockstore
let c: Car
let u: UnixFS

beforeEach(async () => {
blockstore = new MemoryBlockstore()

c = car({ blockstore, dagWalkers })
u = unixfs({ blockstore })
})

it('streams car file', async () => {
const cid = await u.addBytes(smallFile)

const writer = memoryCarWriter(cid)
await c.export(cid, writer)

const bytes = await writer.bytes()

const streamed = await toBuffer(c.stream(cid))

expect(bytes).to.equalBytes(streamed)
})
})

0 comments on commit 7c07e11

Please sign in to comment.