From a53657d958e7da9d06c7786800ec7a3fc8ce388a Mon Sep 17 00:00:00 2001 From: Akash Chetty Date: Mon, 7 Nov 2022 22:05:44 +0530 Subject: [PATCH] fix: use proper status code to handle warehouse process (#2659) --- router/batchrouter/batchrouter.go | 22 +++++--- router/batchrouter/batchrouter_test.go | 70 ++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 6 deletions(-) diff --git a/router/batchrouter/batchrouter.go b/router/batchrouter/batchrouter.go index bb42a657b8..07302f98d8 100644 --- a/router/batchrouter/batchrouter.go +++ b/router/batchrouter/batchrouter.go @@ -8,6 +8,7 @@ import ( stdjson "encoding/json" "errors" "fmt" + "io" "net/http" "net/url" "os" @@ -58,7 +59,6 @@ var ( mainLoopSleep, diagnosisTickerTime time.Duration uploadFreqInS int64 objectStorageDestinations []string - warehouseURL string warehouseServiceFailedTime time.Time warehouseServiceFailedTimeLock sync.RWMutex warehouseServiceMaxRetryTime time.Duration @@ -120,6 +120,7 @@ type HandleT struct { successfulJobCount stats.Measurement failedJobCount stats.Measurement abortedJobCount stats.Measurement + warehouseURL string backgroundGroup *errgroup.Group backgroundCtx context.Context @@ -1124,16 +1125,23 @@ func (brt *HandleT) postToWarehouse(batchJobs *BatchJobsT, output StorageUploadO if err != nil { brt.logger.Errorf("BRT: Failed to marshal WH staging file payload error:%v", err) } - uri := fmt.Sprintf(`%s/v1/process`, warehouseURL) + uri := fmt.Sprintf(`%s/v1/process`, brt.warehouseURL) resp, err := brt.netHandle.Post(uri, "application/json; charset=utf-8", bytes.NewBuffer(jsonPayload)) if err != nil { brt.logger.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, error:%v", uri, err) - } else { - brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri) - defer resp.Body.Close() + return } + defer func() { _ = resp.Body.Close() }() + if resp.StatusCode == http.StatusOK { + _, err = io.Copy(io.Discard, resp.Body) + brt.logger.Infof("BRT: Routed successfully staging file URL to warehouse service@%v", uri) + } else { + body, _ := io.ReadAll(resp.Body) + err = fmt.Errorf("BRT: Failed to route staging file URL to warehouse service@%v, status: %v, body: %v", uri, resp.Status, string(body)) + brt.logger.Error(err) + } return } @@ -2252,7 +2260,6 @@ func loadConfig() { config.RegisterInt64ConfigVariable(30, &uploadFreqInS, true, 1, "BatchRouter.uploadFreqInS") objectStorageDestinations = []string{"S3", "GCS", "AZURE_BLOB", "MINIO", "DIGITAL_OCEAN_SPACES"} asyncDestinations = []string{"MARKETO_BULK_UPLOAD"} - warehouseURL = misc.GetWarehouseURL() // Time period for diagnosis ticker config.RegisterDurationConfigVariable(600, &diagnosisTickerTime, false, time.Second, []string{"Diagnostics.batchRouterTimePeriod", "Diagnostics.batchRouterTimePeriodInS"}...) config.RegisterDurationConfigVariable(3, &warehouseServiceMaxRetryTime, true, time.Hour, []string{"BatchRouter.warehouseServiceMaxRetryTime", "BatchRouter.warehouseServiceMaxRetryTimeinHr"}...) @@ -2295,6 +2302,9 @@ func (brt *HandleT) Setup(backendConfig backendconfig.BackendConfig, jobsDB, err // error is ignored as context.TODO() is passed, err is not expected. _ = brt.reporting.WaitForSetup(context.TODO(), types.CORE_REPORTING_CLIENT) } + if brt.warehouseURL == "" { + brt.warehouseURL = misc.GetWarehouseURL() + } brt.inProgressMap = map[string]bool{} brt.lastExecMap = map[string]int64{} diff --git a/router/batchrouter/batchrouter_test.go b/router/batchrouter/batchrouter_test.go index 19b6e01e67..13d97dfdb0 100644 --- a/router/batchrouter/batchrouter_test.go +++ b/router/batchrouter/batchrouter_test.go @@ -2,9 +2,16 @@ package batchrouter import ( "context" + jsonb "encoding/json" + "errors" "fmt" + "net/http" + "net/http/httptest" + "testing" "time" + "github.com/stretchr/testify/require" + "github.com/gofrs/uuid" "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" @@ -300,3 +307,66 @@ func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState, Expect(status.ExecTime).To(BeTemporally("~", time.Now(), 10*time.Second)) Expect(status.AttemptNum).To(Equal(attemptNum)) } + +func TestPostToWarehouse(t *testing.T) { + // TOT: Decouple this test from the actual warehouse + inputs := []struct { + name string + responseCode int + responseBody string + expectedError error + }{ + { + name: "should successfully post to warehouse", + responseBody: "OK", + responseCode: http.StatusOK, + }, + { + name: "should fail to post to warehouse", + responseCode: http.StatusNotFound, + responseBody: "Not Found", + expectedError: errors.New("BRT: Failed to route staging file URL to warehouse service@%s/v1/process, status: 404 Not Found, body: Not Found"), + }, + } + for _, input := range inputs { + t.Run(input.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(input.responseCode) + _, _ = w.Write([]byte(input.responseBody)) + })) + t.Cleanup(ts.Close) + + job := HandleT{ + netHandle: ts.Client(), + logger: logger.NOP, + warehouseURL: ts.URL, + } + batchJobs := BatchJobsT{ + Jobs: []*jobsdb.JobT{ + { + EventPayload: jsonb.RawMessage(` + { + "receivedAt": "2019-10-12T07:20:50.52Z", + "metadata": { + "columns": { + "id": "string" + }, + "table": "tracks" + } + } + `), + WorkspaceId: "test-workspace", + Parameters: jsonb.RawMessage(`{}`), + }, + }, + BatchDestination: &DestinationT{}, + } + err := job.postToWarehouse(&batchJobs, StorageUploadOutput{}) + if input.expectedError != nil { + require.Equal(t, fmt.Sprintf(input.expectedError.Error(), ts.URL), err.Error()) + } else { + require.NoError(t, err) + } + }) + } +}