Skip to content

Commit

Permalink
chore: improve courier logging (#3943)
Browse files Browse the repository at this point in the history
  • Loading branch information
aeneasr committed Jun 3, 2024
1 parent 050a4dc commit fbbac77
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 66 deletions.
55 changes: 25 additions & 30 deletions courier/courier_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ import (
)

func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
logger := c.deps.Logger().
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject)

if err := c.deps.CourierPersister().IncrementMessageSendCount(ctx, msg.ID); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to increment the message's "send_count" field`)
return err
}
Expand All @@ -24,28 +29,21 @@ func (c *courier) DispatchMessage(ctx context.Context, msg Message) error {
return errors.Errorf("message %s has unknown channel %q", msg.ID.String(), msg.Channel)
}

logger = logger.
WithField("channel", channel.ID())

if err := channel.Dispatch(ctx, msg); err != nil {
return err
}

if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusSent); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("channel", channel.ID()).
Error(`Unable to set the message status to "sent".`)
return err
}

c.deps.Logger().
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject).
WithField("channel", channel.ID()).
Debug("Courier sent out message.")
logger.Debug("Courier sent out message.")

return nil
}
Expand All @@ -63,27 +61,28 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
}

for k, msg := range messages {
logger := c.deps.Logger().
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject)

if msg.SendCount > maxRetries {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to set the retried message's status to "abandoned".`)
return err
}

// Skip the message
c.deps.Logger().
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
logger.
Warnf(`Message was abandoned because it did not deliver after %d attempts`, msg.SendCount)
} else if err := c.DispatchMessage(ctx, msg); err != nil {
if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusFailed, err); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to record failure log entry.`)
if c.failOnDispatchError {
return err
Expand All @@ -92,10 +91,8 @@ func (c *courier) DispatchQueue(ctx context.Context) error {

for _, replace := range messages[k:] {
if err := c.deps.CourierPersister().SetMessageStatus(ctx, replace.ID, MessageStatusQueued); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", replace.ID).
WithField("message_nid", replace.NID).
Error(`Unable to reset the failed message's status to "queued".`)
if c.failOnDispatchError {
return err
Expand All @@ -107,10 +104,8 @@ func (c *courier) DispatchQueue(ctx context.Context) error {
return err
}
} else if err := c.deps.CourierPersister().RecordDispatch(ctx, msg.ID, CourierMessageDispatchStatusSuccess, nil); err != nil {
c.deps.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to record success log entry.`)
// continue with execution, as the message was successfully dispatched
}
Expand Down
23 changes: 12 additions & 11 deletions courier/http_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"encoding/json"
"fmt"

"github.com/tidwall/gjson"

"github.com/pkg/errors"

"github.com/ory/kratos/courier/template"
Expand Down Expand Up @@ -89,25 +91,24 @@ func (c *httpChannel) Dispatch(ctx context.Context, msg Message) (err error) {
return errors.WithStack(err)
}

logger := c.d.Logger().
WithField("http_server", gjson.GetBytes(c.requestConfig, "url").String()).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject)

if res.StatusCode >= 200 && res.StatusCode < 300 {
c.d.Logger().
WithField("message_id", msg.ID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject).
Debug("Courier sent out mailer.")
logger.Debug("Courier sent out mailer.")
return nil
}

err = errors.Errorf(
"unable to dispatch mail delivery because upstream server replied with status code %d",
res.StatusCode,
)
c.d.Logger().
WithField("message_id", msg.ID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject).
logger.
WithError(err).
Error("sending mail via HTTP failed.")
return errors.WithStack(err)
Expand Down
45 changes: 20 additions & 25 deletions courier/smtp_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ func (c *SMTPChannel) Dispatch(ctx context.Context, msg Message) error {
}
}

if cfg == nil {
return errors.WithStack(herodot.ErrInternalServerError.WithErrorf("Courier tried to deliver an email but SMTP channel is misconfigured."))
}

gm := mail.NewMessage()
if cfg.FromName == "" {
gm.SetHeader("From", cfg.FromAddress)
Expand All @@ -82,31 +86,30 @@ func (c *SMTPChannel) Dispatch(ctx context.Context, msg Message) error {

gm.SetBody("text/plain", msg.Body)

logger := c.d.Logger().
WithField("smtp_server", fmt.Sprintf("%s:%d", c.smtpClient.Host, c.smtpClient.Port)).
WithField("smtp_ssl_enabled", c.smtpClient.SSL).
WithField("message_from", cfg.FromAddress).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject)

tmpl, err := c.newEmailTemplateFromMessage(c.d, msg)
if err != nil {
c.d.Logger().
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to get email template from message.`)
logger.
WithError(err).Error(`Unable to get email template from message.`)
} else if htmlBody, err := tmpl.EmailBody(ctx); err != nil {
c.d.Logger().
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to get email body from template.`)
logger.
WithError(err).Error(`Unable to get email body from template.`)
} else {
gm.AddAlternative("text/html", htmlBody)
}

if err := c.smtpClient.DialAndSend(ctx, gm); err != nil {
c.d.Logger().
WithError(err).
WithField("smtp_server", fmt.Sprintf("%s:%d", c.smtpClient.Host, c.smtpClient.Port)).
WithField("smtp_ssl_enabled", c.smtpClient.SSL).
WithField("message_from", cfg.FromAddress).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error("Unable to send email using SMTP connection.")

var protoErr *textproto.Error
Expand All @@ -119,10 +122,8 @@ func (c *SMTPChannel) Dispatch(ctx context.Context, msg Message) error {
// See https://en.wikipedia.org/wiki/List_of_SMTP_server_return_codes
// If the SMTP server responds with 5xx, sending the message should not be retried (without changing something about the request)
if err := c.d.CourierPersister().SetMessageStatus(ctx, msg.ID, MessageStatusAbandoned); err != nil {
c.d.Logger().
logger.
WithError(err).
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
Error(`Unable to reset the retried message's status to "abandoned".`)
return err
}
Expand All @@ -132,13 +133,7 @@ func (c *SMTPChannel) Dispatch(ctx context.Context, msg Message) error {
WithError(err.Error()).WithReason("failed to send email via smtp"))
}

c.d.Logger().
WithField("message_id", msg.ID).
WithField("message_nid", msg.NID).
WithField("message_type", msg.Type).
WithField("message_template_type", msg.TemplateType).
WithField("message_subject", msg.Subject).
Debug("Courier sent out message.")
logger.Debug("Courier sent out message.")

return nil
}

0 comments on commit fbbac77

Please sign in to comment.