Skip to content

Commit

Permalink
fixup.
Browse files Browse the repository at this point in the history
Signed-off-by: Akos Kitta <a.kitta@arduino.cc>
  • Loading branch information
Akos Kitta authored and kittaakos committed Jul 14, 2022
1 parent aea550f commit a003831
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 46 deletions.
124 changes: 79 additions & 45 deletions arduino-ide-extension/src/node/board-discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import {
AvailablePorts,
AttachedBoardsChangeEvent,
} from '../common/protocol';
import { Emitter } from '@theia/core/lib/common/event';
import { Emitter, Event } from '@theia/core/lib/common/event';
import { DisposableCollection } from '@theia/core/lib/common/disposable';
import { Disposable } from '@theia/core/shared/vscode-languageserver-protocol';
import { ArduinoCoreServiceClient } from './cli-protocol/cc/arduino/cli/commands/v1/commands_grpc_pb';
import { v4 } from 'uuid';
import { ServiceError } from './service-error';
import { BackendApplicationContribution } from '@theia/core/lib/node';
import { Deferred } from '@theia/core/lib/common/promise-util';

type Duplex = ClientDuplexStream<BoardListWatchRequest, BoardListWatchResponse>;
interface StreamWrapper extends Disposable {
Expand All @@ -30,7 +31,8 @@ interface StreamWrapper extends Disposable {

/**
* Singleton service for tracking the available ports and board and broadcasting the
* changes to all connected frontend instances. \
* changes to all connected frontend instances.
*
* Unlike other services, this is not connection scoped.
*/
@injectable()
Expand All @@ -45,16 +47,16 @@ export class BoardDiscovery
@inject(NotificationServiceServer)
private readonly notificationService: NotificationServiceServer;

// Used to know if the board watch process is already running to avoid
// starting it multiple times
private watching: boolean;
private watching: Deferred<void> | undefined;
private stopping: Deferred<void> | undefined;
private wrapper: StreamWrapper | undefined;
private readonly onStreamDidEndEmitter = new Emitter<void>(); // sent from the CLI when the discovery process is killed for example after the indexes update and the core client re-initialization.
private readonly onStreamDidCancelEmitter = new Emitter<void>(); // when the watcher is canceled by the IDE2
private readonly toDisposeOnStopWatch = new DisposableCollection();

/**
* Keys are the `address` of the ports.
*
* The `protocol` is ignored because the board detach event does not carry the protocol information,
* just the address.
* ```json
Expand All @@ -64,46 +66,57 @@ export class BoardDiscovery
* }
* ```
*/
private _state: AvailablePorts = {};
get state(): AvailablePorts {
return this._state;
private _availablePorts: AvailablePorts = {};
get availablePorts(): AvailablePorts {
return this._availablePorts;
}

onStart(): void {
this.start();
this.onClientDidRefresh(() => this.start());
this.onClientDidRefresh(() => this.restart());
}

private async restart(): Promise<void> {
this.logger.info('restarting before stop');
await this.stop();
this.logger.info('restarting after stop');
return this.start();
}

onStop(): void {
this.stop();
}

stop(): Promise<void> {
async stop(restart = false): Promise<void> {
this.logger.info('stop');
if (this.stopping) {
this.logger.info('stop already stopping');
return this.stopping.promise;
}
if (!this.watching) {
return;
}
this.stopping = new Deferred();
this.logger.info('>>> Stopping boards watcher...');
return new Promise<void>((resolve, reject) => {
const timeout = this.createTimeout(
BoardDiscovery.StopWatchTimeout,
reject
);
const timeout = this.createTimeout(10_000, reject);
const toDispose = new DisposableCollection();
toDispose.pushAll([
timeout,
this.onStreamDidEndEmitter.event(() => {
this.logger.info(
`<<< Received the end event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
const waitForEvent = (event: Event<unknown>) =>
event(() => {
this.logger.info('stop received event: either end or cancel');
toDispose.dispose();
this.stopping?.resolve();
this.stopping = undefined;
this.logger.info('stop stopped');
resolve();
}),
this.onStreamDidCancelEmitter.event(() => {
this.logger.info(
`<<< Received the cancel event from the stream. Boards watcher has been successfully stopped.`
);
this.watching = false;
toDispose.dispose();
resolve();
}),
if (restart) {
this.start();
}
});
toDispose.pushAll([
timeout,
waitForEvent(this.onStreamDidEndEmitter.event),
waitForEvent(this.onStreamDidCancelEmitter.event),
]);
this.logger.info('Canceling boards watcher...');
this.toDisposeOnStopWatch.dispose();
Expand Down Expand Up @@ -149,9 +162,14 @@ export class BoardDiscovery
}
const stream = client
.boardListWatch()
.on('end', () => this.onStreamDidEndEmitter.fire())
.on('end', () => {
this.logger.info('received end');
this.onStreamDidEndEmitter.fire();
})
.on('error', (error) => {
this.logger.info('error received');
if (ServiceError.isCancel(error)) {
this.logger.info('cancel error received!');
this.onStreamDidCancelEmitter.fire();
} else {
this.logger.error(
Expand All @@ -165,13 +183,21 @@ export class BoardDiscovery
stream,
uuid: v4(),
dispose: () => {
this.logger.info('disposing requesting cancel');
// Cancelling the stream will kill the discovery `builtin:mdns-discovery process`.
// The client (this class) will receive a `{"eventType":"quit","error":""}` response from the CLI.
stream.cancel();
this.logger.info('disposing canceled');
this.wrapper = undefined;
},
};
this.toDisposeOnStopWatch.pushAll([wrapper]);
this.toDisposeOnStopWatch.pushAll([
wrapper,
Disposable.create(() => {
this.watching?.reject(new Error(`Stopping watcher.`));
this.watching = undefined;
}),
]);
return wrapper;
}

Expand All @@ -188,17 +214,25 @@ export class BoardDiscovery
}

async start(): Promise<void> {
this.logger.info('start');
if (this.stopping) {
this.logger.info('start is stopping wait');
await this.stopping.promise;
this.logger.info('start stopped');
}
if (this.watching) {
// We want to avoid starting the board list watch process multiple
// times to meet unforeseen consequences
return;
this.logger.info('start already watching');
return this.watching.promise;
}
this.watching = new Deferred();
this.logger.info('start new deferred');
const { client, instance } = await this.coreClient;
const wrapper = await this.createWrapper(client);
wrapper.stream.on('data', async (resp: BoardListWatchResponse) => {
this.logger.info('onData', this.toJson(resp));
if (resp.getEventType() === 'quit') {
await this.stop();
this.logger.info('quit received');
this.stop();
return;
}

Expand All @@ -217,8 +251,8 @@ export class BoardDiscovery
throw new Error(`Unexpected event type: '${resp.getEventType()}'`);
}

const oldState = deepClone(this._state);
const newState = deepClone(this._state);
const oldState = deepClone(this._availablePorts);
const newState = deepClone(this._availablePorts);

const address = (detectedPort as any).getPort().getAddress();
const protocol = (detectedPort as any).getPort().getProtocol();
Expand Down Expand Up @@ -286,18 +320,21 @@ export class BoardDiscovery
},
};

this._state = newState;
this._availablePorts = newState;
this.notificationService.notifyAttachedBoardsDidChange(event);
}
});
this.logger.info('start request start watch');
await this.requestStartWatch(
new BoardListWatchRequest().setInstance(instance),
wrapper.stream
);
this.watching = true;
this.logger.info('start requested start watch');
this.watching.resolve();
this.logger.info('start resolved watching');
}

getAttachedBoards(state: AvailablePorts = this.state): Board[] {
getAttachedBoards(state: AvailablePorts = this.availablePorts): Board[] {
const attachedBoards: Board[] = [];
for (const portID of Object.keys(state)) {
const [, boards] = state[portID];
Expand All @@ -306,7 +343,7 @@ export class BoardDiscovery
return attachedBoards;
}

getAvailablePorts(state: AvailablePorts = this.state): Port[] {
getAvailablePorts(state: AvailablePorts = this.availablePorts): Port[] {
const availablePorts: Port[] = [];
for (const portID of Object.keys(state)) {
const [port] = state[portID];
Expand All @@ -315,6 +352,3 @@ export class BoardDiscovery
return availablePorts;
}
}
export namespace BoardDiscovery {
export const StopWatchTimeout = 10_000;
}
2 changes: 1 addition & 1 deletion arduino-ide-extension/src/node/boards-service-impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export class BoardsServiceImpl
protected readonly boardDiscovery: BoardDiscovery;

async getState(): Promise<AvailablePorts> {
return this.boardDiscovery.state;
return this.boardDiscovery.availablePorts;
}

async getAttachedBoards(): Promise<Board[]> {
Expand Down

0 comments on commit a003831

Please sign in to comment.