Skip to content

Commit

Permalink
fix: Handle streaming not chunked request (#313)
Browse files Browse the repository at this point in the history
when body is received in same chunk as the rest of the request we don't handle it well.
this makes streaming agnostic and we are able to handle both chunked and not chunked request
  • Loading branch information
koncar authored Dec 31, 2024
1 parent 270545c commit 00bed87
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 11 deletions.
48 changes: 48 additions & 0 deletions src/__tests__/__snapshots__/stream-impersonator.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,32 @@ content-length: 11
hello world"
`;

exports[`StreamImpersonator does not remove crlf+whitespace from body 1`] = `
"GET / HTTP/1.1
accept: application/json
content-type: application/json
cache-control: max-age=0
content-length: 8
authorization: Bearer service-account-token
impersonate-user: johndoe
impersonate-group: dev
impersonate-group: ops
foo
bar"
`;

exports[`StreamImpersonator handles all body and headers splitted to separate chunks 1`] = `
"POST / HTTP/1.1
accept: application/json
content-type: application/json
content-length: 24
authorization: Bearer service-account-token
impersonate-user: johndoe
first chunk second chunk"
`;

exports[`StreamImpersonator handles body separator splitted to separate chunks 1`] = `
"GET / HTTP/1.1
accept: application/json
Expand All @@ -20,6 +46,28 @@ impersonate-user: johndoe
"
`;

exports[`StreamImpersonator handles headers and body in same stream chunk 1`] = `
"POST / HTTP/1.1
accept: application/json
content-type: application/json
content-length: 24
authorization: Bearer service-account-token
impersonate-user: johndoe
first chunk second chunk"
`;

exports[`StreamImpersonator handles headers in one chunk, body in another 1`] = `
"POST / HTTP/1.1
accept: application/json
content-type: application/json
content-length: 24
authorization: Bearer service-account-token
impersonate-user: johndoe
first chunk second chunk"
`;

exports[`StreamImpersonator handles http request pipelining 1`] = `
"GET /foo HTTP/1.1
accept: application/json
Expand Down
60 changes: 58 additions & 2 deletions src/__tests__/stream-impersonator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ MwIDAQAB

stream.pipe(parser).pipe(destination);
stream.write(`GET / HTTP/1.1\r\nAccept: application/json\r\nContent-`);
stream.write(`Type: application/json\r\nAuthorization: Bearer ${token}\r\n trailing-data\r\nCache-Control: max-age=0\r\n\r\nfoo\r\n bar`);
stream.write(`Type: application/json\r\nAuthorization: Bearer ${token}\r\nCache-Control: max-age=0\r\nContent-Length: 8\r\n\r\nfoo\r\nbar`);

expect(destination.buffer.toString().includes(`\r\n\r\nfoo\r\n bar`));
expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain(`foo\r\nbar`);
});

it ("handles newline splitted to separate chunks", async () => {
Expand All @@ -152,6 +153,61 @@ MwIDAQAB
expect(destination.buffer.toString()).toMatchSnapshot();
});

it ("handles all body and headers splitted to separate chunks", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign({
exp: Math.floor(Date.now() / 1000) + (60 * 60),
sub: "johndoe",
aud: [boredServer]
}, jwtPrivateKey, { algorithm: "RS256" });

stream.pipe(parser).pipe(destination);
stream.write("POST / HTTP/1.1\r\n");
stream.write("Accept: application/json\r\n");
stream.write(`Authorization: Bearer ${token}\r\n`);
stream.write("Content-Type: application/json\r\nContent-Length: 24\r\n\r\n");
stream.write("first chunk");
stream.write(" second chunk");
expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain("first chunk second chunk");
});

it ("handles headers in one chunk, body in another", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign({
exp: Math.floor(Date.now() / 1000) + (60 * 60),
sub: "johndoe",
aud: [boredServer]
}, jwtPrivateKey, { algorithm: "RS256" });

stream.pipe(parser).pipe(destination);
stream.write(`POST / HTTP/1.1\r\nAccept: application/json\r\nAuthorization: Bearer ${token}\r\nContent-Type: application/json\r\nContent-Length: 24\r\n\r\n`);
stream.write("first chunk");
stream.write(" second chunk");
expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain("first chunk second chunk");
});

it ("handles headers and body in same stream chunk", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign({
exp: Math.floor(Date.now() / 1000) + (60 * 60),
sub: "johndoe",
aud: [boredServer]
}, jwtPrivateKey, { algorithm: "RS256" });

stream.pipe(parser).pipe(destination);
stream.write(`POST / HTTP/1.1\r\nAccept: application/json\r\nAuthorization: Bearer ${token}\r\nContent-Type: application/json\r\nContent-Length: 24\r\n\r\nfirst chunk second chunk`);
expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain("first chunk second chunk");
});

it ("handles http request pipelining", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;
Expand Down
52 changes: 43 additions & 9 deletions src/stream-impersonator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ export class StreamImpersonator extends Transform {
private httpParser: HTTPParserJS;
private upgrade = false;
private getSaToken: GetSaToken;
private partialMessage: string = "";

constructor(getSaToken: GetSaToken) {
super();
Expand Down Expand Up @@ -72,8 +73,12 @@ export class StreamImpersonator extends Transform {
return 0;
};

this.httpParser.onBody = () => {
this.flushChunks();
this.httpParser.onBody = (
bodyChunk: Buffer,
start: number,
len: number,
) => {
this.chunks.push(bodyChunk.subarray(start, start + len));
};

this.httpParser.onMessageComplete = () => {
Expand All @@ -82,24 +87,53 @@ export class StreamImpersonator extends Transform {
}

private flushChunks() {
this.push(Buffer.concat(this.chunks));
this.chunks = [];
if (this.chunks.length > 0) {
this.push(Buffer.concat(this.chunks));
this.chunks = [];
}
}

_final(callback: TransformCallback): void {
this.flushChunks();
callback();
}

_transform(chunk: Buffer, encoding: BufferEncoding, callback: TransformCallback): void {
_transform(
chunk: Buffer,
encoding: BufferEncoding,
callback: TransformCallback,
): void {
const chunkStr = chunk.toString();

if (this.upgrade) {
this.push(chunk);
} else {
this.chunks.push(chunk);
this.httpParser.execute(chunk);

return callback();
}

this.partialMessage += chunkStr;

const handleError = (err: Error) => {
this.partialMessage = "";
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err));
throw err;
};

try {
const bytesParsed = this.httpParser.execute(
Buffer.from(this.partialMessage),
);

if (bytesParsed instanceof Error) {
return handleError(bytesParsed);
}

this.partialMessage = this.partialMessage.slice(bytesParsed);
} catch (err) {
return handleError(err as Error);
}

return callback();
callback();
}

validateRequestHeaders(headers: Headers) {
Expand Down

0 comments on commit 00bed87

Please sign in to comment.