Skip to content

Commit

Permalink
reorganiser code
Browse files Browse the repository at this point in the history
  • Loading branch information
Florent-Bouisset committed Jun 14, 2024
1 parent fd5238a commit 7c18b67
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/core/main/worker/worker_main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,6 @@ function sendSegmentSinksStoreInfos(contentPreparer: ContentPreparer, messageId:
type: WorkerMessageType.SegmentSinkStoreUpdate,
contentId: currentContent.contentId,
value: { segmentSinkMetrics: segmentSinksMetrics,
messageId: messageId }
messageId }
});
}
2 changes: 1 addition & 1 deletion src/main_thread/init/media_source_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ export default class MediaSourceContentInitializer extends ContentInitializer {
stopListening();
this.trigger("loaded", { segmentSinksStore,
getSegmentSinkMetrics: async () => {
return segmentSinksStore.getSegmentSinksMetrics();
return new Promise((resolve) => resolve(segmentSinksStore.getSegmentSinksMetrics()));
}
});
}
Expand Down
61 changes: 28 additions & 33 deletions src/main_thread/init/multi_thread_content_initializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
*/
private _currentMediaSourceCanceller: TaskCanceller;


/**
* Stores the resolvers and the current messageId that is sent to the web worker to receive segment sink metrics.
* The purpose of collecting metrics is for monitoring and debugging.
*/
private _segmentMetrics: { messageId: number, resolvers: Record<number, (value: SegmentSinkMetrics | undefined) => void> }
/**
* Create a new `MultiThreadContentInitializer`, associated to the given
* settings.
Expand All @@ -105,6 +109,10 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
this._currentMediaSourceCanceller = new TaskCanceller();
this._currentMediaSourceCanceller.linkToSignal(this._initCanceller.signal);
this._currentContentInfo = null;
this._segmentMetrics = {
messageId: 0,
resolvers: {}
}
}

/**
Expand Down Expand Up @@ -150,6 +158,12 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
value: null,
});
});
this._initCanceller.signal.register(() => {
this._segmentMetrics = {
messageId: 0,
resolvers: {}
}
})
if (this._initCanceller.isUsed()) {
return;
}
Expand Down Expand Up @@ -1073,9 +1087,14 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
// Should already be handled by the API
break;

case WorkerMessageType.SegmentSinkStoreUpdate:
// this._onSegmentSinkStoreUpdate(msgData.value.segmentSinkMetrics);
case WorkerMessageType.SegmentSinkStoreUpdate: {
if (this._currentContentInfo?.contentId !== msgData.contentId) {
return;
}
this._segmentMetrics.resolvers[msgData.value.messageId](msgData.value.segmentSinkMetrics);
delete this._segmentMetrics.resolvers[msgData.value.messageId];
break;
}
default:
assertUnreachable(msgData);
}
Expand Down Expand Up @@ -1459,40 +1478,16 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
{ clearSignal: cancelSignal, emitCurrentValue: true },
);

let __messageId = 0
type ResolveFn = (value: SegmentSinkMetrics | undefined) => void;
let resolvers: Record<number,ResolveFn> = {};
const getSegmentSinkMetrics: () => Promise<SegmentSinkMetrics | undefined> = async () => {
__messageId++;
console.log("DEBUG METRICS: sending pull with msg ID", __messageId)
const _getSegmentSinkMetrics: () => Promise<SegmentSinkMetrics | undefined> = async () => {
const messageId = ++this._segmentMetrics.messageId;
sendMessage(this._settings.worker, {
type: MainThreadMessageType.PullSegmentSinkStoreInfos,
value: { messageId: __messageId},
value: { messageId},
});
return new Promise((resolve, reject) => {
resolvers[__messageId] = resolve;
return new Promise((resolve) => {
this._segmentMetrics.resolvers[messageId] = resolve;
})
}

const onMessageHandler = (message: MessageEvent) => {
const msgData = message.data as unknown as IWorkerMessage;
if(msgData.type === WorkerMessageType.SegmentSinkStoreUpdate) {
console.log("DEBUG METRICS: resolving pull with msg ID", msgData.value.messageId)
resolvers[msgData.value.messageId](msgData.value.segmentSinkMetrics);
delete resolvers[msgData.value.messageId];
}
}

this._settings.worker.addEventListener("message", onMessageHandler)

cancelSignal.register(() => {
resolvers = {};
__messageId = 0;
this._settings.worker.removeEventListener("message", onMessageHandler)
})



/**
* Emit a "loaded" events once the initial play has been performed and the
* media can begin playback.
Expand All @@ -1506,7 +1501,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
stopListening();
this.trigger("loaded", {
segmentSinksStore: null,
getSegmentSinkMetrics,
getSegmentSinkMetrics: _getSegmentSinkMetrics,
});
}
},
Expand Down

0 comments on commit 7c18b67

Please sign in to comment.