Skip to content

Commit

Permalink
GH-815: Exposed writable stream process API.
Browse files Browse the repository at this point in the history
Replaced `child_process` with `RawProcess` in LS contribution.

Signed-off-by: Akos Kitta <kittaakos@gmail.com>
  • Loading branch information
kittaakos committed Nov 17, 2017
1 parent 58aea94 commit 07eb061
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 38 deletions.
2 changes: 1 addition & 1 deletion packages/java/src/browser/java-resource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class JavaResourceResolver implements ResourceResolver {

resolve(uri: URI): JavaResource {
if (uri.scheme !== JAVA_SCHEME) {
throw new Error("The given uri is not a java uri: " + uri);
throw new Error("The given URI is not a valid Java uri: " + uri);
}
return new JavaResource(uri, this.clientContribution);
}
Expand Down
7 changes: 3 additions & 4 deletions packages/java/src/node/java-contribution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class JavaContribution extends BaseLanguageServerContribution {
const serverPath = path.resolve(__dirname, 'server');
const jarPaths = glob.sync('**/plugins/org.eclipse.equinox.launcher_*.jar', { cwd: serverPath });
if (jarPaths.length === 0) {
throw new Error('The java server launcher is not found.');
throw new Error('The Java server launcher is not found.');
}

const jarPath = path.resolve(serverPath, jarPaths[0]);
Expand Down Expand Up @@ -70,9 +70,8 @@ export class JavaContribution extends BaseLanguageServerContribution {
env.STDIN_PORT = inServer.address().port;
env.STDOUT_HOST = outServer.address().address;
env.STDOUT_PORT = outServer.address().port;
this.createProcessSocketConnection(inSocket, outSocket, command, args, {
env: env
}).then(serverConnection => this.forward(clientConnection, serverConnection));
this.createProcessSocketConnection(inSocket, outSocket, command, args, { env })
.then(serverConnection => this.forward(clientConnection, serverConnection));
});
}
}
3 changes: 2 additions & 1 deletion packages/languages/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"description": "Theia - Languages Extension",
"dependencies": {
"@theia/core": "^0.2.1",
"@theia/process": "^0.2.1",
"vscode-base-languageclient": "^0.0.1-alpha.3",
"vscode-languageserver": "^3.4.0"
},
Expand Down Expand Up @@ -46,4 +47,4 @@
"nyc": {
"extends": "../../configs/nyc.json"
}
}
}
35 changes: 19 additions & 16 deletions packages/languages/src/node/language-server-contribution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@

import * as net from 'net';
import * as cp from 'child_process';
import { injectable } from "inversify";
import { injectable, inject } from "inversify";
import { Message, isRequestMessage } from 'vscode-ws-jsonrpc';
import { InitializeParams, InitializeRequest } from 'vscode-languageserver-protocol';
import {
createProcessSocketConnection,
createProcessStreamConnection,
createStreamConnection,
forward,
IConnection
} from 'vscode-ws-jsonrpc/lib/server';
import { MaybePromise } from "@theia/core/lib/common";
import { LanguageContribution } from "../common";
import { RawProcess, RawProcessFactory } from '@theia/process/lib/node/raw-process';

export {
LanguageContribution, IConnection, Message
Expand All @@ -30,10 +31,14 @@ export interface LanguageServerContribution extends LanguageContribution {

@injectable()
export abstract class BaseLanguageServerContribution implements LanguageServerContribution {

abstract readonly id: string;
abstract readonly name: string;
abstract start(clientConnection: IConnection): void;

@inject(RawProcessFactory)
protected readonly processFactory: RawProcessFactory;

protected forward(clientConnection: IConnection, serverConnection: IConnection): void {
forward(clientConnection, serverConnection, this.map.bind(this));
}
Expand All @@ -48,27 +53,25 @@ export abstract class BaseLanguageServerContribution implements LanguageServerCo
return message;
}

protected createProcessSocketConnection(
outSocket: MaybePromise<net.Socket>, inSocket: MaybePromise<net.Socket>,
command: string, args?: string[], options?: cp.SpawnOptions
): Promise<IConnection> {
protected async createProcessSocketConnection(outSocket: MaybePromise<net.Socket>, inSocket: MaybePromise<net.Socket>,
command: string, args?: string[], options?: cp.SpawnOptions): Promise<IConnection> {

const process = this.spawnProcess(command, args, options);
return Promise.all([
Promise.resolve(outSocket),
Promise.resolve(inSocket)
]).then(result => createProcessSocketConnection(process, result[0], result[1]));
const [outSock, inSock] = await Promise.all([outSocket, inSocket]);
return createProcessSocketConnection(process.process, outSock, inSock);
}

protected createProcessStreamConnection(command: string, args?: string[], options?: cp.SpawnOptions): IConnection {
const process = this.spawnProcess(command, args, options);
return createProcessStreamConnection(process);
return createStreamConnection(process.output, process.input, () => process.kill());
}

protected spawnProcess(command: string, args?: string[], options?: cp.SpawnOptions): cp.ChildProcess {
const serverProcess = cp.spawn(command, args, options);
serverProcess.once('error', this.onDidFailSpawnProcess.bind(this));
serverProcess.stderr.on('data', this.logError.bind(this));
return serverProcess;
protected spawnProcess(command: string, args?: string[], options?: cp.SpawnOptions): RawProcess {
const rawProcess = this.processFactory({ command, args, options });
const { process } = rawProcess;
process.once('error', this.onDidFailSpawnProcess.bind(this));
process.stderr.on('data', this.logError.bind(this));
return rawProcess;
}

protected onDidFailSpawnProcess(error: Error): void {
Expand Down
23 changes: 15 additions & 8 deletions packages/process/src/node/process-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,26 @@
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/
import { injectable } from 'inversify';
import { injectable, inject } from 'inversify';
import { Process } from './process';
import { Emitter, Event } from '@theia/core/lib/common';
import { ILogger } from '@theia/core/lib/common/logger';

@injectable()
export class ProcessManager {

protected readonly processes: Map<number, Process> = new Map();
protected id: number = 0;
protected readonly deleteEmitter = new Emitter<number>();
protected readonly processes: Map<number, Process>;
protected readonly deleteEmitter: Emitter<number>;

constructor( @inject(ILogger) protected logger: ILogger) {
this.processes = new Map();
this.deleteEmitter = new Emitter<number>();
}

register(process: Process): number {
const id = this.id;
this.processes.set(id, process);
this.id++;
return id;
this.processes.set(++this.id, process);
return this.id;
}

get(id: number): Process | undefined {
Expand All @@ -28,11 +32,14 @@ export class ProcessManager {

delete(process: Process): void {
process.kill();
this.processes.delete(process.id);
if (!this.processes.delete(process.id)) {
this.logger.warn(`The process was not registered via this manager. Anyway, we kill your process. PID: ${process.pid}.`);
}
this.deleteEmitter.fire(process.id);
}

get onDelete(): Event<number> {
return this.deleteEmitter.event;
}

}
3 changes: 2 additions & 1 deletion packages/process/src/node/process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import * as stream from 'stream';
import { injectable, inject, unmanaged } from "inversify";
import { injectable, inject } from "inversify";
import { ProcessManager } from './process-manager';
import { ILogger, Emitter, Event } from '@theia/core/lib/common';

Expand All @@ -27,6 +27,7 @@ export abstract class Process {
readonly exitEmitter: Emitter<IProcessExitEvent>;
readonly errorEmitter: Emitter<Error>;
abstract readonly pid: number;
abstract readonly input: stream.Writable;
abstract readonly output: stream.Readable;
protected _killed = false;

Expand Down
5 changes: 4 additions & 1 deletion packages/process/src/node/raw-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ export interface RawProcessFactory {
@injectable()
export class RawProcess extends Process {

readonly input: stream.Writable;
readonly output: stream.Readable;
readonly errorOutput: stream.Readable;
protected process: child.ChildProcess;
// XXX: Do we have to make this public? How to attach additional listeners to the underlying process then?
readonly process: child.ChildProcess;

constructor(
@inject(RawProcessOptions) options: RawProcessOptions,
Expand All @@ -50,6 +52,7 @@ export class RawProcess extends Process {
this.process.on('exit', this.emitOnExit.bind(this));

this.output = this.process.stdout;
this.input = this.process.stdin;
this.errorOutput = this.process.stderr;
}

Expand Down
28 changes: 22 additions & 6 deletions packages/process/src/node/terminal-process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,37 @@ export const TerminalProcessFactory = Symbol("TerminalProcessFactory");
export type TerminalProcessFactory = (options: TerminalProcessOptions) => TerminalProcess;

/* Use this instead of the node-pty stream, since the node-pty stream is already resumed. */
export class TerminalReadableStream extends stream.Readable {
constructor(protected readonly terminal: any, opts?: any) {
class TerminalReadableStream extends stream.Readable {

constructor(protected readonly terminal: ITerminal, opts?: stream.ReadableOptions) {
super(opts);
this.terminal.on('data', (data: any) => {
this.push(data);
});
this.terminal.on('data', data => this.push(data));
}

/* This needs to be implemented as per node's API doc, even if it's empty. */
_read(size: number) {
}

}

class TerminalWritableStream extends stream.Writable {

constructor(protected readonly terminal: ITerminal) {
super({
write: (chunk, encoding, next) => {
this.terminal.write(chunk.toString());
next();
}
});
}

}

@injectable()
export class TerminalProcess extends Process {

readonly output: TerminalReadableStream;
readonly input: stream.Writable;
readonly output: stream.Readable;
protected readonly terminal: ITerminal;

constructor(
Expand All @@ -59,6 +74,7 @@ export class TerminalProcess extends Process {

this.terminal.on('exit', this.emitOnExit.bind(this));
this.output = new TerminalReadableStream(this.terminal);
this.input = new TerminalWritableStream(this.terminal);
}

get pid() {
Expand Down

0 comments on commit 07eb061

Please sign in to comment.