Skip to content

Commit

Permalink
Complete rewrite to use Loki as storage
Browse files Browse the repository at this point in the history
  • Loading branch information
Ale Figueroa committed Mar 5, 2019
1 parent 0fb773b commit 00152ab
Show file tree
Hide file tree
Showing 20 changed files with 143 additions and 963 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,13 @@
},
"license": "MIT",
"dependencies": {
"@types/http-proxy": "^1.16.2",
"@types/lokijs": "^1.5.2",
"@types/mkdirp": "^0.5.2",
"content-type": "^1.0.4",
"debug": "^4.1.1",
"fast-json-stable-stringify": "^2.0.0",
"http-proxy": "^1.17.0",
"incoming-message-hash": "^3.2.2",
"invariant": "^2.2.4",
"lodash.map": "^4.6.0",
Expand Down
101 changes: 43 additions & 58 deletions src/Recorder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import { IncomingMessage, ServerResponse, IncomingHttpHeaders } from 'http'
import { URL } from 'url'
import { h64 } from 'xxhashjs'
import { parse } from 'querystring'
import { buffer } from './buffer'
import { proxy } from './proxy'
import * as curl from './curl'
import { isMatch } from 'lodash'
import { isMatch, pick } from 'lodash'
import httpProxy from 'http-proxy'
import { buffer } from './buffer'
import assert from 'assert'

const debug = Debug('yaktime:recorder')

Expand All @@ -18,26 +19,28 @@ type Unpacked<T> = T extends (infer U)[] ? U : T extends (...args: any[]) => inf
type SerializedRequest = ReturnType<Recorder['serializeRequest']>
type SerializedResponse = Unpacked<ReturnType<Recorder['serializeResponse']>>
interface FullSerializedRequest extends SerializedRequest {
$loki?: number
response: SerializedResponse
}

export class Recorder {
opts: YakTimeOpts
host: string
db: Promise<Loki>
proxy: httpProxy
constructor (opts: YakTimeOpts, host: string) {
this.opts = opts
this.host = host
this.db = getDB(opts)
this.proxy = httpProxy.createProxyServer({ target: host, xfwd: true, changeOrigin: true, autoRewrite: true })
}

serializeRequest (req: IncomingMessage, body: any[]) {
serializeRequest (req: IncomingMessage, body: Buffer) {
const fullUrl = new URL(req.url as string, this.host)
const { method = '', httpVersion, headers, trailers } = req
const { method, httpVersion, headers, trailers } = req

const bodyBuffer = Buffer.concat(body)
const bodyEncoded = bodyBuffer.toString('base64')
const bodyHash = h64(bodyBuffer, 0).toString(16)
const bodyEncoded = body.toString('base64')
const bodyHash = h64(body, 0).toString(16)

return {
host: fullUrl.host,
Expand All @@ -52,36 +55,48 @@ export class Recorder {
}
}

async serializeResponse (res: IncomingMessage) {
const statusCode = res.statusCode || 200
async serializeResponse (res: IncomingMessage, body: Buffer) {
const statusCode = res.statusCode as number
const headers = res.headers
const body = Buffer.concat(await buffer<Buffer>(res)).toString('base64')
const trailers = res.trailers

return {
statusCode,
headers,
body,
body: body.toString('base64'),
trailers
}
}

async respond (storedRes: SerializedResponse, res: ServerResponse) {
async respond (storedReq: FullSerializedRequest, res: ServerResponse) {
assert(res.headersSent === false, 'Response has already been delivered')
const storedRes = storedReq.response
res.statusCode = storedRes.statusCode
res.writeHead(storedRes.statusCode, storedRes.headers)
res.addTrailers(storedRes.trailers || {})
res.end(Buffer.from(storedRes.body, 'base64'))
if (storedReq.trailers != null && storedReq.trailers !== {}) {
res.addTrailers(storedReq.trailers)
}
res.writeHead(storedRes.statusCode, { 'x-yakbak-tape': storedReq.$loki, ...storedReq.response.headers })
res.end(Buffer.from(storedReq.response.body, 'base64'))
}

async record (req: IncomingMessage, body: Buffer[], host: string, opts: YakTimeOpts) {
ensureRecordingIsAllowed(req, opts)
async record (req: IncomingMessage, res: ServerResponse, body: Buffer): Promise<FullSerializedRequest> {
ensureRecordingIsAllowed(req, this.opts)
debug('proxy', req.url)
const pres = await proxy(req, body, host)
debug(curl.response(pres))
ensureIsValidStatusCode(pres, opts)
debug('record', req.url)
const request = this.serializeRequest(req, body)
const response = await this.serializeResponse(pres)

const proxyRes: IncomingMessage = await new Promise((resolve, reject) => {
this.proxy.once('proxyRes', async (proxyRes: IncomingMessage) => {
resolve(proxyRes)
})
this.proxy.once('error', reject)
this.proxy.web(req, res, { selfHandleResponse: true })
})

debug(curl.response(proxyRes))
debug('record', req.url)
ensureIsValidStatusCode(proxyRes, this.opts)
const proxiedResponseBody = await buffer(proxyRes)
const response = await this.serializeResponse(proxyRes, proxiedResponseBody)
return this.save(request, response)
}

Expand All @@ -91,13 +106,13 @@ export class Recorder {
return tapes.add({ ...request, response })
}

async read (req: IncomingMessage, body: Buffer[]) {
async read (req: IncomingMessage, body: Buffer) {
const serializedRequest = this.serializeRequest(req, body)
return this.load(serializedRequest)
}

async load (request: SerializedRequest): Promise<FullSerializedRequest | null> {
const { ignoredQueryFields = [], ignoredHeaders = [] } = this.opts.hasherOptions || {}
const { ignoredQueryFields = [], allowedHeaders = [] } = this.opts
const db = await this.db
const tapes = db.addCollection<FullSerializedRequest>('tapes', { disableMeta: true })

Expand All @@ -106,49 +121,19 @@ export class Recorder {
const query = {
..._query
}
const headers = {
..._headers
}
const headers = pick(_headers, ['x-cassette-id', ...allowedHeaders])

ignoredQueryFields.forEach(q => delete query[q])
ignoredHeaders.forEach(h => delete headers[h])

const lokiQuery = {
...request,
query,
headers
}

delete query.body
delete lokiQuery.body
delete lokiQuery.host

return tapes.where(obj => isMatch(obj, lokiQuery))[0]
}
}

export class DbMigrator {
data: Buffer[] = []
headers: IncomingHttpHeaders = {}
statusCode = 200
setHeader (name: string, value: string) {
this.headers[name] = value
}
write (input: Buffer | string) {
this.data.push(Buffer.isBuffer(input) ? input : Buffer.from(input))
}

end (data?: any) {
if (data != null) {
this.write(data)
}
debug('finished migration')
}

toSerializedResponse (): SerializedResponse {
return {
statusCode: this.statusCode,
headers: this.headers,
body: Buffer.concat(this.data).toString('base64'),
trailers: {}
}
}
}
10 changes: 5 additions & 5 deletions src/buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ describe('buffer', () => {
let str = new stream.PassThrough()

subject(str)
.then(function(body) {
expect(body).toEqual([Buffer.from('a'), Buffer.from('b'), Buffer.from('c')])
.then(function (body) {
expect(body).toEqual(Buffer.from('abc'))
done()
})
.catch(function(err) {
.catch(function (err) {
done(err)
})

Expand All @@ -28,10 +28,10 @@ describe('buffer', () => {
let str = new stream.PassThrough()

subject(str)
.then(function() {
.then(function () {
done(new Error('should have yielded an error'))
})
.catch(function(err) {
.catch(function (err) {
expect(err.message).toEqual('boom')
done()
})
Expand Down
19 changes: 7 additions & 12 deletions src/buffer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,15 @@ import { Readable } from 'stream'
* @param - stream
*/

export function buffer<T = any> (stream: Readable): Promise<T[]> {
return new Promise(function (resolve, reject) {
let data: T[] = []

stream.on('data', function (buf) {
data.push(buf)
})

stream.on('error', function (err) {
reject(err)
export function buffer (stream: Readable): Promise<Buffer> {
return new Promise((resolve, reject) => {
let body = Buffer.from('')
stream.on('data', function (data) {
body = Buffer.concat([body, data])
})

stream.on('end', function () {
resolve(data)
resolve(body)
})
stream.on('error', reject)
})
}
2 changes: 1 addition & 1 deletion src/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Debug from 'debug'

const debug = Debug('yaktime:db')
let db: Loki
export async function getDB (opts: YakTimeOpts) {
export async function getDB (opts: YakTimeOpts): Promise<Loki> {
if (db == null) {
const dbPath = path.join(opts.dirname, 'tapes.json')
debug(`Opening db on ${dbPath}`)
Expand Down
91 changes: 0 additions & 91 deletions src/proxy.test.ts

This file was deleted.

Loading

0 comments on commit 00152ab

Please sign in to comment.