Skip to content

Commit

Permalink
feat: use blockstore sessions
Browse files Browse the repository at this point in the history
Adds a configurable session cache that creates sessions based on
the base URL of the requested resource.

E.g. `https://Qmfoo.ipfs.gateway.com/foo.txt` and `https://Qmfoo.ipfs.gateway.com/bar.txt`
will be loaded from the same session.

Defaults to 100 sessions maximum with a TTL of one minute. These
are arbitrary numbers that will require some tweaking.
  • Loading branch information
achingbrain committed Apr 22, 2024
1 parent abaaeab commit 4b05147
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 29 deletions.
1 change: 1 addition & 0 deletions packages/verified-fetch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"it-pipe": "^3.0.1",
"it-tar": "^6.0.5",
"it-to-browser-readablestream": "^2.0.6",
"lru-cache": "^10.2.0",
"multiformats": "^13.1.0",
"progress-events": "^1.0.0",
"uint8arrays": "^5.0.3"
Expand Down
31 changes: 31 additions & 0 deletions packages/verified-fetch/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,22 @@ export interface CreateVerifiedFetchOptions {
* @default undefined
*/
contentTypeParser?: ContentTypeParser

/**
* Blockstore sessions are cached for reuse with requests with the same
* base URL or CID. This parameter controls how many to cache. Once this limit
* is reached older/less used sessions will be evicted from the cache.
*
* @default 100
*/
sessionCacheSize?: number

/**
* How long, in milliseconds, each blockstore session should stay in the cache for.
*
* @default 60000
*/
sessionTTLms?: number
}

/**
Expand Down Expand Up @@ -696,6 +712,21 @@ export type VerifiedFetchProgressEvents =
* progress events.
*/
export interface VerifiedFetchInit extends RequestInit, ProgressOptions<BubbledProgressEvents | VerifiedFetchProgressEvents> {
/**
* If true, try to create a blockstore session - this can reduce overall
* network traffic by first querying for a set of peers that have the data we
* wish to retrieve. Subsequent requests for data using the session will only
* be sent to those peers, unless they don't have the data, in which case
* further peers will be added to the session.
*
* Sessions are cached based on the CID/IPNS name they attempt to access. That
* is, requests for `https://qmfoo.ipfs.localhost/bar.txt` and
* `https://qmfoo.ipfs.localhost/baz.txt` would use the same session, if this
* argument is true for both fetch requests.
*
* If false, no session is created or looked up in the cache and blocks are
* read through the Helia node's regular blockstore instead.
*
* How many sessions are cached, and for how long, is controlled by the
* `sessionCacheSize` and `sessionTTLms` options passed when the
* verified-fetch instance is created.
*
* @default true
*/
session?: boolean
}

/**
Expand Down
2 changes: 1 addition & 1 deletion packages/verified-fetch/src/utils/parse-url-string.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ function matchUrlGroupsGuard (groups?: null | { [key in string]: string; } | Mat
(queryString == null || typeof queryString === 'string')
}

function matchURLString (urlString: string): MatchUrlGroups {
export function matchURLString (urlString: string): MatchUrlGroups {
for (const pattern of [URL_REGEX, PATH_REGEX, PATH_GATEWAY_REGEX, SUBDOMAIN_GATEWAY_REGEX]) {
const match = urlString.match(pattern)

Expand Down
30 changes: 30 additions & 0 deletions packages/verified-fetch/src/utils/resource-to-cache-key.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { CID } from 'multiformats/cid'
import { matchURLString } from './parse-url-string.js'

/**
 * Derives the blockstore session cache key for a resource. The key is the
 * `/ipfs/...` or `/ipns/...` root of the resource - any path segments after
 * the CID, peer ID or DNSLink name are dropped, so every file below the same
 * root maps to the same key.
 *
 * Examples:
 *
 * - Qmfoo -> /ipfs/Qmfoo
 * - https://Qmfoo.ipfs.gateway.org -> /ipfs/Qmfoo
 * - https://gateway.org/ipfs/Qmfoo -> /ipfs/Qmfoo
 * - https://gateway.org/ipfs/Qmfoo/bar.txt -> /ipfs/Qmfoo
 *
 * @param url - a CID instance, a CID string, or a URL string understood by
 * `matchURLString`
 * @returns the cache key in the form `/<protocol>/<root>`
 */
export function resourceToSessionCacheKey (url: string | CID): string {
  // a CID instance maps straight onto an /ipfs path
  const cidInstance = CID.asCID(url)

  if (cidInstance != null) {
    return `/ipfs/${cidInstance}`
  }

  const resource = url.toString()

  // a bare CID string also maps onto an /ipfs path
  try {
    const parsed = CID.parse(resource)

    return `/ipfs/${parsed}`
  } catch {
    // not a CID string - fall through and treat the resource as a URL
  }

  const groups = matchURLString(resource)

  return `/${groups.protocol}/${groups.cidOrPeerIdOrDnsLink}`
}
92 changes: 66 additions & 26 deletions packages/verified-fetch/src/verified-fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { peerIdFromString } from '@libp2p/peer-id'
import { Key } from 'interface-datastore'
import { exporter } from 'ipfs-unixfs-exporter'
import toBrowserReadableStream from 'it-to-browser-readablestream'
import { LRUCache } from 'lru-cache'
import { code as jsonCode } from 'multiformats/codecs/json'
import { code as rawCode } from 'multiformats/codecs/raw'
import { identity } from 'multiformats/hashes/identity'
Expand All @@ -24,34 +25,43 @@ import { getResolvedAcceptHeader } from './utils/get-resolved-accept-header.js'
import { getStreamFromAsyncIterable } from './utils/get-stream-from-async-iterable.js'
import { tarStream } from './utils/get-tar-stream.js'
import { parseResource } from './utils/parse-resource.js'
import { resourceToSessionCacheKey } from './utils/resource-to-cache-key.js'
import { setCacheControlHeader } from './utils/response-headers.js'
import { badRequestResponse, movedPermanentlyResponse, notAcceptableResponse, notSupportedResponse, okResponse, badRangeResponse, okRangeResponse, badGatewayResponse, notFoundResponse } from './utils/responses.js'
import { selectOutputType } from './utils/select-output-type.js'
import { isObjectNode, walkPath } from './utils/walk-path.js'
import type { CIDDetail, ContentTypeParser, Resource, VerifiedFetchInit as VerifiedFetchOptions } from './index.js'
import type { CIDDetail, ContentTypeParser, CreateVerifiedFetchOptions, Resource, VerifiedFetchInit as VerifiedFetchOptions } from './index.js'
import type { RequestFormatShorthand } from './types.js'
import type { ParsedUrlStringResults } from './utils/parse-url-string'
import type { Helia } from '@helia/interface'
import type { DNSResolver } from '@multiformats/dns/resolvers'
import type { Helia, SessionBlockstore } from '@helia/interface'
import type { Blockstore } from 'interface-blockstore'
import type { ObjectNode, UnixFSEntry } from 'ipfs-unixfs-exporter'
import type { CID } from 'multiformats/cid'

// Default maximum number of cached blockstore sessions, used when the
// `sessionCacheSize` option is not supplied
const SESSION_CACHE_MAX_SIZE = 100
// Default time a blockstore session stays in the cache, in milliseconds (one
// minute), used when the `sessionTTLms` option is not supplied
const SESSION_CACHE_TTL_MS = 60 * 1000

/**
 * Components handed to the VerifiedFetch constructor.
 */
interface VerifiedFetchComponents {
// Helia node whose blockstore and logger are used by the VerifiedFetch instance
helia: Helia
// Optional IPNS instance; when omitted the constructor creates one from `helia`
ipns?: IPNS
}

/**
* Potential future options for the VerifiedFetch constructor.
*/
interface VerifiedFetchInit {
contentTypeParser?: ContentTypeParser
dnsResolvers?: DNSResolver[]
}

interface FetchHandlerFunctionArg {
cid: CID
path: string

/**
* A key for use with the blockstore session cache
*/
cacheKey: string

/**
* Whether to use a session during fetch operations
*
* @default true
*/
session: boolean

options?: Omit<VerifiedFetchOptions, 'signal'> & AbortOptions

/**
Expand Down Expand Up @@ -129,15 +139,38 @@ export class VerifiedFetch {
private readonly ipns: IPNS
private readonly log: Logger
private readonly contentTypeParser: ContentTypeParser | undefined
private readonly blockstoreSessions: LRUCache<string, SessionBlockstore>

constructor ({ helia, ipns }: VerifiedFetchComponents, init?: VerifiedFetchInit) {
constructor ({ helia, ipns }: VerifiedFetchComponents, init?: CreateVerifiedFetchOptions) {
this.helia = helia
this.log = helia.logger.forComponent('helia:verified-fetch')
this.ipns = ipns ?? heliaIpns(helia)
this.contentTypeParser = init?.contentTypeParser
this.blockstoreSessions = new LRUCache({
max: init?.sessionCacheSize ?? SESSION_CACHE_MAX_SIZE,
ttl: init?.sessionTTLms ?? SESSION_CACHE_TTL_MS,
dispose: (store) => {
store.close()
}
})
this.log.trace('created VerifiedFetch instance')
}

/**
 * Chooses the blockstore a fetch handler should read blocks from.
 *
 * When `useSession` is false the Helia node's own blockstore is returned.
 * Otherwise the session cached under `key` is returned; if there is no
 * cached session, a new one is created for `root`, stored in the cache
 * under `key`, and returned.
 *
 * @param root - CID passed to `createSession` when a new session is needed
 * @param key - session cache key for the requested resource
 * @param useSession - whether a blockstore session should be used at all
 * @param options - passed through to `createSession`
 */
private getBlockstore (root: CID, key: string, useSession: boolean, options?: AbortOptions): Blockstore {
  if (!useSession) {
    return this.helia.blockstore
  }

  const cached = this.blockstoreSessions.get(key)

  if (cached != null) {
    return cached
  }

  // nothing cached for this key - create a session and remember it
  const created = this.helia.blockstore.createSession(root, options)
  this.blockstoreSessions.set(key, created)

  return created
}

/**
* Accepts an `ipns://...` URL as a string and returns a `Response` containing
* a raw IPNS record.
Expand Down Expand Up @@ -178,8 +211,9 @@ export class VerifiedFetch {
* Accepts a `CID` and returns a `Response` with a body stream that is a CAR
* of the `DAG` referenced by the `CID`.
*/
private async handleCar ({ resource, cid, options }: FetchHandlerFunctionArg): Promise<Response> {
const c = car(this.helia)
private async handleCar ({ resource, cid, session, cacheKey, options }: FetchHandlerFunctionArg): Promise<Response> {
const blockstore = this.getBlockstore(cid, cacheKey, session, options)
const c = car({ blockstore, dagWalkers: this.helia.dagWalkers })
const stream = toBrowserReadableStream(c.stream(cid, options))

const response = okResponse(resource, stream)
Expand All @@ -192,22 +226,24 @@ export class VerifiedFetch {
* Accepts a UnixFS `CID` and returns a `.tar` file containing the file or
* directory structure referenced by the `CID`.
*/
private async handleTar ({ resource, cid, path, options }: FetchHandlerFunctionArg): Promise<Response> {
private async handleTar ({ resource, cid, path, session, cacheKey, options }: FetchHandlerFunctionArg): Promise<Response> {
if (cid.code !== dagPbCode && cid.code !== rawCode) {
return notAcceptableResponse('only UnixFS data can be returned in a TAR file')
}

const stream = toBrowserReadableStream<Uint8Array>(tarStream(`/ipfs/${cid}/${path}`, this.helia.blockstore, options))
const blockstore = this.getBlockstore(cid, cacheKey, session, options)
const stream = toBrowserReadableStream<Uint8Array>(tarStream(`/ipfs/${cid}/${path}`, blockstore, options))

const response = okResponse(resource, stream)
response.headers.set('content-type', 'application/x-tar')

return response
}

private async handleJson ({ resource, cid, path, accept, options }: FetchHandlerFunctionArg): Promise<Response> {
private async handleJson ({ resource, cid, path, accept, session, cacheKey, options }: FetchHandlerFunctionArg): Promise<Response> {
this.log.trace('fetching %c/%s', cid, path)
const block = await this.helia.blockstore.get(cid, options)
const blockstore = this.getBlockstore(cid, cacheKey, session, options)
const block = await blockstore.get(cid, options)
let body: string | Uint8Array

if (accept === 'application/vnd.ipld.dag-cbor' || accept === 'application/cbor') {
Expand All @@ -231,14 +267,15 @@ export class VerifiedFetch {
return response
}

private async handleDagCbor ({ resource, cid, path, accept, options }: FetchHandlerFunctionArg): Promise<Response> {
private async handleDagCbor ({ resource, cid, path, accept, session, cacheKey, options }: FetchHandlerFunctionArg): Promise<Response> {
this.log.trace('fetching %c/%s', cid, path)
let terminalElement: ObjectNode | undefined
let ipfsRoots: CID[] | undefined
const blockstore = this.getBlockstore(cid, cacheKey, session, options)

// need to walk path, if it exists, to get the terminal element
try {
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options)
const pathDetails = await walkPath(blockstore, `${cid.toString()}/${path}`, options)
ipfsRoots = pathDetails.ipfsRoots
const potentialTerminalElement = pathDetails.terminalElement
if (potentialTerminalElement == null) {
Expand All @@ -256,7 +293,7 @@ export class VerifiedFetch {
this.log.error('error walking path %s', path, err)
return badGatewayResponse(resource, 'Error walking path')
}
const block = terminalElement?.node ?? await this.helia.blockstore.get(cid, options)
const block = terminalElement?.node ?? await blockstore.get(cid, options)

let body: string | Uint8Array

Expand Down Expand Up @@ -304,14 +341,15 @@ export class VerifiedFetch {
return response
}

private async handleDagPb ({ cid, path, resource, options }: FetchHandlerFunctionArg): Promise<Response> {
private async handleDagPb ({ cid, path, resource, cacheKey, session, options }: FetchHandlerFunctionArg): Promise<Response> {
let terminalElement: UnixFSEntry | undefined
let ipfsRoots: CID[] | undefined
let redirected = false
const byteRangeContext = new ByteRangeContext(this.helia.logger, options?.headers)
const blockstore = this.getBlockstore(cid, cacheKey, session, options)

try {
const pathDetails = await walkPath(this.helia.blockstore, `${cid.toString()}/${path}`, options)
const pathDetails = await walkPath(blockstore, `${cid.toString()}/${path}`, options)
ipfsRoots = pathDetails.ipfsRoots
terminalElement = pathDetails.terminalElement
} catch (err: any) {
Expand Down Expand Up @@ -415,9 +453,10 @@ export class VerifiedFetch {
}
}

private async handleRaw ({ resource, cid, path, options, accept }: FetchHandlerFunctionArg): Promise<Response> {
private async handleRaw ({ resource, cid, path, session, cacheKey, options, accept }: FetchHandlerFunctionArg): Promise<Response> {
const byteRangeContext = new ByteRangeContext(this.helia.logger, options?.headers)
const result = await this.helia.blockstore.get(cid, options)
const blockstore = this.getBlockstore(cid, cacheKey, session, options)
const result = await blockstore.get(cid, options)
byteRangeContext.setBody(result)
const response = okRangeResponse(resource, byteRangeContext.getBody(), { byteRangeContext, log: this.log }, {
redirected: false
Expand Down Expand Up @@ -520,7 +559,8 @@ export class VerifiedFetch {
let response: Response
let reqFormat: RequestFormatShorthand | undefined

const handlerArgs: FetchHandlerFunctionArg = { resource: resource.toString(), cid, path, accept, options }
const cacheKey = resourceToSessionCacheKey(resource)
const handlerArgs: FetchHandlerFunctionArg = { resource: resource.toString(), cid, path, accept, cacheKey, session: options?.session ?? true, options }

if (accept === 'application/vnd.ipfs.ipns-record') {
// the user requested a raw IPNS record
Expand Down
7 changes: 5 additions & 2 deletions packages/verified-fetch/test/abort-handling.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ describe('abort-handling', function () {
peerIdResolverCalled.resolve()
return getAbortablePromise(options.signal)
})
blockRetriever = stubInterface<Required<Pick<BlockBroker, 'retrieve'>>>({
blockRetriever = stubInterface<Required<Pick<BlockBroker, 'retrieve' | 'createSession'>>>({
retrieve: sandbox.stub().callsFake(async (cid, options) => {
blockBrokerRetrieveCalled.resolve()
return getAbortablePromise(options.signal)
})
}),
createSession: () => {
return blockRetriever
}
})

logger = prefixLogger('test:abort-handling')
Expand Down
55 changes: 55 additions & 0 deletions packages/verified-fetch/test/utils/resource-to-cache-key.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import { expect } from 'aegir/chai'
import { CID } from 'multiformats/cid'
import { resourceToSessionCacheKey } from '../../src/utils/resource-to-cache-key.js'

describe('resource-to-cache-key', () => {
  // one entry per test: [title, resource passed in, expected session cache key]
  const cases: Array<[string, string | CID, string]> = [
    [
      'converts url with IPFS path',
      'https://localhost:8080/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA',
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ],
    [
      'converts url with IPFS path and resource path',
      'https://localhost:8080/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA/foo/bar/baz.txt',
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ],
    [
      'converts url with IPNS path',
      'https://localhost:8080/ipns/ipfs.io',
      '/ipns/ipfs.io'
    ],
    [
      'converts url with IPNS path and resource path',
      'https://localhost:8080/ipns/ipfs.io/foo/bar/baz.txt',
      '/ipns/ipfs.io'
    ],
    [
      'converts IPFS subdomain',
      'https://QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA.ipfs.localhost:8080',
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ],
    [
      'converts IPFS subdomain with path',
      'https://QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA.ipfs.localhost:8080/foo/bar/baz.txt',
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ],
    [
      'converts IPNS subdomain',
      'https://ipfs.io.ipns.localhost:8080',
      '/ipns/ipfs.io'
    ],
    [
      'converts IPNS subdomain with resource path',
      'https://ipfs.io.ipns.localhost:8080/foo/bar/baz.txt',
      '/ipns/ipfs.io'
    ],
    [
      'converts CID',
      CID.parse('QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'),
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ],
    [
      'converts CID string',
      'QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA',
      '/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJA'
    ]
  ]

  for (const [title, resource, expectedKey] of cases) {
    it(title, () => {
      expect(resourceToSessionCacheKey(resource))
        .to.equal(expectedKey)
    })
  }
})
Loading

0 comments on commit 4b05147

Please sign in to comment.