Skip to content

Commit

Permalink
Add createServer (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
kylecarbs committed Feb 5, 2019
1 parent 2dced39 commit 7ca010a
Show file tree
Hide file tree
Showing 11 changed files with 1,671 additions and 50 deletions.
45 changes: 43 additions & 2 deletions packages/protocol/src/browser/client.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { ReadWriteConnection, InitData, OperatingSystem, ISharedProcessData } from "../common/connection";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage } from "../proto";
import { NewEvalMessage, ServerMessage, EvalDoneMessage, EvalFailedMessage, TypedValue, ClientMessage, NewSessionMessage, TTYDimensions, SessionOutputMessage, CloseSessionInputMessage, WorkingInitMessage, NewConnectionMessage, NewServerMessage } from "../proto";
import { Emitter, Event } from "@coder/events";
import { logger, field } from "@coder/logger";
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket } from "./command";
import { ChildProcess, SpawnOptions, ServerProcess, ServerSocket, Socket, ServerListener, Server } from "./command";

/**
* Client accepts an arbitrary connection intended to communicate with the Server.
Expand All @@ -18,6 +18,9 @@ export class Client {
private connectionId: number = 0;
private readonly connections: Map<number, ServerSocket> = new Map();

private serverId: number = 0;
private readonly servers: Map<number, ServerListener> = new Map();

private _initData: InitData | undefined;
private initDataEmitter = new Emitter<InitData>();
private initDataPromise: Promise<InitData>;
Expand Down Expand Up @@ -189,6 +192,14 @@ export class Client {
return socket;
}

public createServer(callback?: () => void): Server {
const id = this.serverId++;
const server = new ServerListener(this.connection, id, callback);
this.servers.set(id, server);

return server;
}

private doSpawn(command: string, args: string[] = [], options?: SpawnOptions, isFork: boolean = false, isBootstrapFork: boolean = true): ChildProcess {
const id = this.sessionId++;
const newSess = new NewSessionMessage();
Expand Down Expand Up @@ -333,6 +344,36 @@ export class Client {
this.sharedProcessActiveEmitter.emit({
socketPath: message.getSharedProcessActive()!.getSocketPath(),
});
} else if (message.hasServerEstablished()) {
const s = this.servers.get(message.getServerEstablished()!.getId());
if (!s) {
return;
}
s.emit("connect");
} else if (message.hasServerConnectionEstablished()) {
const s = this.servers.get(message.getServerConnectionEstablished()!.getServerId());
if (!s) {
return;
}
const conId = message.getServerConnectionEstablished()!.getConnectionId();
const serverSocket = new ServerSocket(this.connection, conId);
serverSocket.emit("connect");
this.connections.set(conId, serverSocket);
s.emit("connection", serverSocket);
} else if (message.getServerFailure()) {
const s = this.servers.get(message.getServerFailure()!.getId());
if (!s) {
return;
}
s.emit("error", new Error(message.getNewSessionFailure()!.getReason().toString()));
this.servers.delete(message.getNewSessionFailure()!.getId());
} else if (message.hasServerClose()) {
const s = this.servers.get(message.getServerClose()!.getId());
if (!s) {
return;
}
s.emit("close");
this.servers.delete(message.getServerClose()!.getId());
}
}
}
82 changes: 81 additions & 1 deletion packages/protocol/src/browser/command.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as events from "events";
import * as stream from "stream";
import { ReadWriteConnection } from "../common/connection";
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage } from "../proto";
import { ShutdownSessionMessage, ClientMessage, WriteToSessionMessage, ResizeSessionTTYMessage, TTYDimensions as ProtoTTYDimensions, ConnectionOutputMessage, ConnectionCloseMessage, ServerCloseMessage, NewServerMessage } from "../proto";

export interface TTYDimensions {
readonly columns: number;
Expand Down Expand Up @@ -237,3 +237,83 @@ export class ServerSocket extends events.EventEmitter implements Socket {
throw new Error("Method not implemented.");
}
}

export interface Server {
addListener(event: "close", listener: () => void): this;
addListener(event: "connect", listener: (socket: Socket) => void): this;
addListener(event: "error", listener: (err: Error) => void): this;

on(event: "close", listener: () => void): this;
on(event: "connection", listener: (socket: Socket) => void): this;
on(event: "error", listener: (err: Error) => void): this;

once(event: "close", listener: () => void): this;
once(event: "connection", listener: (socket: Socket) => void): this;
once(event: "error", listener: (err: Error) => void): this;

removeListener(event: "close", listener: () => void): this;
removeListener(event: "connection", listener: (socket: Socket) => void): this;
removeListener(event: "error", listener: (err: Error) => void): this;

emit(event: "close"): boolean;
emit(event: "connection"): boolean;
emit(event: "error"): boolean;

listen(path: string, listeningListener?: () => void): this;
close(callback?: () => void): this;

readonly listening: boolean;
}

export class ServerListener extends events.EventEmitter implements Server {
private _listening: boolean = false;

public constructor(
private readonly connection: ReadWriteConnection,
private readonly id: number,
connectCallback?: () => void,
) {
super();

this.on("connect", () => {
this._listening = true;
if (connectCallback) {
connectCallback();
}
});
}

public get listening(): boolean {
return this._listening;
}

public listen(path: string, listener?: () => void): this {
const ns = new NewServerMessage();
ns.setId(this.id);
ns.setPath(path!);
const cm = new ClientMessage();
cm.setNewServer(ns);
this.connection.send(cm.serializeBinary());

if (typeof listener !== "undefined") {
this.once("connect", listener);
}

return this;
}

public close(callback?: Function | undefined): this {
const closeMsg = new ServerCloseMessage();
closeMsg.setId(this.id);
closeMsg.setReason("Manually closed");
const clientMsg = new ClientMessage();
clientMsg.setServerClose(closeMsg);
this.connection.send(clientMsg.serializeBinary());

if (callback) {
callback();
}

return this;
}
}
47 changes: 46 additions & 1 deletion packages/protocol/src/node/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import * as net from "net";
import * as nodePty from "node-pty";
import * as stream from "stream";
import { TextEncoder } from "text-encoding";
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage } from "../proto";
import { NewSessionMessage, ServerMessage, SessionDoneMessage, SessionOutputMessage, IdentifySessionMessage, NewConnectionMessage, ConnectionEstablishedMessage, NewConnectionFailureMessage, ConnectionCloseMessage, ConnectionOutputMessage, NewServerMessage, ServerEstablishedMessage, NewServerFailureMessage, ServerCloseMessage, ServerConnectionEstablishedMessage } from "../proto";
import { SendableConnection } from "../common/connection";
import { ServerOptions } from "./server";

Expand Down Expand Up @@ -180,3 +180,48 @@ export const handleNewConnection = (connection: SendableConnection, newConnectio

return socket;
};

export const handleNewServer = (connection: SendableConnection, newServer: NewServerMessage, addSocket: (socket: net.Socket) => number, onExit: () => void): net.Server => {
const s = net.createServer();

try {
s.listen(newServer.getPath() ? newServer.getPath() : newServer.getPort(), () => {
const se = new ServerEstablishedMessage();
se.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerEstablished(se);
connection.send(sm.serializeBinary());
});
} catch (ex) {
const sf = new NewServerFailureMessage();
sf.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerFailure(sf);
connection.send(sm.serializeBinary());

onExit();
}

s.on("close", () => {
const sc = new ServerCloseMessage();
sc.setId(newServer.getId());
const sm = new ServerMessage();
sm.setServerClose(sc);
connection.send(sm.serializeBinary());

onExit();
});

s.on("connection", (socket) => {
const socketId = addSocket(socket);

const sock = new ServerConnectionEstablishedMessage();
sock.setServerId(newServer.getId());
sock.setConnectionId(socketId);
const sm = new ServerMessage();
sm.setServerConnectionEstablished(sock);
connection.send(sm.serializeBinary());
});

return s;
};
24 changes: 23 additions & 1 deletion packages/protocol/src/node/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { logger, field } from "@coder/logger";
import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto";
import { evaluate } from "./evaluate";
import { ReadWriteConnection } from "../common/connection";
import { Process, handleNewSession, handleNewConnection } from "./command";
import { Process, handleNewSession, handleNewConnection, handleNewServer } from "./command";
import * as net from "net";

export interface ServerOptions {
Expand All @@ -22,6 +22,9 @@ export class Server {

private readonly sessions: Map<number, Process> = new Map();
private readonly connections: Map<number, net.Socket> = new Map();
private readonly servers: Map<number, net.Server> = new Map();

private connectionId: number = Number.MAX_SAFE_INTEGER;

public constructor(
private readonly connection: ReadWriteConnection,
Expand Down Expand Up @@ -147,9 +150,28 @@ export class Server {
return;
}
c.end();
} else if (message.hasNewServer()) {
const s = handleNewServer(this.connection, message.getNewServer()!, (socket) => {
const id = this.connectionId--;
this.connections.set(id, socket);
return id;
}, () => {
this.connections.delete(message.getNewServer()!.getId());
});
this.servers.set(message.getNewServer()!.getId(), s);
} else if (message.hasServerClose()) {
const s = this.getServer(message.getServerClose()!.getId());
if (!s) {
return;
}
s.close();
}
}

private getServer(id: number): net.Server | undefined {
return this.servers.get(id);
}

private getConnection(id: number): net.Socket | undefined {
return this.connections.get(id);
}
Expand Down
16 changes: 11 additions & 5 deletions packages/protocol/src/proto/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ message ClientMessage {
NewConnectionMessage new_connection = 6;
ConnectionOutputMessage connection_output = 7;
ConnectionCloseMessage connection_close = 8;
NewServerMessage new_server = 9;
ServerCloseMessage server_close = 10;

// node.proto
NewEvalMessage new_eval = 9;
NewEvalMessage new_eval = 11;
}
}

Expand All @@ -31,15 +33,19 @@ message ServerMessage {
ConnectionOutputMessage connection_output = 6;
ConnectionCloseMessage connection_close = 7;
ConnectionEstablishedMessage connection_established = 8;
NewServerFailureMessage server_failure = 9;
ServerEstablishedMessage server_established = 10;
ServerCloseMessage server_close = 11;
ServerConnectionEstablishedMessage server_connection_established = 12;

// node.proto
EvalFailedMessage eval_failed = 9;
EvalDoneMessage eval_done = 10;
EvalFailedMessage eval_failed = 13;
EvalDoneMessage eval_done = 14;

WorkingInitMessage init = 11;
WorkingInitMessage init = 15;

// vscode.proto
SharedProcessActiveMessage shared_process_active = 12;
SharedProcessActiveMessage shared_process_active = 16;
}
}

Expand Down
Loading

0 comments on commit 7ca010a

Please sign in to comment.