Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(error): Fix error passthrough in queued tasks #145

Merged
merged 3 commits into from
Dec 9, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions src/core/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -339,28 +339,30 @@ export class ZarrArray<StoreGetOptions = any> {
// create promise queue with concurrency control
const queue = new PQueue({ concurrency: concurrencyLimit });

const allTasks = [];

if (progressCallback) {

let progress = 0;
let queueSize = 0;
for (const _ of indexer.iter()) queueSize += 1;
progressCallback({ progress: 0, queueSize: queueSize });
for (const proj of indexer.iter()) {
(async () => {
allTasks.push((async () => {
await queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
})());
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, could we just use the queue but remove the async IIFE?

queue.add(async () => {
  await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
  progress += 1;
  progressCallback({ progress: progress, queueSize: queueSize });
});

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And then keep queue.onIdle?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to do this and follow the pattern in the rest of the changes, but it brought back the same problem in my use case (trying to cancel chunk requests). So maybe it's something about how PQueue.onIdle works...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the last part of this discussion sindresorhus/p-queue#26 can inform the use of onIdle here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked into the implementation of p-queue while implementing this PR and it seems like the only way to correctly throw on the task error is to await the return of the queue.add method, see here
awaiting the queue.onIdle won't throw on error. So it seemed to me you have to create an Array of queue.add return promises and use await Promise.all on this Array.

Copy link
Collaborator

@manzt manzt Nov 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok! Thank you for looking into it and for your patience in my response. Would you mind still wrapping the full task in the queue.add and add and "await" to the progressCallback?

  const task = queue.add(async () => {
    await this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions);
    progress += 1;
    await progressCallback({ progress: progress, queueSize: queueSize });
  });

  allTasks.push(task);


// ... 

await promise.all(allTasks);

This way if progressCallback throws we will also catch the error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion, I've update the code to remove the iife and move the progress callback inside the queued task.

I didn't await for the progressCallback though since it is typed as returning void, I suggest this is done in a different PR?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Thank you!


} else {
for (const proj of indexer.iter()) {
queue.add(() => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions));
allTasks.push(queue.add(async () => this.chunkGetItem(proj.chunkCoords, proj.chunkSelection, out, proj.outSelection, indexer.dropAxes, storeOptions)));
}
}

// guarantees that all work on queue has finished
await queue.onIdle();
// guarantees that all work on queue has finished and throws if any of the tasks errored.
await Promise.all(allTasks);

// Return scalar instead of zero-dimensional array.
if (out.shape.length === 0) {
Expand Down Expand Up @@ -594,6 +596,8 @@ export class ZarrArray<StoreGetOptions = any> {

const queue = new PQueue({ concurrency: concurrencyLimit });

const allTasks = [];

if (progressCallback) {

let queueSize = 0;
Expand All @@ -603,24 +607,24 @@ export class ZarrArray<StoreGetOptions = any> {
progressCallback({ progress: 0, queueSize: queueSize });
for (const proj of indexer.iter()) {
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
(async () => {
allTasks.push((async () => {
await queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
progress += 1;
progressCallback({ progress: progress, queueSize: queueSize });
})();
})());
}

} else {

for (const proj of indexer.iter()) {
const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape);
queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue));
allTasks.push(queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)));
}

}

// guarantees that all work on queue has finished
await queue.onIdle();
// guarantees that all work on queue has finished and throws if any of the tasks errored.
await Promise.all(allTasks);
}

private async chunkSetItem(chunkCoords: number[], chunkSelection: DimensionSelection[], value: number | NestedArray<TypedArray>) {
Expand Down