Skip to content

Commit

Permalink
fix: send router transform failures to live events (#2637)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Nov 3, 2022
1 parent 366c1f5 commit 6931b17
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 18 deletions.
19 changes: 4 additions & 15 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,6 @@ func (worker *workerT) processDestinationJobs() {
})

for _, destinationJob := range worker.destinationJobs {
var attemptedToSendTheJob bool
var errorAt string
respBodyArr := make([]string, 0)
if destinationJob.StatusCode == 200 || destinationJob.StatusCode == 0 {
Expand Down Expand Up @@ -753,8 +752,6 @@ func (worker *workerT) processDestinationJobs() {
respStatusCode = destinationResponseHandler.IsSuccessStatus(respStatusCode, respBody)
}

attemptedToSendTheJob = true

worker.deliveryTimeStat.End()
deliveryLatencyStat.End()

Expand Down Expand Up @@ -807,7 +804,6 @@ func (worker *workerT) processDestinationJobs() {
destinationJobMetadata: &_destinationJobMetadata,
respStatusCode: respStatusCode,
respBody: respBody,
attemptedToSendTheJob: attemptedToSendTheJob,
errorAt: errorAt,
})
}
Expand All @@ -823,7 +819,6 @@ func (worker *workerT) processDestinationJobs() {
for _, routerJobResponse := range routerJobResponses {
destinationJobMetadata := routerJobResponse.destinationJobMetadata
destinationJob := routerJobResponse.destinationJob
attemptedToSendTheJob := routerJobResponse.attemptedToSendTheJob
attemptNum := destinationJobMetadata.AttemptNum
respStatusCode = routerJobResponse.respStatusCode
status := jobsdb.JobStatusT{
Expand Down Expand Up @@ -855,27 +850,22 @@ func (worker *workerT) processDestinationJobs() {
userToJobIDMap[destinationJobMetadata.UserID] = destinationJobMetadata.JobID
}

if attemptedToSendTheJob {
status.AttemptNum++
}

status.AttemptNum++
status.ErrorResponse = routerutils.EmptyPayload
status.ErrorCode = strconv.Itoa(respStatusCode)

worker.postStatusOnResponseQ(respStatusCode, routerJobResponse.respBody, destinationJob.Message, respContentType, destinationJobMetadata, &status, routerJobResponse.errorAt)

worker.sendEventDeliveryStat(destinationJobMetadata, &status, &destinationJob.Destination)

if attemptedToSendTheJob {
worker.sendRouterResponseCountStat(&status, &destinationJob.Destination, routerJobResponse.errorAt)
}
worker.sendRouterResponseCountStat(&status, &destinationJob.Destination, routerJobResponse.errorAt)
}

// NOTE: Sending live events to config backend after the status objects are built completely.
destLiveEventSentMap := make(map[*types.DestinationJobT]struct{})
for _, routerJobResponse := range routerJobResponses {
// Sending only one destination live event for every destinationJob, if it was attemptedToSendTheJob
if _, ok := destLiveEventSentMap[routerJobResponse.destinationJob]; !ok && routerJobResponse.attemptedToSendTheJob {
// Sending only one destination live event for every destinationJob
if _, ok := destLiveEventSentMap[routerJobResponse.destinationJob]; !ok {
payload := routerJobResponse.destinationJob.Message
if routerJobResponse.destinationJob.Message == nil {
payload = routerJobResponse.destinationJobMetadata.JobT.EventPayload
Expand Down Expand Up @@ -954,7 +944,6 @@ type JobResponse struct {
respStatusCode int
respBody string
errorAt string
attemptedToSendTheJob bool
status *jobsdb.JobStatusT
}

Expand Down
10 changes: 7 additions & 3 deletions router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/tidwall/gjson"

"github.com/rudderlabs/rudder-server/enterprise/reporting"

jsoniter "github.com/json-iterator/go"
Expand Down Expand Up @@ -966,7 +968,7 @@ var _ = Describe("Router", func() {
}).Return(nil)
c.mockRouterJobsDB.EXPECT().UpdateJobStatusInTx(gomock.Any(), gomock.Any(), gomock.Any(), []string{customVal["GA"]}, nil).Times(1).
Do(func(ctx context.Context, _ interface{}, statuses []*jobsdb.JobStatusT, _, _ interface{}) {
assertTransformJobStatuses(toRetryJobsList[0], statuses[0], jobsdb.Failed.State, "500", 1)
assertTransformJobStatuses(toRetryJobsList[0], statuses[0], jobsdb.Failed.State, "500", 2)
assertTransformJobStatuses(unprocessedJobsList[0], statuses[1], jobsdb.Waiting.State, "", 0)
assertTransformJobStatuses(unprocessedJobsList[1], statuses[2], jobsdb.Waiting.State, "", 0)
})
Expand Down Expand Up @@ -1374,8 +1376,10 @@ func assertJobStatus(job *jobsdb.JobT, status *jobsdb.JobStatusT, expectedState,
Expect(status.JobID).To(Equal(job.JobID))
Expect(status.JobState).To(Equal(expectedState))
Expect(status.ErrorCode).To(Equal(errorCode))
if attemptNum > 1 {
Expect(status.ErrorResponse).To(MatchJSON(errorResponse))
if attemptNum >= 1 {
Expect(gjson.GetBytes(status.ErrorResponse, "content-type").String()).To(Equal(gjson.Get(errorResponse, "content-type").String()))
Expect(gjson.GetBytes(status.ErrorResponse, "response").String()).To(Equal(gjson.Get(errorResponse, "response").String()))
Expect(gjson.GetBytes(status.ErrorResponse, "reason").String()).To(Equal(gjson.Get(errorResponse, "reason").String()))
}
Expect(status.RetryTime).To(BeTemporally("~", time.Now(), 10*time.Second))
Expect(status.ExecTime).To(BeTemporally("~", time.Now(), 10*time.Second))
Expand Down

0 comments on commit 6931b17

Please sign in to comment.