Skip to content

Commit

Permalink
fix: writing byte one at a time (#320)
Browse files Browse the repository at this point in the history
  • Loading branch information
panuhorsmalahti authored Jan 17, 2025
1 parent cc2412e commit 67961f2
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 10 deletions.
11 changes: 11 additions & 0 deletions src/__tests__/__snapshots__/stream-impersonator.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ impersonate-user: johndoe
BarÄö"
`;

exports[`StreamImpersonator handles binary data in body one byte at a time 1`] = `
"POST / HTTP/1.1
accept: application/json
content-type: application/octet-stream
content-length: 4
authorization: Bearer service-account-token
impersonate-user: johndoe
Äö"
`;

exports[`StreamImpersonator handles body separator splitted to separate chunks 1`] = `
"GET / HTTP/1.1
accept: application/json
Expand Down
56 changes: 56 additions & 0 deletions src/__tests__/stream-impersonator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,62 @@ MwIDAQAB
expect(destination.buffer.includes(Buffer.from([0x01, 0x02, 0x03, 0x42, 0x61, 0x72, 0xC3, 0x84, 0xC3, 0xB6]))).toBe(true);
});

it("handles parse errors", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

stream.pipe(parser).pipe(destination);

// Writing the headers as text
stream.write("POST ");
stream.write("/");
stream.write(" HTTP/1.1");

expect(() => {
stream.write("\r\r\n\n");
}).toThrowError("Parse Error");
});

it("handles binary data in body one byte at a time", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign(
{
exp: Math.floor(Date.now() / 1000) + 60 * 60,
sub: "johndoe",
aud: [boredServer],
},
jwtPrivateKey,
{ algorithm: "RS256" }
);

stream.pipe(parser).pipe(destination);

// Writing the headers as text
stream.write("POST ");
stream.write("/");
stream.write(" HTTP/1.1");
stream.write("\r\n");
stream.write("Accept: application/json\r\n");
stream.write(`Authorization: Bearer ${token}\r\n`);
stream.write("Content-Type: application/octet-stream\r\nContent-Length: 4\r\n\r\n");

// Writing binary data as four Buffer chunks
// UTF8: Äö
stream.write(Buffer.from([0xC3]));
await delay(1);
stream.write(Buffer.from([0x84]));
await delay(1);
stream.write(Buffer.from([0xC3]));
await delay(1);
stream.write(Buffer.from([0xB6]));

expect(destination.buffer.toString()).toMatchSnapshot();
expect(destination.buffer.toString()).toContain("Äö");
expect(destination.buffer.includes(Buffer.from([0xC3, 0x84, 0xC3, 0xB6]))).toBe(true);
});

it ("handles headers in one chunk, body in another", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;
Expand Down
114 changes: 114 additions & 0 deletions src/__tests__/stream-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import { removeBytesFromBuffersHead } from "../stream-utils";

describe("stream-utils", () => {
describe("removeBytesFromBuffersHead", () => {
describe("no buffers", () => {
describe("zero bytes to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], 0)).toEqual([]);
});
});

describe("one byte to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], 0)).toEqual([]);
});
});

describe("minutes 1 byte to remove", () => {
it("removes nothing", () => {
expect(removeBytesFromBuffersHead([], -1)).toEqual([]);
});
});
});

describe("one buffer", () => {
describe("has one byte", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01]);

expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]);
});
});

describe("has one byte and removes 2", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01]);

expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([]);
});
});

describe("has empty buffer", () => {
it("is removed completely", () => {
const buffer = Buffer.from([]);

expect(removeBytesFromBuffersHead([buffer], 1)).toEqual([]);
});
});

describe("bytes to remove matches last buffer length", () => {
it("is removed completely", () => {
const buffer = Buffer.from([0x01, 0x02, 0x03]);

expect(removeBytesFromBuffersHead([buffer], 3)).toEqual([]);
});
});

describe("bytes to remove doesn't match last buffer length", () => {
it("is removed partially", () => {
const buffer = Buffer.from([0x01, 0x02, 0x03]);

expect(removeBytesFromBuffersHead([buffer], 2)).toEqual([Buffer.from([0x03])]);
});
});
});

describe("many buffers", () => {
describe("bytes to remove matches last buffer length", () => {
it("first buffer is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 3)).toEqual([buffer2]);
});
});

describe("bytes to remove is less than last buffer length", () => {
it("is removed partially", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 2)).toEqual([ Buffer.from([0x03]), buffer2]);
});
});

describe("bytes to remove is more than last buffer length", () => {
it("is removed partially", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 4)).toEqual([Buffer.from([0x05, 0x06])]);
});
});

describe("bytes to remove is equal to sum of all buffer lengths", () => {
it("is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 6)).toEqual([]);
});
});

describe("bytes to remove is more than the sum of all buffer lengths", () => {
it("is removed completely", () => {
const buffer1 = Buffer.from([0x01, 0x02, 0x03]);
const buffer2 = Buffer.from([0x04, 0x05, 0x06]);

expect(removeBytesFromBuffersHead([buffer1, buffer2], 9999)).toEqual([]);
});
});
});
});
});
21 changes: 11 additions & 10 deletions src/stream-impersonator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { HTTPParser, HTTPParserJS } from "http-parser-js";
import { chunk } from "lodash";
import * as jwt from "jsonwebtoken";
import logger from "./logger";
import { removeBytesFromBuffersHead } from "./stream-utils";

type TokenPayload = {
exp: number;
Expand All @@ -23,7 +24,10 @@ export class StreamImpersonator extends Transform {
private httpParser: HTTPParserJS;
private upgrade = false;
private getSaToken: GetSaToken;
private partialMessage: string = "";

// If the a chunk can't be parsed fully, unparsed bytes are saved to be passed
// when we receive the next chunk
private partialMessage: Buffer[] = [];

constructor(getSaToken: GetSaToken) {
super();
Expand Down Expand Up @@ -100,35 +104,32 @@ export class StreamImpersonator extends Transform {

_transform(
chunk: Buffer,
encoding: BufferEncoding,
_encoding: BufferEncoding,
callback: TransformCallback,
): void {
const chunkStr = chunk.toString();

if (this.upgrade) {
this.push(chunk);

return callback();
}

this.partialMessage += chunkStr;
this.partialMessage.push(chunk);

const handleError = (err: Error) => {
this.partialMessage = "";
this.partialMessage = [];
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err));
throw err;
};

try {
const bytesParsed = this.httpParser.execute(
Buffer.from(this.partialMessage),
);
const bufferToParse = Buffer.concat(this.partialMessage);
const bytesParsed = this.httpParser.execute(bufferToParse);

if (bytesParsed instanceof Error) {
return handleError(bytesParsed);
}

this.partialMessage = this.partialMessage.slice(bytesParsed);
this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed);
} catch (err) {
return handleError(err as Error);
}
Expand Down
27 changes: 27 additions & 0 deletions src/stream-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Removes bytes from the beginning of the buffers array.
* If bytes to be removed is less than the buffer length, the bytes are removed from the start
* of the buffer.
* @param buffers Array of buffers to reduce
* @param bytes Number of bytes to remove.
* @returns
*/
export const removeBytesFromBuffersHead = (buffers: Buffer[], bytes: number) => {
let bytesToRemove = bytes;

while (bytesToRemove > 0 && buffers.length > 0) {
const firstBuffer = buffers[0];

if (bytesToRemove < firstBuffer.length) {
// Remove 'bytesToRemove' from the start of the last buffer
buffers[0] = firstBuffer.subarray(bytesToRemove);

bytesToRemove = 0;
} else {
buffers.shift();
bytesToRemove -= firstBuffer.length;
}
}

return buffers;
};

0 comments on commit 67961f2

Please sign in to comment.