Commit 9eadca3 ("review")
Sidddddarth committed Sep 15, 2023 (1 parent: 6c16717)
Showing 7 changed files with 61 additions and 91 deletions.
processor/events_response.go (17 changes: 5 additions & 12 deletions)
@@ -3,10 +3,11 @@ package processor
import (
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/samber/lo"
)

func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransform []transformer.TransformerEvent) []*jobsdb.JobT {
@@ -18,20 +19,12 @@ func (proc *Handle) getDroppedJobs(response transformer.Response, eventsToTransf
// in transformer response, multiple messageIDs could be batched together
successFullMessageIDs := make([]string, 0)
lo.ForEach(response.Events, func(e transformer.TransformerResponse, _ int) {
if len(e.Metadata.MessageIDs) > 0 {
successFullMessageIDs = append(successFullMessageIDs, e.Metadata.MessageIDs...)
} else {
successFullMessageIDs = append(successFullMessageIDs, e.Metadata.MessageID)
}
successFullMessageIDs = append(successFullMessageIDs, e.Metadata.GetMessagesIDs()...)
})
// for failed as well
failedMessageIDs := make([]string, 0)
lo.ForEach(response.FailedEvents, func(e transformer.TransformerResponse, _ int) {
if len(e.Metadata.MessageIDs) > 0 {
failedMessageIDs = append(failedMessageIDs, e.Metadata.MessageIDs...)
} else {
failedMessageIDs = append(failedMessageIDs, e.Metadata.MessageID)
}
failedMessageIDs = append(failedMessageIDs, e.Metadata.GetMessagesIDs()...)
})
// the remainder of the messageIDs are those that are dropped
// we get jobs for those dropped messageIDs - for rsources_stats_collector
@@ -40,7 +33,7 @@
droppedJobs := make([]*jobsdb.JobT, 0)
for _, e := range eventsToTransform {
if _, ok := droppedMessageIDKeys[e.Metadata.MessageID]; ok {
var params = struct {
params := struct {
SourceJobRunID string `json:"source_job_run_id"`
SourceTaskRunID string `json:"source_task_run_id"`
SourceID string `json:"source_id"`
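
The hunk above computes dropped message IDs as whatever remains after the successful and failed IDs are removed from the original event list. A minimal, hypothetical sketch of that set-difference idea, using the same samber/lo library the file imports (the helper name and data are illustrative, not the repository's API):

package main

import (
	"fmt"

	"github.com/samber/lo"
)

// droppedIDs returns every ID in `all` that is neither successful nor failed.
// lo.Without returns a copy of `all` with each excluded value removed.
func droppedIDs(all, succeeded, failed []string) []string {
	exclude := append(append([]string{}, succeeded...), failed...)
	return lo.Without(all, exclude...)
}

func main() {
	all := []string{"m1", "m2", "m3", "m4"}
	fmt.Println(droppedIDs(all, []string{"m1"}, []string{"m3"})) // [m2 m4]
}
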
processor/processor.go (56 changes: 32 additions & 24 deletions)
@@ -961,12 +961,12 @@ func (proc *Handle) getDestTransformerEvents(response transformer.Response, comm
for i := range response.Events {
// Update metrics maps
userTransformedEvent := &response.Events[i]
var messages []types.SingularEventT
if len(userTransformedEvent.Metadata.MessageIDs) > 0 {
messages = lo.Map(userTransformedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent })
} else {
messages = []types.SingularEventT{eventsByMessageID[userTransformedEvent.Metadata.MessageID].SingularEvent}
}
messages := lo.Map(
userTransformedEvent.Metadata.GetMessagesIDs(),
func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
},
)

for _, message := range messages {
proc.updateMetricMaps(successCountMetadataMap, successCountMap, connectionDetailsMap, statusDetailsMap, userTransformedEvent, jobsdb.Succeeded.State, stage, func() json.RawMessage {
@@ -1181,12 +1181,12 @@ func (proc *Handle) getFailedEventJobs(response transformer.Response, commonMeta
var failedEventsToStore []*jobsdb.JobT
for i := range response.FailedEvents {
failedEvent := &response.FailedEvents[i]
var messages []types.SingularEventT
if len(failedEvent.Metadata.MessageIDs) > 0 {
messages = lo.Map(failedEvent.Metadata.MessageIDs, func(msgID string, _ int) types.SingularEventT { return eventsByMessageID[msgID].SingularEvent })
} else {
messages = []types.SingularEventT{eventsByMessageID[failedEvent.Metadata.MessageID].SingularEvent}
}
messages := lo.Map(
failedEvent.Metadata.GetMessagesIDs(),
func(msgID string, _ int) types.SingularEventT {
return eventsByMessageID[msgID].SingularEvent
},
)
payload, err := jsonfast.Marshal(messages)
if err != nil {
proc.logger.Errorf(`[Processor: getFailedEventJobs] Failed to unmarshal list of failed events: %v`, err)
@@ -1825,6 +1825,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
var batchDestJobs []*jobsdb.JobT
var destJobs []*jobsdb.JobT
var droppedJobs []*jobsdb.JobT
routerDestIDs := make(map[string]struct{})

destProcStart := time.Now()
@@ -1856,6 +1857,7 @@ func (proc *Handle) transformations(partition string, in *transformationMessage)
for o := range chOut {
destJobs = append(destJobs, o.destJobs...)
batchDestJobs = append(batchDestJobs, o.batchDestJobs...)
droppedJobs = append(droppedJobs, o.droppedJobs...)
routerDestIDs = lo.Assign(routerDestIDs, o.routerDestIDs)
in.reportMetrics = append(in.reportMetrics, o.reportMetrics...)
for k, v := range o.errorsPerDestID {
@@ -1874,6 +1876,7 @@
in.statusList,
destJobs,
batchDestJobs,
droppedJobs,

procErrorJobsByDestID,
in.procErrorJobs,
@@ -1893,6 +1896,7 @@ type storeMessage struct {
statusList []*jobsdb.JobStatusT
destJobs []*jobsdb.JobT
batchDestJobs []*jobsdb.JobT
droppedJobs []*jobsdb.JobT

procErrorJobsByDestID map[string][]*jobsdb.JobT
procErrorJobs []*jobsdb.JobT
@@ -1913,6 +1917,7 @@ func (sm *storeMessage) merge(subJob *storeMessage) {
sm.statusList = append(sm.statusList, subJob.statusList...)
sm.destJobs = append(sm.destJobs, subJob.destJobs...)
sm.batchDestJobs = append(sm.batchDestJobs, subJob.batchDestJobs...)
sm.droppedJobs = append(sm.droppedJobs, subJob.droppedJobs...)

sm.procErrorJobs = append(sm.procErrorJobs, subJob.procErrorJobs...)
for id, job := range subJob.procErrorJobsByDestID {
@@ -2066,6 +2071,10 @@ func (proc *Handle) Store(partition string, in *storeMessage) {
if err != nil {
return fmt.Errorf("publishing rsources stats: %w", err)
}
err = proc.saveDroppedJobs(in.droppedJobs, tx.Tx())
if err != nil {
return fmt.Errorf("saving dropped jobs: %w", err)
}

if proc.isReportingEnabled() {
proc.reporting.Report(in.reportMetrics, tx.SqlTx())
@@ -2123,6 +2132,7 @@ type transformSrcDestOutput struct {
batchDestJobs []*jobsdb.JobT
errorsPerDestID map[string][]*jobsdb.JobT
routerDestIDs map[string]struct{}
droppedJobs []*jobsdb.JobT
}

func (proc *Handle) transformSrcDest(
@@ -2157,6 +2167,7 @@ func (proc *Handle) transformSrcDest(
destJobs := make([]*jobsdb.JobT, 0)
routerDestIDs := make(map[string]struct{})
procErrorJobsByDestID := make(map[string][]*jobsdb.JobT)
droppedJobs := make([]*jobsdb.JobT, 0)

proc.config.configSubscriberLock.RLock()
destType := proc.config.destinationIDtoTypeMap[destID]
@@ -2225,8 +2236,7 @@ func (proc *Handle) transformSrcDest(
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.UserTransformerStage, trackingPlanEnabled, transformationEnabled)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.UserTransformerStage, transformationEnabled, trackingPlanEnabled)
droppedJobs := proc.getDroppedJobs(response, eventList)
proc.saveFailedJobs(append(failedJobs, droppedJobs...))
droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)
if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
}
@@ -2294,8 +2304,7 @@ func (proc *Handle) transformSrcDest(
var successCountMetadataMap map[string]MetricMetadata
eventsToTransform, successMetrics, successCountMap, successCountMetadataMap = proc.getDestTransformerEvents(response, commonMetaData, eventsByMessageID, destination, transformer.EventFilterStage, trackingPlanEnabled, transformationEnabled)
failedJobs, failedMetrics, failedCountMap := proc.getFailedEventJobs(response, commonMetaData, eventsByMessageID, transformer.EventFilterStage, transformationEnabled, trackingPlanEnabled)
droppedJobs := proc.getDroppedJobs(response, eventsToTransform)
proc.saveFailedJobs(append(failedJobs, droppedJobs...))
droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)
proc.logger.Debug("Supported messages filtering output size", len(eventsToTransform))

// REPORTING - START
@@ -2365,8 +2374,7 @@ func (proc *Handle) transformSrcDest(
destTransformationStat.numEvents.Count(len(eventsToTransform))
destTransformationStat.numOutputSuccessEvents.Count(len(response.Events))
destTransformationStat.numOutputFailedEvents.Count(len(failedJobs))
droppedJobs := proc.getDroppedJobs(response, eventsToTransform)
proc.saveFailedJobs(append(failedJobs, droppedJobs...))
droppedJobs = append(droppedJobs, proc.getDroppedJobs(response, eventList)...)

if _, ok := procErrorJobsByDestID[destID]; !ok {
procErrorJobsByDestID[destID] = make([]*jobsdb.JobT, 0)
@@ -2492,17 +2500,17 @@ func (proc *Handle) transformSrcDest(
errorsPerDestID: procErrorJobsByDestID,
reportMetrics: reportMetrics,
routerDestIDs: routerDestIDs,
droppedJobs: droppedJobs,
}
}

func (proc *Handle) saveFailedJobs(failedJobs []*jobsdb.JobT) {
func (proc *Handle) saveDroppedJobs(failedJobs []*jobsdb.JobT, tx *jobsdb.Tx) error {
if len(failedJobs) > 0 {
rsourcesStats := rsources.NewFailedJobsCollector(proc.rsourcesService)
rsourcesStats.JobsFailed(failedJobs)
_ = proc.writeErrorDB.WithTx(func(tx *jobsdb.Tx) error {
return rsourcesStats.Publish(context.TODO(), tx.Tx)
})
rsourcesStats := rsources.NewDroppedJobsCollector(proc.rsourcesService)
rsourcesStats.JobsDropped(failedJobs)
return rsourcesStats.Publish(context.TODO(), tx.Tx)
}
return nil
}

func ConvertToFilteredTransformerResponse(events []transformer.TransformerEvent, filter bool) transformer.Response {
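
Per the Store hunk above, dropped-job stats now publish inside the same transaction that stores the jobs, where saveFailedJobs previously opened its own transaction on writeErrorDB. A rough sketch of how the calls compose inside Store's existing transaction callback; the wrapper shape, ctx, and field names are assumptions pieced together from the hunks, not verbatim repository code:

// Inside Store's store-safe transaction callback (shape assumed):
if err := rsourcesStats.Publish(ctx, tx.SqlTx()); err != nil {
	return fmt.Errorf("publishing rsources stats: %w", err)
}
// New in this commit: dropped jobs are flushed within the same tx,
// so a failure rolls back stats and stored jobs together.
if err := proc.saveDroppedJobs(in.droppedJobs, tx.Tx()); err != nil {
	return fmt.Errorf("saving dropped jobs: %w", err)
}
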
processor/processor_test.go (11 changes: 1 addition & 10 deletions)
@@ -1321,7 +1321,7 @@ var _ = Describe("Processor", Ordered, func() {
}
})

c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(nil)
c.MockRsourcesService.EXPECT().IncrementStats(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(nil) // one for newly stored jobs and one for dropped jobs
c.mockArchivalDB.EXPECT().
WithStoreSafeTx(
gomock.Any(),
@@ -1795,11 +1795,6 @@ var _ = Describe("Processor", Ordered, func() {
StoreInTx(gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes()

// will be used to save failed events to failed keys table
c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
_ = f(&jobsdb.Tx{})
}).Times(1)

// One Store call is expected for all events
c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
Do(func(ctx context.Context, jobs []*jobsdb.JobT) {
@@ -1938,10 +1933,6 @@ var _ = Describe("Processor", Ordered, func() {
assertJobStatus(unprocessedJobsList[0], statuses[0], jobsdb.Succeeded.State)
})

c.mockWriteProcErrorsDB.EXPECT().WithTx(gomock.Any()).Do(func(f func(tx *jobsdb.Tx) error) {
_ = f(&jobsdb.Tx{})
}).Return(nil).Times(1)

// One Store call is expected for all events
c.mockWriteProcErrorsDB.EXPECT().Store(gomock.Any(), gomock.Any()).Times(1).
Do(func(ctx context.Context, jobs []*jobsdb.JobT) {
processor/transformer/transformer.go (7 changes: 7 additions & 0 deletions)
@@ -78,6 +78,13 @@ type Metadata struct {
SourceDefinitionType string `json:"-"`
}

func (m Metadata) GetMessagesIDs() []string {
if len(m.MessageIDs) > 0 {
return m.MessageIDs
}
return []string{m.MessageID}
}

type TransformerEvent struct {
Message types.SingularEventT `json:"message"`
Metadata Metadata `json:"metadata"`
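
The new helper centralizes the "batched or single" fallback that each call site previously spelled out with an if/else. A self-contained illustration; the Metadata struct here is a local copy of only the two fields the method reads, not the package's full type:

package main

import "fmt"

type Metadata struct {
	MessageID  string
	MessageIDs []string
}

// GetMessagesIDs prefers the batched IDs and falls back to the single ID,
// matching the method added in this commit.
func (m Metadata) GetMessagesIDs() []string {
	if len(m.MessageIDs) > 0 {
		return m.MessageIDs
	}
	return []string{m.MessageID}
}

func main() {
	fmt.Println(Metadata{MessageID: "m1"}.GetMessagesIDs())                  // [m1]
	fmt.Println(Metadata{MessageIDs: []string{"m1", "m2"}}.GetMessagesIDs()) // [m1 m2]
}
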
services/debugger/transformation/transformationStatusUploader.go (23 changes: 3 additions & 20 deletions)
@@ -370,8 +370,9 @@ func (h *Handle) processRecordTransformationStatus(tStatus *TransformationStatus
}

for _, failedEvent := range tStatus.FailedEvents {
if len(failedEvent.Metadata.MessageIDs) > 0 {
for _, msgID := range failedEvent.Metadata.MessageIDs {
messageIDs := failedEvent.Metadata.GetMessagesIDs()
for _, msgID := range messageIDs {
if msgID != "" {
reportedMessageIDs[msgID] = struct{}{}
singularEventWithReceivedAt := tStatus.EventsByMessageID[msgID]
eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
Expand All @@ -390,24 +391,6 @@ func (h *Handle) processRecordTransformationStatus(tStatus *TransformationStatus
IsError: true,
})
}
} else if failedEvent.Metadata.MessageID != "" {
reportedMessageIDs[failedEvent.Metadata.MessageID] = struct{}{}
singularEventWithReceivedAt := tStatus.EventsByMessageID[failedEvent.Metadata.MessageID]
eventBefore := getEventBeforeTransform(singularEventWithReceivedAt.SingularEvent, singularEventWithReceivedAt.ReceivedAt)
eventAfter := &EventsAfterTransform{
Error: failedEvent.Error,
ReceivedAt: time.Now().Format(misc.RFC3339Milli),
StatusCode: failedEvent.StatusCode,
}

h.RecordTransformationStatus(&TransformStatusT{
TransformationID: tID,
SourceID: tStatus.SourceID,
DestinationID: tStatus.DestID,
EventBefore: eventBefore,
EventsAfter: eventAfter,
IsError: true,
})
}
}

services/rsources/stats_collector.go (15 changes: 10 additions & 5 deletions)
@@ -44,7 +44,7 @@ type StatsCollector interface {
// FailedJobsStatsCollector collects stats for failed jobs
type FailedJobsStatsCollector interface {
StatsPublisher
JobsFailed(jobs []*jobsdb.JobT)
JobsDropped(jobs []*jobsdb.JobT)
}

// NewStatsCollector creates a new stats collector
Expand All @@ -58,9 +58,10 @@ func NewStatsCollector(jobservice JobService) StatsCollector {
}
}

// NewFailedJobsCollector creates a new stats collector for publishing failed job stats and records
func NewFailedJobsCollector(jobservice JobService) FailedJobsStatsCollector {
// NewDroppedJobsCollector creates a new stats collector for publishing failed job stats and records
func NewDroppedJobsCollector(jobservice JobService) FailedJobsStatsCollector {
return &statsCollector{
skipFailedRecords: true,
jobService: jobservice,
jobIdsToStatKeyIndex: map[int64]statKey{},
jobIdsToRecordIdIndex: map[int64]json.RawMessage{},
@@ -81,6 +82,7 @@ func (sk statKey) String() string {
var _ StatsCollector = (*statsCollector)(nil)

type statsCollector struct {
skipFailedRecords bool
processing bool
jobService JobService
jobIdsToStatKeyIndex map[int64]statKey
@@ -115,7 +117,7 @@ func (r *statsCollector) JobsStored(jobs []*jobsdb.JobT) {
r.buildStats(jobs, nil, true)
}

func (r *statsCollector) JobsFailed(jobs []*jobsdb.JobT) {
func (r *statsCollector) JobsDropped(jobs []*jobsdb.JobT) {
r.processing = true
r.buildStats(jobs, nil, true)
jobStatuses := make([]*jobsdb.JobStatusT, 0, len(jobs))
@@ -247,13 +249,16 @@ func (r *statsCollector) buildStats(jobs []*jobsdb.JobT, failedJobs map[uuid.UUI
if incrementIn {
stats.In++
}
r.jobIdsToStatKeyIndex[job.JobID] = sk
if r.skipFailedRecords {
continue
}
if recordId != "" && recordId != "null" && recordId != `""` {
recordIdJson := json.RawMessage(recordId)
if json.Valid(recordIdJson) {
r.jobIdsToRecordIdIndex[job.JobID] = recordIdJson
}
}
r.jobIdsToStatKeyIndex[job.JobID] = sk
}
}
}
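
With skipFailedRecords set, the dropped-jobs collector still counts in/out stats but skips the recordId bookkeeping, which is why buildStats now continues before the failed-record branch. A hedged usage sketch; the jobService value, ctx, and sqlTx are assumed to come from the caller, and the Publish signature is inferred from the calls above:

// Count dropped jobs and flush their stats in the caller's transaction.
collector := rsources.NewDroppedJobsCollector(jobService)
collector.JobsDropped(droppedJobs) // builds in/out stats without failed-record payloads
if err := collector.Publish(ctx, sqlTx); err != nil {
	return fmt.Errorf("publishing dropped-job stats: %w", err)
}
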
services/rsources/stats_collector_test.go (23 changes: 3 additions & 20 deletions)
@@ -31,7 +31,7 @@ var _ = Describe("Using StatsCollector", Serial, func() {
mockCtrl = gomock.NewController(GinkgoT())
js = NewMockJobService(mockCtrl)
statsCollector = NewStatsCollector(js)
failedRecordsCollector = NewFailedJobsCollector(js)
failedRecordsCollector = NewDroppedJobsCollector(js)
jobs = []*jobsdb.JobT{}
jobErrors = map[uuid.UUID]string{}
jobStatuses = []*jobsdb.JobStatusT{}
@@ -340,9 +340,9 @@ var _ = Describe("Using StatsCollector", Serial, func() {
})
})

Context("it calls failedRecordsCollector.JobsFailed", func() {
Context("it calls failedRecordsCollector.JobsDropped", func() {
BeforeEach(func() {
failedRecordsCollector.JobsFailed(jobs)
failedRecordsCollector.JobsDropped(jobs)
})

It("publishes both in and out stats and adds failed records", func() {
Expand All @@ -362,23 +362,6 @@ var _ = Describe("Using StatsCollector", Serial, func() {
}).
Times(1)

failedRecords := []json.RawMessage{}
for i := 0; i < len(jobs); i++ {
failedRecords = append(failedRecords, []byte(`"recordId"`))
}
js.EXPECT().
AddFailedRecords(
gomock.Any(),
gomock.Any(),
params.JobRunID,
JobTargetKey{
TaskRunID: params.TaskRunID,
SourceID: params.SourceID,
DestinationID: params.DestinationID,
},
failedRecords).
Times(1)

err := failedRecordsCollector.Publish(context.TODO(), nil)
Expect(err).To(BeNil())
})