Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: destroy pipeline streams when returned stream errors #2437

Merged
merged 7 commits into from
Apr 15, 2024
Merged
10 changes: 10 additions & 0 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2032,6 +2032,16 @@ class File extends ServiceObject<File, FileMetadata> {
emitStream.write(chunk, encoding, cb);
},
});
// If the write stream, which is returned to the caller, catches an error we need to make sure that
// at least one of the streams in the pipeline below gets notified so that they
// all get cleaned up / destroyed.
writeStream.once('error', e => {
emitStream.destroy(e);
});
// If the write stream is closed, cleanup the pipeline below by calling destroy on one of the streams.
writeStream.once('close', () => {
emitStream.destroy();
});

const transformStreams: Transform[] = [];

Expand Down
5 changes: 2 additions & 3 deletions src/nodejs-common/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,8 @@ export class Service {
};

if (reqOpts[GCCL_GCS_CMD_KEY]) {
reqOpts.headers[
'x-goog-api-client'
] += ` gccl-gcs-cmd/${reqOpts[GCCL_GCS_CMD_KEY]}`;
reqOpts.headers['x-goog-api-client'] +=
` gccl-gcs-cmd/${reqOpts[GCCL_GCS_CMD_KEY]}`;
}

if (reqOpts.shouldReturnStream) {
Expand Down
5 changes: 2 additions & 3 deletions src/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -933,9 +933,8 @@ export class Upload extends Writable {
// `Content-Length` for multiple chunk uploads is the size of the chunk,
// not the overall object
headers['Content-Length'] = bytesToUpload;
headers[
'Content-Range'
] = `bytes ${this.offset}-${endingByte}/${totalObjectSize}`;
headers['Content-Range'] =
`bytes ${this.offset}-${endingByte}/${totalObjectSize}`;
} else {
headers['Content-Range'] = `bytes ${this.offset}-*/${this.contentLength}`;
}
Expand Down
5 changes: 2 additions & 3 deletions src/transfer-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,8 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {

// Prepend command feature to value, if not already there
if (!value.includes(GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED)) {
headers[
key
] = `${value} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
headers[key] =
`${value} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
}
} else if (key.toLocaleLowerCase().trim() === 'user-agent') {
userAgentFound = true;
Expand Down
Loading