Skip to content

Commit

Permalink
Merge pull request #448 from Scalingo/feat/447/use_root_context_to_lo…
Browse files Browse the repository at this point in the history
…g_error

feat(nsqconsumer): log the error by using root context (ErrCtx)
  • Loading branch information
Soulou authored Jan 10, 2023
2 parents 0ca6fcb + d2059a5 commit b26db73
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 70 deletions.
1 change: 1 addition & 0 deletions nsqconsumer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## To be Released

* feat: Start unwraps errors to find noRetry field that can be wrapped in ErrCtx. Also use ErrCtx to enrich the logger
* build(deps): bump github.com/Scalingo/go-utils/logger from 1.1.1 to 1.2.0
* build(deps): bump github.com/sirupsen/logrus from 1.8.1 to 1.9.0

Expand Down
151 changes: 86 additions & 65 deletions nsqconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/stvp/rollbar"
"gopkg.in/errgo.v1"

scalingoerrors "github.com/Scalingo/go-utils/errors/v2"
"github.com/Scalingo/go-utils/logger"
"github.com/Scalingo/go-utils/nsqproducer"
)
Expand Down Expand Up @@ -167,84 +168,104 @@ func (c *nsqConsumer) Start(ctx context.Context) func() {

consumer.SetLogger(log.New(os.Stderr, fmt.Sprintf("[nsq-consumer(%s)]", c.Topic), log.Flags()), nsq.LogLevelWarning)

consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(message *nsq.Message) (err error) {
defer func() {
if r := recover(); r != nil {
var errRecovered error
switch value := errRecovered.(type) {
case error:
errRecovered = value
default:
errRecovered = errgo.Newf("%v", value)
}
err = errgo.Newf("recover panic from nsq consumer: %+v", errRecovered)
c.logger.WithError(errRecovered).WithFields(logrus.Fields{"stacktrace": string(debug.Stack())}).Error("recover panic")
}
}()
consumer.AddConcurrentHandlers(nsq.HandlerFunc(c.nsqHandler), c.MaxInFlight)

if len(message.Body) == 0 {
err := errgo.New("body is blank, re-enqueued message")
c.logger.WithError(err).Error("blank message")
return err
}
var msg NsqMessageDeserialize
err = json.Unmarshal(message.Body, &msg)
if err != nil {
c.logger.WithError(err).Error("failed to unmarshal message")
return err
}
msg.NsqMsg = message

msgLogger := c.logger.WithFields(logrus.Fields{
"message_id": fmt.Sprintf("%s", message.ID),
"message_type": msg.Type,
"request_id": msg.RequestID,
})

ctx := context.WithValue(
context.WithValue(
context.Background(), "request_id", msg.RequestID,
), "logger", msgLogger,
)

if msg.At != 0 {
now := time.Now().Unix()
delay := msg.At - now
if delay > 0 {
return c.postponeMessage(ctx, msgLogger, msg, delay)
err = consumer.ConnectToNSQLookupds(c.NsqLookupdURLs)
if err != nil {
c.logger.WithError(err).Error("Fail to connect to NSQ lookupd")
os.Exit(1)
}

return func() {
consumer.Stop()
// block until stop process is complete
<-consumer.StopChan
}
}

func (c *nsqConsumer) nsqHandler(message *nsq.Message) (err error) {
defer func() {
if r := recover(); r != nil {
var errRecovered error
switch value := errRecovered.(type) {
case error:
errRecovered = value
default:
errRecovered = errgo.Newf("%v", value)
}
err = errgo.Newf("recover panic from nsq consumer: %+v", errRecovered)
c.logger.WithError(errRecovered).WithFields(logrus.Fields{"stacktrace": string(debug.Stack())}).Error("recover panic")
}
}()

before := time.Now()
if _, ok := c.SkipLogSet[msg.Type]; !ok {
msgLogger.Info("BEGIN Message")
if len(message.Body) == 0 {
err := errgo.New("body is blank, re-enqueued message")
c.logger.WithError(err).Error("blank message")
return err
}
var msg NsqMessageDeserialize
err = json.Unmarshal(message.Body, &msg)
if err != nil {
c.logger.WithError(err).Error("Fail to unmarshal message")
return err
}
msg.NsqMsg = message

msgLogger := c.logger.WithFields(logrus.Fields{
"message_id": fmt.Sprintf("%s", message.ID),
"message_type": msg.Type,
"request_id": msg.RequestID,
})

// Ignore linter here due to the usage of string as keys in context.
//nolint:staticcheck,revive
ctx := context.WithValue(context.WithValue(context.Background(), "request_id", msg.RequestID), "logger", msgLogger)

if msg.At != 0 {
now := time.Now().Unix()
delay := msg.At - now
if delay > 0 {
return c.postponeMessage(ctx, msgLogger, msg, delay)
}
}

before := time.Now()
if _, ok := c.SkipLogSet[msg.Type]; !ok {
msgLogger.Info("BEGIN Message")
}

err = c.MessageHandler(ctx, &msg)
if err != nil {
if nsqerr, ok := err.(Error); ok && nsqerr.noRetry {
msgLogger.WithError(err).Error("message handling error - noretry")
return nil
err = c.MessageHandler(ctx, &msg)
if err != nil {
var errLogger logrus.FieldLogger
noRetry := false

unwrapErr := err
for unwrapErr != nil {
switch handlerErr := unwrapErr.(type) {
case scalingoerrors.ErrCtx:
errLogger = logger.Get(handlerErr.Ctx())
case Error:
noRetry = handlerErr.noRetry
unwrapErr = handlerErr.error
}
msgLogger.WithError(err).Error("message handling error")
return err
unwrapErr = scalingoerrors.UnwrapError(unwrapErr)
}

if _, ok := c.SkipLogSet[msg.Type]; !ok {
msgLogger.WithField("duration", time.Since(before)).Info("END Message")
if errLogger == nil {
errLogger = msgLogger
}
return nil
}), c.MaxInFlight)

if err = consumer.ConnectToNSQLookupds(c.NsqLookupdURLs); err != nil {
c.logger.WithError(err).Fatalf("Fail to connect to NSQ lookupd")
if noRetry {
errLogger.WithError(err).Error("Message handling error - noretry")
return nil
}
errLogger.WithError(err).Error("Message handling error")
return err
}

return func() {
consumer.Stop()
// block until stop process is complete
<-consumer.StopChan
if _, ok := c.SkipLogSet[msg.Type]; !ok {
msgLogger.WithField("duration", time.Since(before)).Info("END Message")
}
return nil
}

func (c *nsqConsumer) postponeMessage(ctx context.Context, msgLogger logrus.FieldLogger, msg NsqMessageDeserialize, delay int64) error {
Expand Down
5 changes: 3 additions & 2 deletions nsqconsumer/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/Scalingo/go-utils/nsqconsumer
go 1.17

require (
github.com/Scalingo/go-utils/errors/v2 v2.2.0
github.com/Scalingo/go-utils/logger v1.2.0
github.com/Scalingo/go-utils/nsqproducer v1.1.2
github.com/nsqio/go-nsq v1.1.0
Expand All @@ -13,10 +14,10 @@ require (

require (
github.com/Scalingo/go-utils/env v1.1.0 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gofrs/uuid v4.3.1+incompatible // indirect
github.com/golang/snappy v0.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
golang.org/x/sys v0.3.0 // indirect
)

// In Dev you can uncomment the following line to use the local packages
Expand Down
12 changes: 9 additions & 3 deletions nsqconsumer/go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
github.com/Scalingo/errgo-rollbar v0.2.0/go.mod h1:InPX+PbaicLpePiWdmay2bJ4gtF4anm2L8T6rITx41A=
github.com/Scalingo/go-utils/env v1.1.0 h1:F6MBxtGJrc4SXF18Fqie6Lpvkv7JygJfr1WSRGitst0=
github.com/Scalingo/go-utils/env v1.1.0/go.mod h1:pYJjGXhXrRldYvXj9vt334BJWRsGup4Uw3XWqp2ADRI=
github.com/Scalingo/go-utils/errors/v2 v2.2.0 h1:n93hge0DzfZ3KbI/jdnxKDTRDD+PXsGwNPKyHRzQYEE=
github.com/Scalingo/go-utils/errors/v2 v2.2.0/go.mod h1:pkLy6Qz9UNm6FpXtFJGZRC0W5lqbqHpPchrQV80gw5E=
github.com/Scalingo/go-utils/logger v1.1.0/go.mod h1:ZJLfkPJ4kfTAliBBAka0IH06+SS+LmRetKuw00WAe+s=
github.com/Scalingo/go-utils/logger v1.2.0 h1:E3jtaoRxpIsFcZu/jsvWew8ttUAwKUYQufdPqGYp7EU=
github.com/Scalingo/go-utils/logger v1.2.0/go.mod h1:JArjD1gHdB/vwnlcVG7rYxuIY0tk8/VG4MtirnRwn8k=
Expand All @@ -13,8 +15,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/frankban/quicktest v1.2.2 h1:xfmOhhoH5fGPgbEAlhLpJH9p0z/0Qizio9osmvn9IUY=
github.com/frankban/quicktest v1.2.2/go.mod h1:Qh/WofXFeiAFII1aEBu529AtJo6Zg2VHscnEsbBnJ20=
github.com/gofrs/uuid v4.2.0+incompatible h1:yyYWMnhkhrKwwr8gAOcOCYxOOscHgDS9yZgBrnJfGa0=
github.com/gofrs/uuid v4.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.3.1+incompatible h1:0/KbAdpx3UXAx1kEOWHJeOkpbgRFGHVgv+CFIY7dBJI=
github.com/gofrs/uuid v4.3.1+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2 h1:aeE13tS0IiQgFjYdoL8qN3K1N2bXXtI6Vi51/y7BpMw=
Expand All @@ -41,11 +44,13 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stvp/rollbar v0.5.1 h1:qvyWbd0RNL5V27MBumqCXlcU7ohmHeEtKX+Czc8oeuw=
github.com/stvp/rollbar v0.5.1/go.mod h1:/fyFC854GgkbHRz/rSsiYc6h84o0G5hxBezoQqRK7Ho=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
Expand All @@ -64,8 +69,9 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211020174200-9d6173849985/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down

0 comments on commit b26db73

Please sign in to comment.