Skip to content
This repository has been archived by the owner on Apr 4, 2023. It is now read-only.

Commit

Permalink
Add ability to track che tasks
Browse files Browse the repository at this point in the history
Signed-off-by: Roman Nikitenko <rnikiten@redhat.com>
  • Loading branch information
RomanNikitenko committed May 14, 2019
1 parent 18cd3d6 commit 6394f35
Show file tree
Hide file tree
Showing 16 changed files with 294 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export class CheTaskClientImpl implements CheTaskClient {
private readonly onKillEventEmitter: Emitter<number>;
private taskInfoHandlers: ((id: number) => Promise<TaskInfo>)[] = [];
private runTaskHandlers: ((id: number, config: TaskConfiguration, ctx?: string) => Promise<void>)[] = [];
private taskExitedHandlers: ((id: number) => Promise<void>)[] = [];

constructor() {
this.onKillEventEmitter = new Emitter<number>();
}
Expand All @@ -30,14 +32,28 @@ export class CheTaskClientImpl implements CheTaskClient {

async getTaskInfo(id: number): Promise<TaskInfo | undefined> {
for (const taskInfoHandler of this.taskInfoHandlers) {
const taskInfo = await taskInfoHandler(id);
if (taskInfo) {
return taskInfo;
try {
const taskInfo = await taskInfoHandler(id);
if (taskInfo) {
return taskInfo;
}
} catch (e) {
// allow another handlers to handle request
}
}
return undefined;
}

async onTaskExited(id: number): Promise<void> {
for (const taskExitedHandler of this.taskExitedHandlers) {
try {
await taskExitedHandler(id);
} catch (e) {
// allow another handlers to handle request
}
}
}

get onKillEvent(): Event<number> {
return this.onKillEventEmitter.event;
}
Expand All @@ -46,11 +62,15 @@ export class CheTaskClientImpl implements CheTaskClient {
this.onKillEventEmitter.fire(id);
}

setTaskInfoHandler(handler: (id: number) => Promise<TaskInfo>) {
addTaskInfoHandler(handler: (id: number) => Promise<TaskInfo>) {
this.taskInfoHandlers.push(handler);
}

setRunTaskHandler(handler: (id: number, config: TaskConfiguration, ctx?: string) => Promise<void>) {
addRunTaskHandler(handler: (id: number, config: TaskConfiguration, ctx?: string) => Promise<void>) {
this.runTaskHandlers.push(handler);
}

addTaskExitedHandler(handler: (id: number) => Promise<void>) {
this.taskExitedHandlers.push(handler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import { CheTask, CheTaskMain, CheTaskService, CheTaskClient, PLUGIN_RPC_CONTEXT } from '../common/che-protocol';
import { RPCProtocol } from '@theia/plugin-ext/lib/api/rpc-protocol';
import { interfaces, injectable } from 'inversify';
import { TaskExitedEvent } from '@eclipse-che/plugin';

@injectable()
export class CheTaskMainImpl implements CheTaskMain {
Expand All @@ -20,8 +21,9 @@ export class CheTaskMainImpl implements CheTaskMain {
this.delegate = container.get(CheTaskService);
this.cheTaskClient = container.get(CheTaskClient);
this.cheTaskClient.onKillEvent(id => proxy.$killTask(id));
this.cheTaskClient.setTaskInfoHandler(id => proxy.$getTaskInfo(id));
this.cheTaskClient.setRunTaskHandler((id, config, ctx) => proxy.$runTask(id, config, ctx));
this.cheTaskClient.addTaskInfoHandler(id => proxy.$getTaskInfo(id));
this.cheTaskClient.addTaskExitedHandler(id => proxy.$onTaskExited(id));
this.cheTaskClient.addRunTaskHandler((id, config, ctx) => proxy.$runTask(id, config, ctx));
}
$registerTaskRunner(type: string): Promise<void> {
return this.delegate.registerTaskRunner(type);
Expand All @@ -30,4 +32,8 @@ export class CheTaskMainImpl implements CheTaskMain {
$disposeTaskRunner(type: string): Promise<void> {
return this.delegate.disposeTaskRunner(type);
}

$fireTaskExited(event: TaskExitedEvent): Promise<void> {
return this.delegate.fireTaskExited(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ export interface CheVariablesMain {

export interface CheTask {
registerTaskRunner(type: string, runner: che.TaskRunner): Promise<che.Disposable>;
fireTaskExited(taskId: number): Promise<void>;
fireTaskExited(event: che.TaskExitedEvent): Promise<void>;
$runTask(id: number, config: che.TaskConfiguration, ctx?: string): Promise<void>;
$onTaskExited(id: number): Promise<void>;
$killTask(id: number): Promise<void>;
$getTaskInfo(id: number): Promise<che.TaskInfo | undefined>;
}
Expand All @@ -85,6 +86,7 @@ export const CheTaskMain = Symbol('CheTaskMain');
export interface CheTaskMain {
$registerTaskRunner(type: string): Promise<void>;
$disposeTaskRunner(type: string): Promise<void>;
$fireTaskExited(event: che.TaskExitedEvent): Promise<void>;
}

export interface Variable {
Expand Down Expand Up @@ -400,14 +402,17 @@ export interface CheTaskService extends JsonRpcServer<CheTaskClient> {
registerTaskRunner(type: string): Promise<void>;
disposeTaskRunner(type: string): Promise<void>;
disconnectClient(client: CheTaskClient): void;
fireTaskExited(event: che.TaskExitedEvent): Promise<void>;
}

export const CheTaskClient = Symbol('CheTaskClient');
export interface CheTaskClient {
runTask(id: number, taskConfig: che.TaskConfiguration, ctx?: string): Promise<void>;
killTask(id: number): Promise<void>;
getTaskInfo(id: number): Promise<che.TaskInfo | undefined>;
setTaskInfoHandler(func: (id: number) => Promise<che.TaskInfo | undefined>): void;
setRunTaskHandler(func: (id: number, config: che.TaskConfiguration, ctx?: string) => Promise<void>): void;
onTaskExited(id: number): Promise<void>;
addTaskInfoHandler(func: (id: number) => Promise<che.TaskInfo | undefined>): void;
addRunTaskHandler(func: (id: number, config: che.TaskConfiguration, ctx?: string) => Promise<void>): void;
addTaskExitedHandler(func: (id: number) => Promise<void>): void;
onKillEvent: Event<number>
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import { injectable, interfaces } from 'inversify';
import { Task, TaskManager, TaskOptions, TaskRunnerRegistry } from '@theia/task/lib/node';
import { Disposable, ILogger } from '@theia/core';
import { TaskConfiguration, TaskInfo } from '@theia/task/lib/common/task-protocol';
import { TaskExitedEvent } from '@eclipse-che/plugin';

@injectable()
export class CheTaskServiceImpl implements CheTaskService {
private readonly runnerRegistry: TaskRunnerRegistry;
private readonly taskManager: TaskManager;
private readonly logger: ILogger;
private readonly disposableMap: Map<string, Disposable>;
private readonly cheTasks: CheTask[] = [];
private readonly clients: CheTaskClient[];
private taskId: number;
constructor(container: interfaces.Container) {
Expand All @@ -42,7 +44,9 @@ export class CheTaskServiceImpl implements CheTaskService {
for (const client of this.clients) {
await client.runTask(id, config, ctx);
}
return new CheTask(id, this.taskManager, this.logger, { label: config.label, config, context: ctx }, this.clients);
const cheTask = new CheTask(id, this.taskManager, this.logger, { label: config.label, config, context: ctx }, this.clients);
this.cheTasks.push(cheTask);
return cheTask;
};
}

Expand All @@ -67,6 +71,28 @@ export class CheTaskServiceImpl implements CheTaskService {
this.clients.splice(idx, 1);
}
}

async fireTaskExited(event: TaskExitedEvent): Promise<void> {
for (const task of this.cheTasks) {
try {
const runtimeInfo = await task.getRuntimeInfo();
if (runtimeInfo.execId === event.execId || runtimeInfo.taskId === event.taskId) {

task.fireTaskExited({ taskId: task.id, code: event.code, ctx: runtimeInfo.ctx });

task.onTaskExited();

const index = this.cheTasks.indexOf(task);
if (index > -1) {
this.cheTasks.splice(index, 1);
}
break;
}
} catch (e) {
// allow another handlers to handle request
}
}
}
}

class CheTask extends Task {
Expand All @@ -85,18 +111,44 @@ class CheTask extends Task {
for (const client of this.clients) {
const taskInfo = await client.getTaskInfo(this.taskId);
if (taskInfo) {
return {
taskId: this.taskId,
terminalId: taskInfo.terminalId,
ctx: taskInfo.ctx,
config: taskInfo.config
};
return this.toTaskInfo(taskInfo);
}
}
throw new Error('Information not found');
throw new Error(`Runtime Information for task ${this.options.label} is not found`);
}

async onTaskExited(): Promise<void> {
for (const client of this.clients) {
await client.onTaskExited(this.taskId);
}
}

async kill(): Promise<void> {
this.clients.forEach(client => client.killTask(this.taskId));
}

fireTaskExited(event: TaskExitedEvent): void {
super.fireTaskExited({ taskId: event.taskId, code: event.code, ctx: event.ctx });
}

private toTaskInfo(runtimeInfo: TaskInfo): TaskInfo {
const { taskId, terminalId, ctx, config, ...properties } = runtimeInfo;
const result = {
taskId: this.taskId,
terminalId,
ctx,
config
};

if (!properties) {
return result;
}

for (const key in properties) {
if (properties.hasOwnProperty(key)) {
result[key] = properties[key];
}
}
return result;
}
}
4 changes: 2 additions & 2 deletions extensions/eclipse-che-theia-plugin-ext/src/plugin/che-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ export function createAPIFactory(rpc: RPCProtocol): CheApiFactory {
registerTaskRunner(type: string, runner: che.TaskRunner): Promise<che.Disposable> {
return cheTaskImpl.registerTaskRunner(type, runner);
},
fireTaskExited(id: number): Promise<void> {
return cheTaskImpl.fireTaskExited(id);
fireTaskExited(event: che.TaskExitedEvent): Promise<void> {
return cheTaskImpl.fireTaskExited(event);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@
* SPDX-License-Identifier: EPL-2.0
**********************************************************************/
import { CheTask, CheTaskMain, PLUGIN_RPC_CONTEXT } from '../common/che-protocol';
import { TaskRunner, Disposable, Task, TaskInfo } from '@eclipse-che/plugin';
import { TaskRunner, Disposable, Task, TaskInfo, TaskExitedEvent, TaskConfiguration } from '@eclipse-che/plugin';
import { RPCProtocol } from '@theia/plugin-ext/lib/api/rpc-protocol';
import { TaskConfiguration } from '@theia/task/lib/common';

export class CheTaskImpl implements CheTask {
private readonly cheTaskMain: CheTaskMain;
Expand Down Expand Up @@ -54,15 +53,14 @@ export class CheTaskImpl implements CheTask {
}
}

async fireTaskExited(taskId: number): Promise<void> {
let id: number | undefined;
this.taskMap.forEach((value: Task, key: number) => {
if (value.getRuntimeInfo().taskId === taskId) {
id = key;
}
});
if (id) {
async $onTaskExited(id: number): Promise<void> {
const task = this.taskMap.get(id);
if (task) {
this.taskMap.delete(id);
}
}

async fireTaskExited(event: TaskExitedEvent): Promise<void> {
this.cheTaskMain.$fireTaskExited(event);
}
}
15 changes: 14 additions & 1 deletion extensions/eclipse-che-theia-plugin/src/che-proposed.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ declare module '@eclipse-che/plugin' {
export namespace task {
export function registerTaskRunner(type: string, runner: TaskRunner): Promise<Disposable>;
/** Needs to be executed when the task is finished */
export function fireTaskExited(id: number): Promise<void>;
export function fireTaskExited(event: TaskExitedEvent): Promise<void>;
}

/** A Task Runner knows how to run a Task of a particular type. */
Expand All @@ -132,6 +132,19 @@ declare module '@eclipse-che/plugin' {
readonly ctx?: string,
/** task config used for launching a task */
readonly config: TaskConfiguration
// tslint:disable-next-line:no-any
readonly [key: string]: any;
}

export interface TaskExitedEvent {
readonly taskId?: number;
readonly ctx?: string;

readonly code?: number;
readonly signal?: string;

// tslint:disable-next-line:no-any
readonly [key: string]: any;
}

export interface TaskConfiguration {
Expand Down
2 changes: 2 additions & 0 deletions plugins/task-plugin/src/che-task-backend-module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { ProjectPathVariableResolver } from './variable/project-path-variable-re
import { CheTaskRunner } from './task/che-task-runner';
import { ServerVariableResolver } from './variable/server-variable-resolver';
import { MachineExecClient } from './machine/machine-exec-client';
import { MachineExecWatcher } from './machine/machine-exec-watcher';
import { CheTerminalWidget, CheTerminalWidgetOptions, TerminalWidgetFactory } from './machine/terminal-widget';
import { CheTaskEventsHandler } from './preview/task-events-handler';
import { TasksPreviewManager } from './preview/tasks-preview-manager';
Expand All @@ -32,6 +33,7 @@ container.bind(CheTaskRunner).toSelf().inSingletonScope();
container.bind(MachinesPicker).toSelf().inSingletonScope();
container.bind(AttachTerminalClient).toSelf().inSingletonScope();
container.bind(MachineExecClient).toSelf().inSingletonScope();
container.bind(MachineExecWatcher).toSelf().inSingletonScope();
container.bind(ServerVariableResolver).toSelf().inSingletonScope();
container.bind(ProjectPathVariableResolver).toSelf().inSingletonScope();
container.bind(CheWorkspaceClient).toSelf().inSingletonScope();
Expand Down
14 changes: 13 additions & 1 deletion plugins/task-plugin/src/machine/attach-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import { injectable, inject } from 'inversify';
import { CheWorkspaceClient } from '../che-workspace-client';
import { ReconnectingWebSocket } from './websocket';
import { applySegmentsToUri } from '../uri-helper';
import { MachineExecWatcher } from './machine-exec-watcher';
import * as startPoint from '../task-plugin-backend';

const ATTACH_TERMINAL_SEGMENT: string = 'attach';

Expand All @@ -25,6 +27,9 @@ export class AttachTerminalClient {
@inject(CheWorkspaceClient)
protected readonly cheWorkspaceClient!: CheWorkspaceClient;

@inject(MachineExecWatcher)
protected readonly machineExecWatcher: MachineExecWatcher;

async connectTerminalProcess(terminalId: number, outputHandler: TerminalProcessOutputHandler): Promise<void> {
const termServerEndpoint = await this.cheWorkspaceClient.getMachineExecServerURL();

Expand All @@ -39,6 +44,13 @@ export class AttachTerminalClient {
webSocket.onError = (error: Error) => {
console.error('Websocket error:', error);
};
// TODO close webSocket when task is completed; event with runtime info is not implemented for plugin API at the moment

const disposable = this.machineExecWatcher.onExit(event => {
if (event.id === terminalId) {
webSocket.close();
disposable.dispose();
}
});
startPoint.getSubscriptions().push(disposable);
}
}
Loading

0 comments on commit 6394f35

Please sign in to comment.