Skip to content

Commit

Permalink
feat: require content-type parser to set content-type
Browse files Browse the repository at this point in the history
fixes #422
  • Loading branch information
SgtPooki committed Feb 7, 2024
1 parent 3851fe2 commit c4f3355
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 120 deletions.
10 changes: 9 additions & 1 deletion packages/verified-fetch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@
import { trustlessGateway } from '@helia/block-brokers'
import { createHeliaHTTP } from '@helia/http'
import { delegatedHTTPRouting } from '@helia/routers'
import { VerifiedFetch as VerifiedFetchClass } from './verified-fetch.js'
import { VerifiedFetch as VerifiedFetchClass, type ContentTypeParser } from './verified-fetch.js'
import type { Helia } from '@helia/interface'
import type { IPNSRoutingEvents, ResolveDnsLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns'
import type { GetEvents } from '@helia/unixfs'
Expand Down Expand Up @@ -262,8 +262,16 @@ export interface VerifiedFetch {
export interface CreateVerifiedFetchWithOptions {
gateways: string[]
routers?: string[]
/**
* A function to handle parsing content type from bytes. The function you provide will be passed the first set of
* bytes we receive from the network, and should return a string that will be used as the value for the `Content-Type`
* header in the response.
*/
contentTypeParser?: ContentTypeParser
}

export type { ContentTypeParser }

export type BubbledProgressEvents =
// unixfs
GetEvents |
Expand Down
55 changes: 0 additions & 55 deletions packages/verified-fetch/src/utils/get-content-type.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,27 +1,25 @@
import { CustomProgressEvent } from 'progress-events'
import { getContentType } from './get-content-type.js'
import type { VerifiedFetchInit } from '../index.js'
import type { ComponentLogger } from '@libp2p/interface'

/**
* Converts an async iterator of Uint8Array bytes to a stream and attempts to determine the content type of those bytes.
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
*/
export async function getStreamAndContentType (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ contentType: string, stream: ReadableStream<Uint8Array> }> {
const log = logger.forComponent('helia:verified-fetch:get-stream-and-content-type')
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable')
const reader = iterator[Symbol.asyncIterator]()
const { value, done } = await reader.next()
const { value: firstChunk, done } = await reader.next()

if (done === true) {
log.error('No content found for path', path)
throw new Error('No content found')
}

const contentType = await getContentType({ bytes: value, path })
const stream = new ReadableStream({
async start (controller) {
// the initial value is already available
options?.onProgress?.(new CustomProgressEvent<void>('verified-fetch:request:progress:chunk'))
controller.enqueue(value)
controller.enqueue(firstChunk)
},
async pull (controller) {
const { value, done } = await reader.next()
Expand All @@ -40,5 +38,8 @@ export async function getStreamAndContentType (iterator: AsyncIterable<Uint8Arra
}
})

return { contentType, stream }
return {
stream,
firstChunk
}
}
33 changes: 24 additions & 9 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { code as dagPbCode } from '@ipld/dag-pb'
import { code as jsonCode } from 'multiformats/codecs/json'
import { decode, code as rawCode } from 'multiformats/codecs/raw'
import { CustomProgressEvent } from 'progress-events'
import { getStreamAndContentType } from './utils/get-stream-and-content-type.js'
import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
import { parseResource } from './utils/parse-resource.js'
import { walkPath, type PathWalkerFn } from './utils/walk-path.js'
import type { CIDDetail, Resource, VerifiedFetchInit as VerifiedFetchOptions } from './index.js'
Expand All @@ -29,12 +29,15 @@ interface VerifiedFetchComponents {
pathWalker?: PathWalkerFn
}

export interface ContentTypeParser {
(bytes: Uint8Array): Promise<string>
}

/**
* Potential future options for the VerifiedFetch constructor.
*/
// eslint-disable-next-line @typescript-eslint/no-empty-interface
interface VerifiedFetchInit {

contentTypeParser?: ContentTypeParser
}

interface FetchHandlerFunctionArg {
Expand Down Expand Up @@ -72,6 +75,7 @@ export class VerifiedFetch {
private readonly json: JSON
private readonly pathWalker: PathWalkerFn
private readonly log: Logger
private readonly contentTypeParser: ContentTypeParser | undefined

constructor ({ helia, ipns, unixfs, dagJson, json, dagCbor, pathWalker }: VerifiedFetchComponents, init?: VerifiedFetchInit) {
this.helia = helia
Expand All @@ -87,6 +91,7 @@ export class VerifiedFetch {
this.json = json ?? heliaJson(helia)
this.dagCbor = dagCbor ?? heliaDagCbor(helia)
this.pathWalker = pathWalker ?? walkPath
this.contentTypeParser = init?.contentTypeParser
this.log.trace('created VerifiedFetch instance')
}

Expand Down Expand Up @@ -133,13 +138,13 @@ export class VerifiedFetch {
private async handleDagCbor ({ cid, path, options }: FetchHandlerFunctionArg): Promise<Response> {
this.log.trace('fetching %c/%s', cid, path)
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:start', { cid: cid.toString(), path }))
const result = await this.dagCbor.get(cid, {
const result = await this.dagCbor.get<Uint8Array>(cid, {
signal: options?.signal,
onProgress: options?.onProgress
})
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:end', { cid: cid.toString(), path }))
const response = new Response(JSON.stringify(result), { status: 200 })
response.headers.set('content-type', 'application/json')
const response = new Response(result, { status: 200 })
await this.setContentType(result, response)
return response
}

Expand Down Expand Up @@ -179,11 +184,11 @@ export class VerifiedFetch {
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:end', { cid: resolvedCID.toString(), path: '' }))
this.log('got async iterator for %c/%s', cid, path)

const { contentType, stream } = await getStreamAndContentType(asyncIter, path ?? '', this.helia.logger, {
const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress
})
const response = new Response(stream, { status: 200 })
response.headers.set('content-type', contentType)
await this.setContentType(firstChunk, response)

return response
}
Expand All @@ -194,10 +199,20 @@ export class VerifiedFetch {
const result = await this.helia.blockstore.get(cid)
options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:end', { cid: cid.toString(), path }))
const response = new Response(decode(result), { status: 200 })
response.headers.set('content-type', 'application/octet-stream')
await this.setContentType(result, response)
return response
}

private async setContentType (bytes: Uint8Array, response: Response): Promise<void> {
if (this.contentTypeParser != null) {
try {
response.headers.set('content-type', await this.contentTypeParser(bytes))
} catch (err) {
this.log.error('Error parsing content type', err)
}
}
}

/**
* Determines the format requested by the client, defaults to `null` if no format is requested.
*
Expand Down
34 changes: 0 additions & 34 deletions packages/verified-fetch/test/get-content-type.spec.ts

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { defaultLogger } from '@libp2p/logger'
import { expect } from 'aegir/chai'
import sinon from 'sinon'
import { getStreamAndContentType } from '../src/utils/get-stream-and-content-type.js'
import { getStreamFromAsyncIterable } from '../src/utils/get-stream-from-async-iterable.js'

describe('getStreamAndContentType', () => {
describe('getStreamFromAsyncIterable', () => {
let onProgressSpy: sinon.SinonSpy

beforeEach(() => {
Expand All @@ -12,23 +12,26 @@ describe('getStreamAndContentType', () => {

it('should throw an error if no content is found', async () => {
const iterator = (async function * () { })()
await expect(getStreamAndContentType(iterator, 'test', defaultLogger())).to.be.rejectedWith('No content found')
await expect(getStreamFromAsyncIterable(iterator, 'test', defaultLogger())).to.be.rejectedWith('No content found')
})

it('should return the correct content type and a readable stream', async () => {
const iterator = (async function * () { yield new TextEncoder().encode('Hello, world!') })()
const { contentType, stream } = await getStreamAndContentType(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
expect(contentType).to.equal('text/plain')
const chunks = new TextEncoder().encode('Hello, world!')
const iterator = (async function * () { yield chunks })()
const { firstChunk, stream } = await getStreamFromAsyncIterable(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
expect(firstChunk).to.equal(chunks)
const reader = stream.getReader()
const { value } = await reader.read()
expect(onProgressSpy.callCount).to.equal(1)
expect(new TextDecoder().decode(value)).to.equal('Hello, world!')
})

it('should handle multiple chunks of data', async () => {
const iterator = (async function * () { yield new TextEncoder().encode('Hello,'); yield new TextEncoder().encode(' world!') })()
const { contentType, stream } = await getStreamAndContentType(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
expect(contentType).to.equal('text/plain')
const textEncoder = new TextEncoder()
const chunks = ['Hello,', ' world!'].map((txt) => textEncoder.encode(txt))
const iterator = (async function * () { yield chunks[0]; yield chunks[1] })()
const { firstChunk, stream } = await getStreamFromAsyncIterable(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
expect(firstChunk).to.equal(chunks[0])
const reader = stream.getReader()
let result = ''
let chunk
Expand All @@ -42,20 +45,23 @@ describe('getStreamAndContentType', () => {
it('should include last value done is true', async () => {
// if done === true and there is a value
const LIMIT = 5
let actualFirstChunk: Uint8Array
const iterator: AsyncIterable<Uint8Array> = {
[Symbol.asyncIterator] () {
let i = 0
return {
async next () {
const done = i === LIMIT
const value = new Uint8Array([i++])
actualFirstChunk = actualFirstChunk ?? value
return Promise.resolve({ value, done })
}
}
}
}
const { contentType, stream } = await getStreamAndContentType(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
expect(contentType).to.equal('text/plain')
const { firstChunk, stream } = await getStreamFromAsyncIterable(iterator, 'test.txt', defaultLogger(), { onProgress: onProgressSpy })
// @ts-expect-error - actualFirstChunk is not used before set, because the await above.
expect(firstChunk).to.equal(actualFirstChunk)
const reader = stream.getReader()
const result = []
let chunk
Expand Down
Loading

0 comments on commit c4f3355

Please sign in to comment.