Skip to content

Commit

Permalink
fix(batchrouter): concurrent modification of job parameters causes pa…
Browse files Browse the repository at this point in the history
…nic (#2631)
  • Loading branch information
atzoum authored Oct 31, 2022
1 parent 37d32f1 commit 79e3e34
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 58 deletions.
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220926161630-eccd6366d1be // indirect
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect
golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand All @@ -213,3 +213,5 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
)

require github.com/samber/lo v1.33.0
12 changes: 4 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ contrib.go.opencensus.io/exporter/aws v0.0.0-20200617204711-c478e41e60e9/go.mod
contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8=
contrib.go.opencensus.io/integrations/ocsql v0.1.7/go.mod h1:8DsSdjz3F+APR+0z0WkU1aRorQCFfRxvqjUUPMbF3fE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
github.com/Azure/azure-amqp-common-go/v3 v3.2.2/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
Expand Down Expand Up @@ -906,6 +905,8 @@ github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
github.com/samber/lo v1.33.0 h1:2aKucr+rQV6gHpY3bpeZu69uYoQOzVhGT3J22Op6Cjk=
github.com/samber/lo v1.33.0/go.mod h1:HLeWcJRRyLKp3+/XBJvOrerCQn9mhdKMHyd7IRlgeQ8=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/seccomp/libseccomp-golang v0.9.2-0.20220502022130-f33da4d89646/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg=
github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c h1:rsRTAcCR5CeNLkvgBVSjQoDGRRt6kggsE6XYBqCv2KQ=
Expand Down Expand Up @@ -1084,7 +1085,6 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4=
golang.org/x/exp v0.0.0-20190829153037-c13cbed26979/go.mod h1:86+5VVa7VpoJ4kLfm080zCjGlMRFzhUhsZKEZO7MGek=
golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3/go.mod h1:NOZ3BPKG0ec/BKJQgnvsSFpcKLM5xXVWnvZS97DWHgE=
golang.org/x/exp v0.0.0-20191030013958-a1ab85dbe136/go.mod h1:JXzH8nQsPlswgeRAPE3MuO9GYsAcnJvJ4vnMwN/5qkY=
Expand All @@ -1093,8 +1093,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f h1:GrkO5AtFUU9U/1f5ctbIBXtBGeSJbWwIYfIsTcFMaX4=
golang.org/x/exp v0.0.0-20210220032938-85be41e4509f/go.mod h1:I6l2HNBLBZEcrOoCpyKLdY2lHoRZ8lI4x60KMCQDft4=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
Expand All @@ -1118,15 +1118,12 @@ golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPI
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
golang.org/x/mobile v0.0.0-20201217150744-e6ae53a27f4f/go.mod h1:skQtrUTUwhdJvXM/2KKJzY8pDgNr9I/FOMqDVRPBUS4=
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
Expand Down Expand Up @@ -1326,7 +1323,6 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
Expand Down
68 changes: 20 additions & 48 deletions router/batchrouter/batchrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"time"

jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
"github.com/thoas/go-funk"
"golang.org/x/sync/errgroup"

Expand Down Expand Up @@ -1212,7 +1213,7 @@ func (brt *HandleT) setJobStatus(batchJobs *BatchJobsT, isWarehouse bool, errOcc
}

var parameters JobParametersT
err = tryUnmarshalJSON(job.JobID, job.Parameters, &parameters)
err = json.Unmarshal(job.Parameters, &parameters)
if err != nil {
brt.logger.Error("Unmarshal of job parameters failed. ", string(job.Parameters))
}
Expand Down Expand Up @@ -1660,18 +1661,7 @@ func (worker *workerT) workerProcess() {
brt.logger.Debugf("BRT: %s: DB Read Complete for parameter Filters: %v retryList: %v, unprocessedList: %v, total: %v", brt.destType, parameterFilters, len(toRetry.Jobs), len(combinedList)-len(toRetry.Jobs), len(combinedList))
}
} else {
for _, job := range batchDestData.jobs {
var parameters JobParametersT
err := tryUnmarshalJSON(job.JobID, job.Parameters, &parameters)
if err != nil {
worker.brt.logger.Error("BRT: %s: Unmarshal of job parameters failed. ", worker.brt.destType, string(job.Parameters))
}

if parameters.DestinationID == batchDest.Destination.ID {
combinedList = append(combinedList, job)
}
}

combinedList = batchDestData.jobs
brt.logger.Debugf("BRT: %s: length of jobs for destination id: %s is %d", worker.brt.destType, batchDest.Destination.ID, len(combinedList))
}

Expand Down Expand Up @@ -1930,12 +1920,8 @@ func (*HandleT) getNamespace(config interface{}, source backendconfig.SourceT, d

func (brt *HandleT) isDestInProgress(destID string) bool {
brt.inProgressMapLock.RLock()
if brt.inProgressMap[destID] {
brt.inProgressMapLock.RUnlock()
return true
}
brt.inProgressMapLock.RUnlock()
return false
defer brt.inProgressMapLock.RUnlock()
return brt.inProgressMap[destID]
}

func (brt *HandleT) setDestInProgress(destID string, starting bool) {
Expand Down Expand Up @@ -2023,14 +2009,23 @@ func (brt *HandleT) readAndProcess() {
brtQueryStat.End()
brt.logger.Debugf("BRT: %s: Length of jobs received: %d", brt.destType, len(jobs))

jobsByDesID := lo.GroupBy(jobs, func(job *jobsdb.JobT) string {
var parameters JobParametersT
if json.Unmarshal(job.Parameters, &parameters) != nil {
brt.logger.Error("BRT: %s: Unmarshal of job parameters failed. ", brt.destType, string(job.Parameters))
}
return parameters.DestinationID
})
var wg sync.WaitGroup
for destID, batchDest := range destinationsMap {
brt.setDestInProgress(destID, true)

wg.Add(1)
brt.processQ <- &BatchDestinationDataT{batchDestination: *batchDest, jobs: jobs, parentWG: &wg}
for destID, destJobs := range jobsByDesID {
if batchDest, ok := destinationsMap[destID]; ok {
brt.setDestInProgress(destID, true)
wg.Add(1)
brt.processQ <- &BatchDestinationDataT{batchDestination: *batchDest, jobs: destJobs, parentWG: &wg}
} else {
brt.logger.Errorf("BRT: %s: Destination %s not found in destinationsMap", brt.destType, destID)
}
}

wg.Wait()
}
}
Expand Down Expand Up @@ -2434,26 +2429,3 @@ func (brt *HandleT) updateProcessedEventsMetrics(statusList []*jobsdb.JobStatusT
}
}
}

func tryUnmarshalJSON(jobID int64, data []byte, v interface{}) (err error) {
c := cap(data)
l := len(data)

startingData := make([]byte, l)
copy(startingData, data)

defer func() {
if r := recover(); r != nil {
pkgLogger.Warnf(
"Panic while unmarshalling json (%d): starting slice [%d:%d]: ending slice: [%d:%d]: %v",
jobID, l, c, len(data), cap(data), r,
)
pkgLogger.Warnf("Starting data before unmarshalling json panic: %s", startingData)
pkgLogger.Warnf("Ending data after unmarshalling json panic: %s", data)
panic(r)
}
}()

err = json.Unmarshal(data, v)
return
}
2 changes: 1 addition & 1 deletion router/batchrouter/batchrouterBenchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func Benchmark_JSONUnmarshal(b *testing.B) {
g.Go(func() error {
for i := range jobs {
var params JobParametersT
_ = tryUnmarshalJSON(jobs[i].JobID, jobs[i].Parameters, &params)
_ = json.Unmarshal(jobs[i].Parameters, &params)
}

return nil
Expand Down

0 comments on commit 79e3e34

Please sign in to comment.