Skip to content

Commit

Permalink
fix: pass integrityEmitter to cacache to avoid a redundant integrity …
Browse files Browse the repository at this point in the history
…stream
  • Loading branch information
nlf committed May 18, 2022
1 parent a88213e commit ae62c21
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 5 deletions.
16 changes: 13 additions & 3 deletions lib/cache/entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class CacheEntry {
metadata: getMetadata(this.request, this.response, this.options),
size,
integrity: this.options.integrity,
integrityEmitter: this.response.body.hasIntegrityEmitter && this.response.body,
}

let body = null
Expand All @@ -273,10 +274,17 @@ class CacheEntry {
return cacheWritePromise
},
}))
// this is always true since if we aren't reusing the one from the remote fetch, we
// are using the one from cacache
body.hasIntegrityEmitter = true

const onResume = () => {
const tee = new Minipass()
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts)
// re-emit the integrity and size events on our new response body so they can be reused
cacheStream.on('integrity', i => body.emit('integrity', i))
cacheStream.on('size', s => body.emit('size', s))
// stick a flag on here so downstream users will know if they can expect integrity events
tee.pipe(cacheStream)
// TODO if the cache write fails, log a warning but return the response anyway
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
Expand Down Expand Up @@ -320,6 +328,7 @@ class CacheEntry {
// we're responding with a full cached response, so create a body
// that reads from cacache and attach it to a new Response
const body = new Minipass()
const headers = { ...this.policy.responseHeaders() }
const onResume = () => {
const cacheStream = cacache.get.stream.byDigest(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
Expand All @@ -337,6 +346,9 @@ class CacheEntry {
body.emit('error', err)
cacheStream.resume()
})
// emit the integrity and size events based on our metadata so we're consistent
body.emit('integrity', this.entry.integrity)
body.emit('size', Number(headers['content-length']))
cacheStream.pipe(body)
}

Expand All @@ -346,9 +358,7 @@ class CacheEntry {
url: this.entry.metadata.url,
counter: options.counter,
status: 200,
headers: {
...this.policy.responseHeaders(),
},
headers,
})
}

Expand Down
9 changes: 8 additions & 1 deletion lib/remote.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ const remoteFetch = (request, options) => {
// we got a 200 response and the user has specified an expected
// integrity value, so wrap the response in an ssri stream to verify it
const integrityStream = ssri.integrityStream({ integrity: _opts.integrity })
res = new fetch.Response(new MinipassPipeline(res.body, integrityStream), res)
const pipeline = new MinipassPipeline(res.body, integrityStream)
// we also propagate the integrity and size events out to the pipeline so we can use
// this new response body as an integrityEmitter for cacache
integrityStream.on('integrity', i => pipeline.emit('integrity', i))
integrityStream.on('size', s => pipeline.emit('size', s))
res = new fetch.Response(pipeline, res)
// set an explicit flag so we know if our response body will emit integrity and size
res.body.hasIntegrityEmitter = true
}

res.headers.set('x-fetch-attempts', attemptNum)
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
"license": "ISC",
"dependencies": {
"agentkeepalive": "^4.2.1",
"cacache": "^16.0.2",
"cacache": "^16.1.0",
"http-cache-semantics": "^4.1.0",
"http-proxy-agent": "^5.0.0",
"https-proxy-agent": "^5.0.0",
Expand Down
79 changes: 79 additions & 0 deletions test/events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
'use strict'

const events = require('events')
const nock = require('nock')
const ssri = require('ssri')
const t = require('tap')

const fetch = require('../lib/index.js')

const CONTENT = Buffer.from('hello, world!', { encoding: 'utf8' })
const HOST = 'https://make-fetch-happen.npm'

nock.disableNetConnect()
t.beforeEach(() => {
nock.cleanAll()
})

t.test('emits integrity and size events', t => {
t.test('when response is cacheable', async t => {
const INTEGRITY = ssri.fromData(CONTENT)
const CACHE = t.testdir()
const srv = nock(HOST)
.get('/test')
.reply(200, CONTENT)

const res = await fetch(`${HOST}/test`, { cachePath: CACHE })
t.equal(res.status, 200, 'successful status code')
t.equal(res.headers.get('x-local-cache-status'), 'miss', 'is a cache miss')
t.equal(res.body.hasIntegrityEmitter, true, 'flag is set on body')
const gotIntegrity = events.once(res.body, 'integrity').then(i => i[0])
const gotSize = events.once(res.body, 'size').then(s => s[0])
const [integrity, size, buf] = await Promise.all([gotIntegrity, gotSize, res.buffer()])
t.same(buf, CONTENT, 'request succeeded')
t.same(integrity, INTEGRITY, 'got the right integrity')
t.same(size, CONTENT.byteLength, 'got the right size')
t.ok(srv.isDone())
})

t.test('when expected integrity is provided', async t => {
const INTEGRITY = ssri.fromData(CONTENT)
const srv = nock(HOST)
.get('/test')
.reply(200, CONTENT)

const res = await fetch(`${HOST}/test`, { integrity: INTEGRITY })
t.equal(res.status, 200, 'successful status code')
t.notOk(res.headers.has('x-local-cache-status'), 'should not touch the cache')
t.equal(res.body.hasIntegrityEmitter, true, 'flag is set on body')
const gotIntegrity = events.once(res.body, 'integrity').then(i => i[0])
const gotSize = events.once(res.body, 'size').then(s => s[0])
const [integrity, size, buf] = await Promise.all([gotIntegrity, gotSize, res.buffer()])
t.same(buf, CONTENT, 'request succeeded')
t.same(integrity, INTEGRITY, 'got the right integrity')
t.same(size, CONTENT.byteLength, 'got the right size')
t.ok(srv.isDone())
})

t.test('when both expected integrity is provided and response is cacheable', async t => {
const INTEGRITY = ssri.fromData(CONTENT)
const CACHE = t.testdir()
const srv = nock(HOST)
.get('/test')
.reply(200, CONTENT)

const res = await fetch(`${HOST}/test`, { cachePath: CACHE, integrity: INTEGRITY })
t.equal(res.status, 200, 'successful status code')
t.equal(res.headers.get('x-local-cache-status'), 'miss', 'is a cache miss')
t.equal(res.body.hasIntegrityEmitter, true, 'flag is set on body')
const gotIntegrity = events.once(res.body, 'integrity').then(i => i[0])
const gotSize = events.once(res.body, 'size').then(s => s[0])
const [integrity, size, buf] = await Promise.all([gotIntegrity, gotSize, res.buffer()])
t.same(buf, CONTENT, 'request succeeded')
t.same(integrity, INTEGRITY, 'got the right integrity')
t.same(size, CONTENT.byteLength, 'got the right size')
t.ok(srv.isDone())
})

t.end()
})

0 comments on commit ae62c21

Please sign in to comment.