diff --git a/src/web-socket-handler.ts b/src/web-socket-handler.ts index 9b50d2ba5b..31d2332823 100644 --- a/src/web-socket-handler.ts +++ b/src/web-socket-handler.ts @@ -4,7 +4,13 @@ import stream = require('stream'); import { V1Status } from './api'; import { KubeConfig } from './config'; -const protocols = ['v4.channel.k8s.io', 'v3.channel.k8s.io', 'v2.channel.k8s.io', 'channel.k8s.io']; +const protocols = [ + 'v5.channel.k8s.io', + 'v4.channel.k8s.io', + 'v3.channel.k8s.io', + 'v2.channel.k8s.io', + 'channel.k8s.io', +]; export type TextHandler = (text: string) => boolean; export type BinaryHandler = (stream: number, buff: Buffer) => boolean; @@ -17,12 +23,37 @@ export interface WebSocketInterface { ): Promise; } +export interface StreamInterface { + stdin: stream.Readable; + stdout: stream.Writable; + stderr: stream.Writable; +} + export class WebSocketHandler implements WebSocketInterface { public static readonly StdinStream: number = 0; public static readonly StdoutStream: number = 1; public static readonly StderrStream: number = 2; public static readonly StatusStream: number = 3; public static readonly ResizeStream: number = 4; + public static readonly CloseStream: number = 255; + + public static supportsClose(protocol: string): boolean { + return protocol === 'v5.channel.k8s.io'; + } + + public static closeStream(streamNum: number, streams: StreamInterface): void { + switch (streamNum) { + case WebSocketHandler.StdinStream: + streams.stdin.pause(); + break; + case WebSocketHandler.StdoutStream: + streams.stdout.end(); + break; + case WebSocketHandler.StderrStream: + streams.stderr.end(); + break; + } + } public static handleStandardStreams( streamNum: number, @@ -39,6 +70,7 @@ export class WebSocketHandler implements WebSocketInterface { stderr.write(buff); } else if (streamNum === WebSocketHandler.StatusStream) { // stream closing. + // Hacky, change tests to use the stream interface if (stdout && stdout !== process.stdout) { stdout.end(); } @@ -69,6 +101,12 @@ export class WebSocketHandler implements WebSocketInterface { }); stdin.on('end', () => { + if (WebSocketHandler.supportsClose(ws.protocol)) { + const buff = Buffer.alloc(2); + buff.writeUint8(this.CloseStream, 0); + buff.writeUint8(this.StdinStream, 1); + ws.send(buff); + } ws.close(); }); // Keep the stream open @@ -141,7 +179,16 @@ export class WebSocketHandler implements WebSocketInterface { // factory is really just for test injection public constructor( readonly config: KubeConfig, - readonly socketFactory?: (uri: string, opts: WebSocket.ClientOptions) => WebSocket.WebSocket, + readonly socketFactory?: ( + uri: string, + protocols: string[], + opts: WebSocket.ClientOptions, + ) => WebSocket.WebSocket, + readonly streams: StreamInterface = { + stdin: process.stdin, + stdout: process.stdout, + stderr: process.stderr, + }, ) {} /** @@ -173,7 +220,7 @@ export class WebSocketHandler implements WebSocketInterface { return await new Promise((resolve, reject) => { const client = this.socketFactory - ? this.socketFactory(uri, opts) + ? this.socketFactory(uri, protocols, opts) : new WebSocket(uri, protocols, opts); let resolved = false; @@ -191,11 +238,17 @@ export class WebSocketHandler implements WebSocketInterface { client.onmessage = ({ data }: { data: WebSocket.Data }) => { // TODO: support ArrayBuffer and Buffer[] data types? if (typeof data === 'string') { + if (data.charCodeAt(0) === WebSocketHandler.CloseStream) { + WebSocketHandler.closeStream(data.charCodeAt(1), this.streams); + } if (textHandler && !textHandler(data)) { client.close(); } } else if (data instanceof Buffer) { - const streamNum = data.readInt8(0); + const streamNum = data.readUint8(0); + if (streamNum === WebSocketHandler.CloseStream) { + WebSocketHandler.closeStream(data.readInt8(1), this.streams); + } if (binaryHandler && !binaryHandler(streamNum, data.subarray(1))) { client.close(); } diff --git a/src/web-socket-handler_test.ts b/src/web-socket-handler_test.ts index f3113061ac..9827feba1a 100644 --- a/src/web-socket-handler_test.ts +++ b/src/web-socket-handler_test.ts @@ -2,6 +2,7 @@ import { promisify } from 'util'; import { expect } from 'chai'; import WebSocket = require('isomorphic-ws'); import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers'; +import stream from 'node:stream'; import { V1Status } from './api'; import { KubeConfig } from './config'; @@ -119,7 +120,7 @@ describe('WebSocket', () => { const handler = new WebSocketHandler( kc, - (uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => { + (uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => { uriOut = uri; return mockWs as WebSocket.WebSocket; }, @@ -170,7 +171,7 @@ describe('WebSocket', () => { const handler = new WebSocketHandler( kc, - (uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => { + (uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => { uriOut = uri; return mockWs as WebSocket.WebSocket; }, @@ -239,7 +240,7 @@ describe('WebSocket', () => { const handler = new WebSocketHandler( kc, - (uri: string, opts: WebSocket.ClientOptions): WebSocket.WebSocket => { + (uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => { uriOut = uri; return mockWs as WebSocket.WebSocket; }, @@ -303,6 +304,107 @@ describe('WebSocket', () => { }); }); +describe('V5 protocol support', () => { + it('should handle close', async () => { + const kc = new KubeConfig(); + const host = 'foo.company.com'; + const server = `https://${host}`; + kc.clusters = [ + { + name: 'cluster', + server, + } as Cluster, + ] as Cluster[]; + kc.contexts = [ + { + cluster: 'cluster', + user: 'user', + } as Context, + ] as Context[]; + kc.users = [ + { + name: 'user', + } as User, + ]; + + const mockWs = { + protocol: 'v5.channel.k8s.io', + } as WebSocket.WebSocket; + let uriOut = ''; + let endCalled = false; + const handler = new WebSocketHandler( + kc, + (uri: string, protocols: string[], opts: WebSocket.ClientOptions): WebSocket.WebSocket => { + uriOut = uri; + return mockWs as WebSocket.WebSocket; + }, + { + stdin: process.stdin, + stderr: process.stderr, + stdout: { + end: () => { + endCalled = true; + }, + } as stream.Writable, + }, + ); + const path = '/some/path'; + + const promise = handler.connect(path, null, null); + await setImmediatePromise(); + + expect(uriOut).to.equal(`wss://${host}${path}`); + + const event = { + target: mockWs, + type: 'open', + }; + mockWs.onopen!(event); + const errEvt = { + error: {}, + message: 'some message', + type: 'some type', + target: mockWs, + }; + const closeBuff = Buffer.alloc(2); + closeBuff.writeUint8(255, 0); + closeBuff.writeUint8(WebSocketHandler.StdoutStream, 1); + + mockWs.onmessage!({ + data: closeBuff, + type: 'type', + target: mockWs, + }); + await promise; + expect(endCalled).to.be.true; + }); + it('should handle closing stdin < v4 protocol', () => { + const ws = { + // send is not defined, so this will throw if we try to send the close message. + close: () => {}, + } as WebSocket; + const stdinStream = new ReadableStreamBuffer(); + WebSocketHandler.handleStandardInput(ws, stdinStream); + stdinStream.emit('end'); + }); + it('should handle closing stdin v5 protocol', () => { + let sent: Buffer | null = null; + const ws = { + protocol: 'v5.channel.k8s.io', + send: (data) => { + sent = data; + }, + close: () => {}, + } as WebSocket; + const stdinStream = new ReadableStreamBuffer(); + WebSocketHandler.handleStandardInput(ws, stdinStream); + stdinStream.emit('end'); + expect(sent).to.not.be.null; + expect(sent!.readUint8(0)).to.equal(255); // CLOSE signal + expect(sent!.readUint8(1)).to.equal(0); // Stdin stream is #0 + }); +}); + describe('Restartable Handle Standard Input', () => { it('should throw on negative retry', () => { const p = new Promise(() => {});