Skip to content

Commit

Permalink
Fix flush() to retry fetch() on transient errors
Browse files Browse the repository at this point in the history
This changes flush() to retry the fetch() call on transient errors such
as EPIPE, ECONNREFUSED, and ECONNRESET.
  • Loading branch information
penberg committed Jan 21, 2024
1 parent 5db0765 commit 1d8a952
Showing 1 changed file with 23 additions and 2 deletions.
25 changes: 23 additions & 2 deletions src/http/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ export class HttpStream extends Stream implements SqlOwner {
let promise;
try {
const request = createRequest();
const fetch = this.#fetch;
promise = fetch(request);
promise = this.#fetchWithRetry(request);
} catch (error) {
promise = Promise.reject(error);
}
Expand All @@ -356,6 +355,19 @@ export class HttpStream extends Stream implements SqlOwner {
});
}

#fetchWithRetry(request: Request, retryCount = 3): Promise<Response> {
try {
return this.#fetch(request);
} catch (error: any) {
if (isRetryableError(error)) {
if (retryCount > 0) {
return this.#fetchWithRetry(request, retryCount - 1);
}
}
throw error;
}
}

#createPipelineRequest(pipeline: Array<PipelineEntry>, endpoint: Endpoint): Request {
return this.#createRequest<proto.PipelineReqBody>(
new URL(endpoint.pipelinePath, this.#baseUrl),
Expand Down Expand Up @@ -417,6 +429,15 @@ export class HttpStream extends Stream implements SqlOwner {
}
}

function isRetryableError(error: any): boolean {
if (!error.errno) {
return false;
}
return error.errno === "EPIPE"
|| error.errno === "ECONNREFUSED"
|| error.errno === "ECONNRESET";
}

function handlePipelineResponse(pipeline: Array<PipelineEntry>, respBody: proto.PipelineRespBody): void {
if (respBody.results.length !== pipeline.length) {
throw new ProtoError("Server returned unexpected number of pipeline results");
Expand Down

0 comments on commit 1d8a952

Please sign in to comment.