Skip to content

Commit

Permalink
feat: abort signals are respected (#26)
Browse files Browse the repository at this point in the history
* fix: options.signal should be used if not === null

* fix: misc

- badRequestResponse is called correctly
- badRequestResponse accepts error for body
- add tests for abort handling

* chore: remove throwIfAborted expressions

* chore: significant cleanup of abort handling

* chore: remove debugging log statement

* chore: move test fixtures

* chore: remove commented out test fixture

* fix: getStreamFromAsyncIterable closes on signal abort
  • Loading branch information
SgtPooki authored Mar 21, 2024
1 parent 114f3a4 commit 30148fe
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type { ComponentLogger } from '@libp2p/interface'
/**
* Converts an async iterator of Uint8Array bytes to a stream and returns the first chunk of bytes
*/
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8Array>, path: string, logger: ComponentLogger, options?: Pick<VerifiedFetchInit, 'onProgress' | 'signal'>): Promise<{ stream: ReadableStream<Uint8Array>, firstChunk: Uint8Array }> {
const log = logger.forComponent('helia:verified-fetch:get-stream-from-async-iterable')
const reader = iterator[Symbol.asyncIterator]()
const { value: firstChunk, done } = await reader.next()
Expand All @@ -23,6 +23,11 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8A
},
async pull (controller) {
const { value, done } = await reader.next()
if (options?.signal?.aborted === true) {
controller.error(new Error(options.signal.reason ?? 'signal aborted by user'))
controller.close()
return
}

if (done === true) {
if (value != null) {
Expand Down
9 changes: 4 additions & 5 deletions packages/verified-fetch/src/utils/parse-resource.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import { CID } from 'multiformats/cid'
import { parseUrlString } from './parse-url-string.js'
import type { ParsedUrlStringResults } from './parse-url-string.js'
import type { ParseUrlStringOptions, ParsedUrlStringResults } from './parse-url-string.js'
import type { Resource } from '../index.js'
import type { IPNS, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents } from '@helia/ipns'
import type { IPNS } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

export interface ParseResourceComponents {
ipns: IPNS
logger: ComponentLogger
}

export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents> {
export interface ParseResourceOptions extends ParseUrlStringOptions {

}
/**
Expand All @@ -21,7 +20,7 @@ export interface ParseResourceOptions extends ProgressOptions<ResolveProgressEve
*/
export async function parseResource (resource: Resource, { ipns, logger }: ParseResourceComponents, options?: ParseResourceOptions): Promise<ParsedUrlStringResults> {
if (typeof resource === 'string') {
return parseUrlString({ urlString: resource, ipns, logger }, { onProgress: options?.onProgress })
return parseUrlString({ urlString: resource, ipns, logger }, options)
}

const cid = CID.asCID(resource)
Expand Down
19 changes: 12 additions & 7 deletions packages/verified-fetch/src/utils/parse-url-string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { CID } from 'multiformats/cid'
import { TLRU } from './tlru.js'
import type { RequestFormatShorthand } from '../types.js'
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, ResolveDNSLinkProgressEvents, ResolveResult } from '@helia/ipns'
import type { ComponentLogger } from '@libp2p/interface'
import type { DNSLinkResolveResult, IPNS, IPNSResolveResult, IPNSRoutingEvents, ResolveDNSLinkProgressEvents, ResolveProgressEvents, ResolveResult } from '@helia/ipns'
import type { AbortOptions, ComponentLogger } from '@libp2p/interface'
import type { ProgressOptions } from 'progress-events'

const ipnsCache = new TLRU<DNSLinkResolveResult | IPNSResolveResult>(1000)
Expand All @@ -13,7 +13,7 @@ export interface ParseUrlStringInput {
ipns: IPNS
logger: ComponentLogger
}
export interface ParseUrlStringOptions extends ProgressOptions<ResolveDNSLinkProgressEvents> {
export interface ParseUrlStringOptions extends ProgressOptions<ResolveProgressEvents | IPNSRoutingEvents | ResolveDNSLinkProgressEvents>, AbortOptions {

}

Expand Down Expand Up @@ -131,7 +131,10 @@ function dnsLinkLabelDecoder (linkLabel: string): string {
* After determining the protocol successfully, we process the cidOrPeerIdOrDnsLink:
* * If it's ipfs, it parses the CID or throws an Aggregate error
* * If it's ipns, it attempts to resolve the PeerId and then the DNSLink. If both fail, an Aggregate error is thrown.
*
* @todo we need to break out each step of this function (cid parsing, ipns resolving, dnslink resolving) into separate functions and then remove the eslint-disable comment
*/
// eslint-disable-next-line complexity
export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStringInput, options?: ParseUrlStringOptions): Promise<ParsedUrlStringResults> {
const log = logger.forComponent('helia:verified-fetch:parse-url-string')
const { protocol, cidOrPeerIdOrDnsLink, path: urlPath, queryString } = matchURLString(urlString)
Expand Down Expand Up @@ -165,11 +168,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
try {
// try resolving as an IPNS name
peerId = peerIdFromString(cidOrPeerIdOrDnsLink)
resolveResult = await ipns.resolve(peerId, { onProgress: options?.onProgress })
cid = resolveResult.cid
resolvedPath = resolveResult.path
resolveResult = await ipns.resolve(peerId, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid)
} catch (err) {
options?.signal?.throwIfAborted()
if (peerId == null) {
log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`))
Expand All @@ -189,11 +193,12 @@ export async function parseUrlString ({ urlString, ipns, logger }: ParseUrlStrin
log.trace('Attempting to resolve DNSLink for %s', decodedDnsLinkLabel)

try {
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, { onProgress: options?.onProgress })
resolveResult = await ipns.resolveDNSLink(decodedDnsLinkLabel, options)
cid = resolveResult?.cid
resolvedPath = resolveResult?.path
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid)
} catch (err: any) {
options?.signal?.throwIfAborted()
log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err)
errors.push(err)
}
Expand Down
8 changes: 7 additions & 1 deletion packages/verified-fetch/src/utils/responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ export function notAcceptableResponse (url: string, body?: SupportedBodyTypes, i
return response
}

export function badRequestResponse (url: string, body?: SupportedBodyTypes, init?: ResponseInit): Response {
/**
* if body is an Error, it will be converted to a string containing the error message.
*/
export function badRequestResponse (url: string, body?: SupportedBodyTypes | Error, init?: ResponseInit): Response {
if (body instanceof Error) {
body = body.message
}
const response = new Response(body, {
...(init ?? {}),
status: 400,
Expand Down
41 changes: 35 additions & 6 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,10 @@ function convertOptions (options?: VerifiedFetchOptions): (Omit<VerifiedFetchOpt
let signal: AbortSignal | undefined
if (options?.signal === null) {
signal = undefined
} else {
signal = options?.signal
}

return {
...options,
signal
Expand Down Expand Up @@ -284,10 +287,13 @@ export class VerifiedFetch {
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options)
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err) {
} catch (err: any) {
this.log.error('error walking path %s', path, err)
if (options?.signal?.aborted === true) {
return badRequestResponse(resource.toString(), new Error('signal aborted by user'))
}

return badGatewayResponse('Error walking path')
return badGatewayResponse(resource.toString(), 'Error walking path')
}

let resolvedCID = terminalElement?.cid ?? cid
Expand Down Expand Up @@ -347,7 +353,8 @@ export class VerifiedFetch {

try {
const { stream, firstChunk } = await getStreamFromAsyncIterable(asyncIter, path ?? '', this.helia.logger, {
onProgress: options?.onProgress
onProgress: options?.onProgress,
signal: options?.signal
})
byteRangeContext.setBody(stream)
// if not a valid range request, okRangeRequest will call okResponse
Expand All @@ -367,7 +374,7 @@ export class VerifiedFetch {
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
return badRangeResponse(resource)
}
return badGatewayResponse('Unable to stream content')
return badGatewayResponse(resource.toString(), 'Unable to stream content')
}
}

Expand Down Expand Up @@ -431,11 +438,33 @@ export class VerifiedFetch {
[identity.code]: this.handleRaw
}

/**
*
* TODO: Should we use 400, 408, 418, or 425, or throw and not even return a response?
*/
private async abortHandler (opController: AbortController): Promise<void> {
this.log.error('signal aborted by user')
opController.abort('signal aborted by user')
}

/**
* We're starting to get to the point where we need a queue or pipeline of
* operations to perform and a single place to handle errors.
*
* TODO: move operations called by fetch to a queue of operations where we can
* always exit early (and cleanly) if a given signal is aborted
*/
async fetch (resource: Resource, opts?: VerifiedFetchOptions): Promise<Response> {
this.log('fetch %s', resource)

const options = convertOptions(opts)

const opController = new AbortController()
if (options?.signal != null) {
options.signal.onabort = this.abortHandler.bind(this, opController)
options.signal = opController.signal
}

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:start', { resource }))

// resolve the CID/path from the requested resource
Expand All @@ -451,10 +480,10 @@ export class VerifiedFetch {
query = result.query
ttl = result.ttl
protocol = result.protocol
} catch (err) {
} catch (err: any) {
this.log.error('error parsing resource %s', resource, err)

return badRequestResponse('Invalid resource')
return badRequestResponse(resource.toString(), err)
}

options?.onProgress?.(new CustomProgressEvent<CIDDetail>('verified-fetch:request:resolve', { cid, path }))
Expand Down
139 changes: 139 additions & 0 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import { dagCbor } from '@helia/dag-cbor'
import { type DNSLinkResolveResult, type IPNS, type IPNSResolveResult } from '@helia/ipns'
import { stop, type ComponentLogger, type Logger } from '@libp2p/interface'
import { prefixLogger, logger as libp2pLogger } from '@libp2p/logger'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import pDefer, { type DeferredPromise } from 'p-defer'
import Sinon from 'sinon'
import { stubInterface, type StubbedInstance } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import { getAbortablePromise } from './fixtures/get-abortable-promise.js'
import { makeAbortedRequest } from './fixtures/make-aborted-request.js'
import type { BlockRetriever, Helia } from '@helia/interface'

describe('abort-handling', function () {
this.timeout(500) // these tests should all fail extremely quickly. if they don't, they're not aborting properly, or they're being ran on an extremely slow machine.
const sandbox = Sinon.createSandbox()
/**
* CID I created by running `npx kubo add --cid-version 1 -r dist` in the `verified-fetch` package folder
*/
const notPublishedCid = CID.parse('bafybeichqiz32cw5c3vdpvh2xtfgl42veqbsr6sw2g6c7ffz6atvh2vise')
let helia: Helia
let name: StubbedInstance<IPNS>
let logger: ComponentLogger
let componentLoggers: Logger[] = []
let verifiedFetch: VerifiedFetch

/**
* Stubbed networking components
*/
let blockRetriever: StubbedInstance<BlockRetriever>
let dnsLinkResolver: Sinon.SinonStub<any[], Promise<DNSLinkResolveResult>>
let peerIdResolver: Sinon.SinonStub<any[], Promise<IPNSResolveResult>>

/**
* used as promises to pass to makeAbortedRequest that will abort the request as soon as it's resolved.
*/
let blockBrokerRetrieveCalled: DeferredPromise<void>
let dnsLinkResolverCalled: DeferredPromise<void>
let peerIdResolverCalled: DeferredPromise<void>

beforeEach(async () => {
peerIdResolver = sandbox.stub()
dnsLinkResolver = sandbox.stub()
peerIdResolverCalled = pDefer()
dnsLinkResolverCalled = pDefer()
blockBrokerRetrieveCalled = pDefer()

dnsLinkResolver.withArgs('timeout-5000-example.com', Sinon.match.any).callsFake(async (_domain, options) => {
dnsLinkResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
peerIdResolver.callsFake(async (peerId, options) => {
peerIdResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
blockRetriever = stubInterface<BlockRetriever>({
retrieve: sandbox.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()
return getAbortablePromise(options.signal)
})
})

logger = prefixLogger('test:abort-handling')
sandbox.stub(logger, 'forComponent').callsFake((name) => {
const newLogger = libp2pLogger(`test:abort-handling:child-logger-${componentLoggers.length}:${name}`)
componentLoggers.push(sandbox.stub(newLogger))
return newLogger
})
helia = await createHelia({
logger,
blockBrokers: [() => blockRetriever]
})
name = stubInterface<IPNS>({
resolveDNSLink: dnsLinkResolver,
resolve: peerIdResolver
})
verifiedFetch = new VerifiedFetch({
helia,
ipns: name
})
})

afterEach(async () => {
await stop(helia, verifiedFetch, name)
componentLoggers = []
sandbox.restore()
})

it('should abort a request before peerId resolution', async function () {
const c = dagCbor(helia)
const cid = await c.add({
hello: 'world'
})

const peerId = await createEd25519PeerId()

await name.publish(peerId, cid, { lifetime: 1000 * 60 * 60 })

const abortedResult = await makeAbortedRequest(verifiedFetch, [`ipns://${peerId}`], peerIdResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(1)
expect(dnsLinkResolver.callCount).to.equal(0) // not called because signal abort was detected
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request before dns resolution', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, ['ipns://timeout-5000-example.com'], dnsLinkResolverCalled.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because peerIdFromString fails
expect(dnsLinkResolver.callCount).to.equal(1)
expect(blockRetriever.retrieve.callCount).to.equal(0) // not called because we never got the cid
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

it('should abort a request while looking for cid', async function () {
const abortedResult = await makeAbortedRequest(verifiedFetch, [notPublishedCid, { headers: { accept: 'application/octet-stream' } }], blockBrokerRetrieveCalled.promise)

expect(peerIdResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(dnsLinkResolver.callCount).to.equal(0) // not called because parseResource never passes the resource to parseUrlString
expect(blockRetriever.retrieve.callCount).to.equal(1)
expect(abortedResult).to.be.ok()
expect(abortedResult.status).to.equal(400)
expect(abortedResult.statusText).to.equal('Bad Request')
// this error is exactly what blockRetriever throws, so we can check for "aborted" in the error message
await expect(abortedResult.text()).to.eventually.contain('aborted')
})

// TODO: verify that the request is aborted when calling unixfs.cat and unixfs.walkPath
})
12 changes: 1 addition & 11 deletions packages/verified-fetch/test/cache-control-header.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,13 @@ import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { dns } from '@multiformats/dns'
import { expect } from 'aegir/chai'
import Sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
import { VerifiedFetch } from '../src/verified-fetch.js'
import { createHelia } from './fixtures/create-offline-helia.js'
import { answerFake } from './fixtures/dns-answer-fake.js'
import type { Helia } from '@helia/interface'
import type { IPNS } from '@helia/ipns'
import type { DNSResponse } from '@multiformats/dns'

function answerFake (data: string, TTL: number, name: string, type: number): DNSResponse {
const fake = stubInterface<DNSResponse>()
fake.Answer = [{
data,
TTL,
name,
type
}]
return fake
}
describe('cache-control header', () => {
let helia: Helia
let name: IPNS
Expand Down
Loading

0 comments on commit 30148fe

Please sign in to comment.