Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-782588: Retry result request for async query if still in progress #824

Merged
merged 1 commit into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 38 additions & 17 deletions async.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,25 +63,46 @@ func (sr *snowflakeRestful) getAsync(
defer close(errChannel)
token, _, _ := sr.TokenAccessor.GetTokens()
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
if resp.Body != nil {

var err error
var respd execResponse
retry := 0
retryPattern := []int32{1, 1, 2, 3, 4, 8, 10}
sfc-gh-igarish marked this conversation as resolved.
Show resolved Hide resolved

for {
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
defer resp.Body.Close()
}

respd := execResponse{}
err = json.NewDecoder(resp.Body).Decode(&respd)
resp.Body.Close()
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
respd = execResponse{} // reset the response
err = json.NewDecoder(resp.Body).Decode(&respd)
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
}
if respd.Code != queryInProgressAsyncCode {
// If the query takes longer than 45 seconds to complete the results are not returned.
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
// If the query is still in progress after 45 seconds, retry the request to the /results endpoint.
// For all other scenarios continue processing results response
break
} else {
// Sleep before retrying get result request. Exponential backoff up to 5 seconds.
// Once 5 second backoff is reached it will keep retrying with this sleeptime.
sleepTime := time.Millisecond * time.Duration(500*retryPattern[retry])
logger.WithContext(ctx).Infof("Query execution still in progress. Sleep for %v ms", sleepTime)
time.Sleep(sleepTime)
}
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved
if retry < len(retryPattern)-1 {
sfc-gh-ext-simba-lb marked this conversation as resolved.
Show resolved Hide resolved
retry++
}

}

sc := &snowflakeConn{rest: sr, cfg: cfg}
Expand Down
35 changes: 35 additions & 0 deletions async_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,38 @@ func retrieveRows(rows *RowsExtended, ch chan string) {
ch <- s
close(ch)
}

func TestLongRunningAsyncQuery(t *testing.T) {
db := openDB(t)
defer db.Close()

ctx, _ := WithMultiStatement(context.Background(), 0)
query := "CALL SYSTEM$WAIT(50, 'SECONDS');use snowflake_sample_data"
sfc-gh-pfus marked this conversation as resolved.
Show resolved Hide resolved

rows, err := db.QueryContext(WithAsyncMode(ctx), query)
if err != nil {
t.Fatalf("failed to run a query. %v, err: %v", query, err)
}
defer rows.Close()
var v string
i := 0
for {
sfc-gh-ext-simba-lb marked this conversation as resolved.
Show resolved Hide resolved
for rows.Next() {
err := rows.Scan(&v)
if err != nil {
t.Fatalf("failed to get result. err: %v", err)
}
if v == "" {
t.Fatal("should have returned a result")
}
results := []string{"waited 50 seconds", "Statement executed successfully."}
if v != results[i] {
t.Fatalf("unexpected result returned. expected: %v, but got: %v", results[i], v)
}
i++
}
if !rows.NextResultSet() {
break
}
}
}