Skip to content

Commit

Permalink
fix: chunked data (#329)
Browse files Browse the repository at this point in the history
  • Loading branch information
panuhorsmalahti authored Jan 22, 2025
1 parent f9a213c commit 67f1e0a
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 28 deletions.
32 changes: 32 additions & 0 deletions src/__tests__/__snapshots__/stream-impersonator.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,35 @@ impersonate-user: johndoe
"
`;

exports[`StreamImpersonator transfer encoding chunked 1`] = `
"GET /version HTTP/1.1
host: 127.0.0.1:53364
user-agent: node-fetch
accept: */*
accept-encoding: gzip, deflate, br
x-forwarded-for: 127.0.0.1
authorization: Bearer service-account-token
impersonate-user: johndoe
impersonate-group: dev
impersonate-group: ops
POST /apis/authorization.k8s.io/v1/selfsubjectaccessreviews HTTP/1.1
host: 127.0.0.1:53364
user-agent: node-fetch
transfer-encoding: chunked
accept: */*
accept-encoding: gzip, deflate, br
content-type: application/json
x-forwarded-for: 127.0.0.1
authorization: Bearer service-account-token
impersonate-user: johndoe
impersonate-group: dev
impersonate-group: ops
b0
{"spec":{"resourceAttributes":{"namespace":"kube-system","resource":"*","verb":"create"}},"kind":"SelfSubjectAccessReview","apiVersion":"authorization.k8s.io/v1","metadata":{}}
0
"
`;
43 changes: 40 additions & 3 deletions src/__tests__/stream-impersonator.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ MwIDAQAB
expect(destination.buffer.toString().includes(`\r\nAuthorization: Bearer ${token} trailing-data\r\n`));
});

it ("does not remove crlf+whitespace from body", async () => {
it("does not remove crlf+whitespace from body", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

Expand All @@ -141,6 +141,43 @@ MwIDAQAB
expect(destination.buffer.toString()).toContain(`foo\r\nbar`);
});

it("transfer encoding chunked", () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

const token = jwt.sign({
exp: Math.floor(Date.now() / 1000) + (60 * 60),
sub: "johndoe",
groups: ["dev", "ops"],
aud: [boredServer]
}, jwtPrivateKey, { algorithm: "RS256" });

stream.pipe(parser).pipe(destination);
stream.write(`GET /version HTTP/1.1\r\n`);
stream.write(`Host: 127.0.0.1:53364\r\n`);
stream.write(`User-Agent: node-fetch\r\n`);
stream.write(`Accept: */*\r\n`);
stream.write(`Accept-Encoding: gzip, deflate, br\r\n`);
stream.write(`Authorization: Bearer ${token}\r\n`);
stream.write(`X-Forwarded-For: 127.0.0.1\r\n\r\n`);
stream.write(`POST /apis/authorization.k8s.io/v1/selfsubjectaccessreviews HTTP/1.1\r\n`);
stream.write(`Host: 127.0.0.1:53364\r\n`);
stream.write(`User-Agent: node-fetch\r\n`);
stream.write(`Transfer-Encoding: chunked\r\n`);
stream.write(`Accept: */*\r\n`);
stream.write(`Accept-Encoding: gzip, deflate, br\r\n`);
stream.write(`Authorization: Bearer ${token}\r\n`);
stream.write(`Content-Type: application/json\r\n`);
stream.write(`X-Forwarded-For: 127.0.0.1\r\n`);
stream.write(`\r\n`);
stream.write(`b0\r\n`);
stream.write(`{"spec":{"resourceAttributes":{"namespace":"kube-system","resource":"*","verb":"create"}},"kind":"SelfSubjectAccessReview","apiVersion":"authorization.k8s.io/v1","metadata":{}}\r\n`);
stream.write(`0\r\n`);
stream.write(`\r\n`);

expect(destination.buffer.toString()).toMatchSnapshot();
});

it ("handles newline splitted to separate chunks", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;
Expand Down Expand Up @@ -225,7 +262,7 @@ MwIDAQAB
stream.write(" HTTP/1.1");

expect(() => {
stream.write("\r\r\n\n");
stream.write("\r\n\r\n\r\r\n\n");
}).toThrowError("Parse Error");
});

Expand Down Expand Up @@ -287,7 +324,7 @@ MwIDAQAB
expect(destination.buffer.toString()).toContain("first chunk second chunk");
});

it ("handles headers and body in same stream chunk", async () => {
it("handles headers and body in same stream chunk", async () => {
parser.boredServer = boredServer;
parser.publicKey = jwtPublicKey;

Expand Down
75 changes: 50 additions & 25 deletions src/stream-impersonator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ type Headers = Array<Array<string>>;

type GetSaToken = () => string;

const endOfHeadersMarker = `\r\n\r\n`;

export class StreamImpersonator extends Transform {
public boredServer = "";
public publicKey = "";

private headersReceived = false;
private chunks: Buffer[] = [];
private httpParser: HTTPParserJS;
private upgrade = false;
Expand Down Expand Up @@ -82,26 +85,17 @@ export class StreamImpersonator extends Transform {
return 0;
};

this.httpParser.onBody = (
bodyChunk: Buffer,
start: number,
len: number,
) => {
logger.trace("onBody");
this.chunks.push(bodyChunk.subarray(start, start + len));
};

this.httpParser.onMessageComplete = () => {
logger.trace("onMessageComplete");
this.flushChunks();
this.headersReceived = false;
};
}

private flushChunks() {
logger.trace("flushChunks");

if (this.chunks.length > 0) {
logger.trace("flushChunks -> writing chunks");
this.push(Buffer.concat(this.chunks));
this.chunks = [];
}
Expand All @@ -112,6 +106,30 @@ export class StreamImpersonator extends Transform {
callback();
}

private handleError(error: Error) {
this.headersReceived = false;
this.partialMessage = [];
this.chunks = [];
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(error));

throw error;
};

private executeParser(bufferToParse: Buffer) {
try {
const bytesParsed = this.httpParser.execute(bufferToParse);

if (bytesParsed instanceof Error) {
return this.handleError(bytesParsed);
}

// Remove parsed bytes from the partialMessage buffers
this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed);
} catch (error) {
this.handleError(error as Error);
}
}

_transform(
chunk: Buffer,
_encoding: BufferEncoding,
Expand All @@ -122,33 +140,40 @@ export class StreamImpersonator extends Transform {
}

if (this.upgrade) {
logger.trace("upgrade in _transform");
this.push(chunk);

return callback();
}

this.chunks.push(chunk);
this.partialMessage.push(chunk);

const handleError = (err: Error) => {
this.partialMessage = [];
logger.error("[IMPERSONATOR] Error parsing HTTP data: %s", String(err));
throw err;
};
const receivedSoFar = Buffer.concat(this.partialMessage);
const headerEndIndex = receivedSoFar.indexOf(endOfHeadersMarker);

try {
const bufferToParse = Buffer.concat(this.partialMessage);
const bytesParsed = this.httpParser.execute(bufferToParse);
// Wait for more data if headers are incomplete and not received yet
if (headerEndIndex === -1 && !this.headersReceived) {
return callback();
}

if (bytesParsed instanceof Error) {
return handleError(bytesParsed);
}
// Parse headers if not parsed yet
if (headerEndIndex !== -1 && !this.headersReceived) {
this.headersReceived = true;

this.partialMessage = removeBytesFromBuffersHead(this.partialMessage, bytesParsed);
} catch (err) {
return handleError(err as Error);
// Extract headers
// +endOfHeadersMarker.length to include the \r\n\r\n in the header
const bufferToParse = Buffer.concat(this.partialMessage).subarray(0, headerEndIndex + endOfHeadersMarker.length);

this.executeParser(bufferToParse);

// onHeadersComplete sets this.chunks to [], we set the rest of the bytes after the header back
this.chunks = [Buffer.concat(this.partialMessage)];
}

const bufferToParse = Buffer.concat(this.partialMessage);

this.executeParser(bufferToParse);

callback();
}

Expand Down

0 comments on commit 67f1e0a

Please sign in to comment.