diff --git a/regulation-worker/cmd/main.go b/regulation-worker/cmd/main.go index 13692a86f0..6d47fb7eb7 100644 --- a/regulation-worker/cmd/main.go +++ b/regulation-worker/cmd/main.go @@ -23,6 +23,7 @@ import ( "github.com/rudderlabs/rudder-server/regulation-worker/internal/service" "github.com/rudderlabs/rudder-server/services/filemanager" "github.com/rudderlabs/rudder-server/utils/logger" + "github.com/rudderlabs/rudder-server/utils/misc" ) var pkgLogger = logger.NewLogger().Child("regulation-worker") @@ -57,10 +58,9 @@ func Run(ctx context.Context) { if err != nil { panic(fmt.Errorf("error while getting workspaceId: %w", err)) } - svc := service.JobSvc{ API: &client.JobAPI{ - Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)}, + Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)}, URLPrefix: config.MustGetString("CONFIG_BACKEND_URL"), WorkspaceToken: config.MustGetString("CONFIG_BACKEND_TOKEN"), WorkspaceID: workspaceId, @@ -73,14 +73,16 @@ func Run(ctx context.Context) { FilesLimit: config.GetInt("REGULATION_WORKER_FILES_LIMIT", 1000), }, &api.APIManager{ - Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)}, + Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)}, DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"), }), } pkgLogger.Infof("calling looper with service: %v", svc) l := withLoop(svc) - err = l.Loop(ctx) + err = misc.WithBugsnag(func() error { + return l.Loop(ctx) + })() if err != nil && !errors.Is(err, context.Canceled) { pkgLogger.Errorf("error: %v", err) panic(err) diff --git a/regulation-worker/internal/client/client.go b/regulation-worker/internal/client/client.go index b036acf8d4..4a6d11375e 100644 --- a/regulation-worker/internal/client/client.go +++ 
b/regulation-worker/internal/client/client.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "os" "regexp" "strconv" "time" @@ -30,12 +31,9 @@ type JobAPI struct { // which is actually returned. func (j *JobAPI) Get(ctx context.Context) (model.Job, error) { pkgLogger.Debugf("making http request to regulation manager to get new job") - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() url := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/workerJobs", j.URLPrefix, j.WorkspaceID) pkgLogger.Debugf("making GET request to URL: %v", url) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { pkgLogger.Errorf("error while create new http request: %v", err) @@ -45,10 +43,12 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) { req.SetBasicAuth(j.WorkspaceToken, "") req.Header.Set("Content-Type", "application/json") - pkgLogger.Debugf("making request: %v", req) resp, err := j.Client.Do(req) + if os.IsTimeout(err) { + stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1) + return model.Job{}, model.ErrRequestTimeout + } if err != nil { - pkgLogger.Errorf("http request failed with error: %v", err) return model.Job{}, err } defer func() { @@ -133,8 +133,11 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID req.Header.Set("Content-Type", "application/json") resp, err := j.Client.Do(req) + if os.IsTimeout(err) { + stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1) + return model.ErrRequestTimeout + } if err != nil { - pkgLogger.Errorf("error while making http request: %v", err) return err } defer func() { _ = resp.Body.Close() }() diff --git a/regulation-worker/internal/client/client_test.go b/regulation-worker/internal/client/client_test.go index 342f9a855e..61728694c9 100644 --- a/regulation-worker/internal/client/client_test.go +++ b/regulation-worker/internal/client/client_test.go @@ -7,6 +7,7 @@ import ( 
"net/http" "net/http/httptest" "testing" + "time" "github.com/rudderlabs/rudder-server/regulation-worker/internal/client" "github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize" @@ -24,6 +25,7 @@ func TestGet(t *testing.T) { expectedErr error acutalErr error expectedUsrAttributeCount int + serverDelay int }{ { name: "Get request to get job: successful", @@ -44,17 +46,31 @@ func TestGet(t *testing.T) { respCode: 429, expectedErr: fmt.Errorf("unexpected response code: 429"), }, + { + name: "Get request to get model.ErrRequestTimeout", + workspaceID: "1001", + expectedErr: model.ErrRequestTimeout, + serverDelay: 1, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(tt.respCode) + if tt.respCode != 0 { + w.WriteHeader(tt.respCode) + } + time.Sleep(time.Duration(tt.serverDelay) * time.Millisecond) fmt.Fprintf(w, tt.respBody) })) defer svr.Close() - + httpClient := &http.Client{} + if tt.serverDelay > 0 { + httpClient = &http.Client{ + Timeout: time.Duration(tt.serverDelay) * time.Microsecond, + } + } c := client.JobAPI{ - Client: &http.Client{}, + Client: httpClient, WorkspaceID: tt.workspaceID, URLPrefix: svr.URL, } diff --git a/regulation-worker/internal/delete/api/api.go b/regulation-worker/internal/delete/api/api.go index f8413b75d6..b0db466035 100644 --- a/regulation-worker/internal/delete/api/api.go +++ b/regulation-worker/internal/delete/api/api.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "net/http" + "os" "strings" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" @@ -31,24 +32,19 @@ type APIManager struct { // prepares payload based on (job,destDetail) & make an API call to transformer. // gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller. 
func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus { - pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destName) - method := "POST" + method := http.MethodPost endpoint := "/deleteUsers" url := fmt.Sprint(api.DestTransformURL, endpoint) - pkgLogger.Debugf("transformer url: %s", url) bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig) - pkgLogger.Debugf("payload: %#v", bodySchema) reqBody, err := json.Marshal(bodySchema) if err != nil { - pkgLogger.Errorf("error while marshalling job request body: %v", err) return model.JobStatusFailed } req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(reqBody)) if err != nil { - pkgLogger.Errorf("error while create new request: %v", err) return model.JobStatusFailed } req.Header.Set("Content-Type", "application/json") @@ -62,26 +58,24 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map fileCleaningTime.Start() defer fileCleaningTime.End() - pkgLogger.Debugf("sending request: %v", req) resp, err := api.Client.Do(req) if err != nil { - pkgLogger.Errorf("error while making http request: %v", err) + if os.IsTimeout(err) { + stats.Default.NewStat("regulation_worker_delete_api_timeout", stats.CountType).Count(1) + } return model.JobStatusFailed } defer resp.Body.Close() bodyBytes, err := io.ReadAll(resp.Body) if err != nil { - pkgLogger.Errorf("error while reading response body: %v", err) return model.JobStatusFailed } - bodyString := string(bodyBytes) - pkgLogger.Debugf("response body: %s", bodyString) var jobResp []JobRespSchema if err := json.Unmarshal(bodyBytes, &jobResp); err != nil { - pkgLogger.Errorf("error while decoding response body: %v", err) return model.JobStatusFailed } + switch resp.StatusCode { case http.StatusOK: diff --git a/regulation-worker/internal/model/model.go b/regulation-worker/internal/model/model.go index cb0ebbdb1b..94a5ac7afd 
100644 --- a/regulation-worker/internal/model/model.go +++ b/regulation-worker/internal/model/model.go @@ -11,6 +11,7 @@ var ( ErrNoRunnableJob = errors.New("no runnable job found") ErrDestNotImplemented = errors.New("job deletion not implemented for the destination") ErrInvalidDestination = errors.New("invalid destination") + ErrRequestTimeout = errors.New("request timeout") ) type JobStatus string diff --git a/regulation-worker/internal/service/looper.go b/regulation-worker/internal/service/looper.go index ad37e80fbd..fc2edbab4c 100644 --- a/regulation-worker/internal/service/looper.go +++ b/regulation-worker/internal/service/looper.go @@ -7,7 +7,6 @@ import ( "strconv" "time" - backoff "github.com/cenkalti/backoff/v4" "github.com/rudderlabs/rudder-server/regulation-worker/internal/model" "github.com/rudderlabs/rudder-server/utils/logger" "github.com/rudderlabs/rudder-server/utils/misc" @@ -16,8 +15,7 @@ import ( var pkgLogger = logger.NewLogger().Child("service") type Looper struct { - Backoff backoff.BackOffContext - Svc JobSvc + Svc JobSvc } func (l *Looper) Loop(ctx context.Context) error { @@ -27,6 +25,10 @@ func (l *Looper) Loop(ctx context.Context) error { if err != nil { return fmt.Errorf("reading value: %s from env: %s", "INTERVAL_IN_MINUTES", err.Error()) } + retryDelay, err := getenvInt("RETRY_DELAY_IN_SECONDS", 60) + if err != nil { + return fmt.Errorf("reading value: %s from env: %s", "RETRY_DELAY_IN_SECONDS", err.Error()) + } for { @@ -38,7 +40,15 @@ func (l *Looper) Loop(ctx context.Context) error { pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err) return nil } - + continue + } + // this is to make sure that we don't panic when any of the API call fails with deadline exceeded error. + if err == model.ErrRequestTimeout { + pkgLogger.Errorf("context deadline exceeded... 
retrying after %d second(s): %v", retryDelay, err) + if err := misc.SleepCtx(ctx, time.Duration(retryDelay)*time.Second); err != nil { + pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err) + return nil + } continue } diff --git a/regulation-worker/internal/service/service.go b/regulation-worker/internal/service/service.go index cb26a189bc..0cb2c95272 100644 --- a/regulation-worker/internal/service/service.go +++ b/regulation-worker/internal/service/service.go @@ -4,7 +4,6 @@ package service // TODO: appropriate status var update and handling via model.status import ( "context" - "fmt" "time" "github.com/cenkalti/backoff" @@ -36,6 +35,7 @@ type JobSvc struct { // calls api-client.getJob(workspaceID) // calls api-client to get new job with workspaceID, which returns jobID. func (js *JobSvc) JobSvc(ctx context.Context) error { + loopStart := time.Now() // API request to get new job pkgLogger.Debugf("making API request to get job") job, err := js.API.Get(ctx) @@ -44,11 +44,6 @@ func (js *JobSvc) JobSvc(ctx context.Context) error { return err } - totalJobTime := stats.Default.NewTaggedStat("total_job_time", stats.TimerType, stats.Tags{"jobId": fmt.Sprintf("%d", job.ID), "workspaceId": job.WorkspaceID}) - totalJobTime.Start() - defer totalJobTime.End() - - pkgLogger.Debugf("job: %v", job) // once job is successfully received, calling updatestatus API to update the status of job to running. 
status := model.JobStatusRunning err = js.updateStatus(ctx, status, job.ID) @@ -65,7 +60,16 @@ func (js *JobSvc) JobSvc(ctx context.Context) error { return js.updateStatus(ctx, model.JobStatusFailed, job.ID) } + deletionStart := time.Now() + status = js.Deleter.Delete(ctx, job, destDetail) + + stats.Default.NewTaggedStat("deletion_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(deletionStart) + if status == model.JobStatusComplete { + stats.Default.NewTaggedStat("deleted_user_count", stats.CountType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name}).Count(len(job.Users)) + } + stats.Default.NewTaggedStat("loop_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(loopStart) + return js.updateStatus(ctx, status, job.ID) }