Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: regulation worker avoid panic in case of timeout #2657

Merged
merged 5 commits into from
Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

var pkgLogger = logger.NewLogger().Child("regulation-worker")
Expand Down Expand Up @@ -57,10 +58,9 @@ func Run(ctx context.Context) {
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
}

svc := service.JobSvc{
API: &client.JobAPI{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)},
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
URLPrefix: config.MustGetString("CONFIG_BACKEND_URL"),
WorkspaceToken: config.MustGetString("CONFIG_BACKEND_TOKEN"),
WorkspaceID: workspaceId,
Expand All @@ -73,14 +73,16 @@ func Run(ctx context.Context) {
FilesLimit: config.GetInt("REGULATION_WORKER_FILES_LIMIT", 1000),
},
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)},
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
}),
}

pkgLogger.Infof("calling looper with service: %v", svc)
l := withLoop(svc)
err = l.Loop(ctx)
err = misc.WithBugsnag(func() error {
return l.Loop(ctx)
})()
if err != nil && !errors.Is(err, context.Canceled) {
pkgLogger.Errorf("error: %v", err)
panic(err)
Expand Down
15 changes: 9 additions & 6 deletions regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"regexp"
"strconv"
"time"
Expand All @@ -30,12 +31,9 @@ type JobAPI struct {
// which is actually returned.
func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
pkgLogger.Debugf("making http request to regulation manager to get new job")
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

url := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/workerJobs", j.URLPrefix, j.WorkspaceID)
pkgLogger.Debugf("making GET request to URL: %v", url)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
pkgLogger.Errorf("error while create new http request: %v", err)
Expand All @@ -45,10 +43,12 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
req.SetBasicAuth(j.WorkspaceToken, "")
req.Header.Set("Content-Type", "application/json")

pkgLogger.Debugf("making request: %v", req)
resp, err := j.Client.Do(req)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1)
return model.Job{}, model.ErrRequestTimeout
}
if err != nil {
pkgLogger.Errorf("http request failed with error: %v", err)
return model.Job{}, err
}
defer func() {
Expand Down Expand Up @@ -133,8 +133,11 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID
req.Header.Set("Content-Type", "application/json")

resp, err := j.Client.Do(req)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1)
return model.ErrRequestTimeout
}
if err != nil {
pkgLogger.Errorf("error while making http request: %v", err)
return err
}
defer func() { _ = resp.Body.Close() }()
Expand Down
22 changes: 19 additions & 3 deletions regulation-worker/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/client"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
Expand All @@ -24,6 +25,7 @@ func TestGet(t *testing.T) {
expectedErr error
acutalErr error
expectedUsrAttributeCount int
serverDelay int
}{
{
name: "Get request to get job: successful",
Expand All @@ -44,17 +46,31 @@ func TestGet(t *testing.T) {
respCode: 429,
expectedErr: fmt.Errorf("unexpected response code: 429"),
},
{
name: "Get request to get model.ErrRequestTimeout",
workspaceID: "1001",
expectedErr: model.ErrRequestTimeout,
serverDelay: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tt.respCode)
if tt.respCode != 0 {
w.WriteHeader(tt.respCode)
}
time.Sleep(time.Duration(tt.serverDelay) * time.Second)
saurav-malani marked this conversation as resolved.
Show resolved Hide resolved
fmt.Fprintf(w, tt.respBody)
}))
defer svr.Close()

httpClient := &http.Client{}
if tt.serverDelay > 0 {
httpClient = &http.Client{
Timeout: time.Duration(tt.serverDelay) * time.Second / 2,
}
}
c := client.JobAPI{
Client: &http.Client{},
Client: httpClient,
WorkspaceID: tt.workspaceID,
URLPrefix: svr.URL,
}
Expand Down
18 changes: 6 additions & 12 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
Expand All @@ -31,24 +32,19 @@ type APIManager struct {
// prepares payload based on (job,destDetail) & make an API call to transformer.
// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus {
pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destName)
method := "POST"
method := http.MethodPost
endpoint := "/deleteUsers"
url := fmt.Sprint(api.DestTransformURL, endpoint)
pkgLogger.Debugf("transformer url: %s", url)

bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig)
pkgLogger.Debugf("payload: %#v", bodySchema)

reqBody, err := json.Marshal(bodySchema)
if err != nil {
pkgLogger.Errorf("error while marshalling job request body: %v", err)
return model.JobStatusFailed
}

req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(reqBody))
if err != nil {
pkgLogger.Errorf("error while create new request: %v", err)
return model.JobStatusFailed
}
req.Header.Set("Content-Type", "application/json")
Expand All @@ -62,26 +58,24 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
fileCleaningTime.Start()
defer fileCleaningTime.End()

pkgLogger.Debugf("sending request: %v", req)
resp, err := api.Client.Do(req)
if err != nil {
pkgLogger.Errorf("error while making http request: %v", err)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_worker_delete_api_timeout", stats.CountType).Count(1)
}
return model.JobStatusFailed
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
pkgLogger.Errorf("error while reading response body: %v", err)
return model.JobStatusFailed
}
bodyString := string(bodyBytes)
pkgLogger.Debugf("response body: %s", bodyString)

var jobResp []JobRespSchema
if err := json.Unmarshal(bodyBytes, &jobResp); err != nil {
pkgLogger.Errorf("error while decoding response body: %v", err)
return model.JobStatusFailed
}

switch resp.StatusCode {

case http.StatusOK:
Expand Down
1 change: 1 addition & 0 deletions regulation-worker/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var (
ErrNoRunnableJob = errors.New("no runnable job found")
ErrDestNotImplemented = errors.New("job deletion not implemented for the destination")
ErrInvalidDestination = errors.New("invalid destination")
ErrRequestTimeout = errors.New("request timeout")
)

type JobStatus string
Expand Down
18 changes: 14 additions & 4 deletions regulation-worker/internal/service/looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -16,8 +15,7 @@ import (
var pkgLogger = logger.NewLogger().Child("service")

type Looper struct {
Backoff backoff.BackOffContext
Svc JobSvc
Svc JobSvc
}

func (l *Looper) Loop(ctx context.Context) error {
Expand All @@ -27,6 +25,10 @@ func (l *Looper) Loop(ctx context.Context) error {
if err != nil {
return fmt.Errorf("reading value: %s from env: %s", "INTERVAL_IN_MINUTES", err.Error())
}
retryDelay, err := getenvInt("RETRY_DELAY_IN_SECONDS", 60)
if err != nil {
return fmt.Errorf("reading value: %s from env: %s", "INTERVAL_IN_MINUTES", err.Error())
}

for {

Expand All @@ -38,7 +40,15 @@ func (l *Looper) Loop(ctx context.Context) error {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
return nil
}

continue
}
// this is to make sure that we don't panic when any of the API call fails with deadline exceeded error.
if err == model.ErrRequestTimeout {
pkgLogger.Errorf("context deadline exceeded... retrying after %d minute(s): %v", retryDelay, err)
if err := misc.SleepCtx(ctx, time.Duration(retryDelay)*time.Second); err != nil {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
return nil
}
continue
}

Expand Down
16 changes: 10 additions & 6 deletions regulation-worker/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package service
// TODO: appropriate status var update and handling via model.status
import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -36,6 +35,7 @@ type JobSvc struct {
// calls api-client.getJob(workspaceID)
// calls api-client to get new job with workspaceID, which returns jobID.
func (js *JobSvc) JobSvc(ctx context.Context) error {
loopStart := time.Now()
// API request to get new job
pkgLogger.Debugf("making API request to get job")
job, err := js.API.Get(ctx)
Expand All @@ -44,11 +44,6 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
return err
}

totalJobTime := stats.Default.NewTaggedStat("total_job_time", stats.TimerType, stats.Tags{"jobId": fmt.Sprintf("%d", job.ID), "workspaceId": job.WorkspaceID})
totalJobTime.Start()
defer totalJobTime.End()

pkgLogger.Debugf("job: %v", job)
// once job is successfully received, calling updatestatus API to update the status of job to running.
status := model.JobStatusRunning
err = js.updateStatus(ctx, status, job.ID)
Expand All @@ -65,7 +60,16 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
return js.updateStatus(ctx, model.JobStatusFailed, job.ID)
}

deletionStart := time.Now()

status = js.Deleter.Delete(ctx, job, destDetail)

stats.Default.NewTaggedStat("deletion_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(deletionStart)
if status == model.JobStatusComplete {
stats.Default.NewTaggedStat("deleted_user_count", stats.CountType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name}).Count(len(job.Users))
}
stats.Default.NewTaggedStat("loop_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(loopStart)

return js.updateStatus(ctx, status, job.ID)
}

Expand Down