From 99299789e01192e3f40b15bc09c6d423965912c2 Mon Sep 17 00:00:00 2001 From: Andreas Karlsson Date: Fri, 8 Nov 2024 15:50:17 +0100 Subject: [PATCH] Replace LogFunc with the standard slog.Logger --- README.md | 38 ++--- benchmark_test.go | 5 +- client.go | 192 ++++++++++++++---------- examples/client/main.go | 2 - examples/server/main.go | 8 +- examples/tls-server/main.go | 5 +- logging.go | 276 +++++++++++++++++++++++------------ logging_test.go | 135 +++++++++++++---- middleware/ack.go | 16 +- middleware/ack_test.go | 4 +- middleware/panic_recovery.go | 9 +- server.go | 85 ++++++----- server_test.go | 4 +- testing.go | 13 -- 14 files changed, 499 insertions(+), 293 deletions(-) diff --git a/README.md b/README.md index 23a7e5c..4034bdf 100644 --- a/README.md +++ b/README.md @@ -88,8 +88,7 @@ can be changed by calling chainable methods. ```go server := NewServer("amqp://guest:guest@localhost:5672"). - WithDebugLogger(log.Printf). - WithErrorLogger(log.Printf). + WithLogger(logger). WithTLS(&tls.Config{}) ``` @@ -115,10 +114,10 @@ request := NewRequest(). response, err := client.Send(request) if err != nil { - log.Fatal(err.Error()) + slog.Error(err.Error()) } -log.Print(string(response.Body)) +slog.Info(string(response.Body)) ``` The client will not connect while being created, instead this happens when the @@ -139,8 +138,7 @@ Example of available methods for chaining. ```go client := NewClient("amqp://guest:guest@localhost:5672"). - WithDebugLogger(log.Printf). - WithErrorLogger(log.Printf). + WithLogger(logger). WithDialConfig(amqp.Config{}). WithTLS(&tls.Config{}). WithReplyToConsumerArgs(amqp.Table{}). @@ -399,24 +397,30 @@ server.ListenAndServe() ## Logging -You can specify two optional loggers for debugging and errors or unexpected -behaviour. By default only error logging is turned on and is logged via the log -package's standard logging. +You can specify your own `slog.Logger` instance. By default amqp-rpc will log +errors using the logger from `slog.Default()`. Some logs will contain data +contained in a `amqp.Delivery` or `amqp.Publishing`, including any headers. If +you want to avoid logging some of the fields you can use an `slog.Handler` to +filter out the fields you don't want to log. -You can provide your own logging function for both error and debug on both the -client and the server. +The library will log using two different levels: `slog.LevelDebug` and +`slog.LevelInfo`. + +If you want to use something other than `slog` for logging, you can implement a +`slog.Handler` wrapper that wraps your preferred logging implementation. ```go -debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags) -errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags) +logger := slog.New( + slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }), +) server := NewServer(url). - WithErrorLogger(errorLogger.Printf). - WithDebugLogger(debugLogger.Printf) + WithLogger(logger) client := NewClient(url). - WithErrorLogger(errorLogger.Printf). - WithDebugLogger(debugLogger.Printf) + WithLogger(logger) ``` This is perfect when using a logger which supports debugging as a separate diff --git a/benchmark_test.go b/benchmark_test.go index 13d48b7..ce9084f 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -2,7 +2,6 @@ package amqprpc import ( "context" - "log" "testing" "time" @@ -19,13 +18,11 @@ func Benchmark(b *testing.B) { time.Sleep(1 * time.Second) confirmingClient := NewClient(testURL). - WithTimeout(3 * time.Minute). - WithErrorLogger(log.Printf) + WithTimeout(3 * time.Minute) defer confirmingClient.Stop() fastClient := NewClient(testURL). - WithErrorLogger(log.Printf). WithTimeout(3 * time.Minute). WithConfirmMode(false) diff --git a/client.go b/client.go index b867e07..3d9c7f6 100644 --- a/client.go +++ b/client.go @@ -5,7 +5,7 @@ import ( "crypto/tls" "errors" "fmt" - "log" + "log/slog" "sync/atomic" "time" @@ -109,15 +109,7 @@ type Client struct { // wantStop tells the runForever function to exit even on connection errors. wantStop int32 - // errorLog specifies an optional logger for amqp errors, unexpected - // behavior etc. If nil, logging is done via the log package's standard - // logger. - errorLog LogFunc - - // debugLog specifies an optional logger for debugging, this logger will - // print most of what is happening internally. - // If nil, logging is not done. - debugLog LogFunc + logger *slog.Logger // Sender is the main send function called after all middlewares has been // chained and called. This field can be overridden to simplify testing. @@ -152,11 +144,11 @@ func NewClient(url string) *Client { middlewares: []ClientMiddlewareFunc{}, timeout: time.Second * 10, maxRetries: 10, - errorLog: log.Printf, // use the standard logger default. - //nolint:revive // Keep variables for clarity - debugLog: func(format string, args ...interface{}) {}, // don't print anything default. } + // use the standard logger default. + c.WithLogger(slog.Default()) + c.Sender = c.send // Set default values to use when creating channels and consumers. @@ -211,16 +203,13 @@ func (c *Client) WithTLS(tlsConfig *tls.Config) *Client { return c } -// WithErrorLogger sets the logger to use for error logging. -func (c *Client) WithErrorLogger(f LogFunc) *Client { - c.errorLog = f - - return c -} - -// WithDebugLogger sets the logger to use for debug logging. -func (c *Client) WithDebugLogger(f LogFunc) *Client { - c.debugLog = f +// WithLogger sets the logger to use for error and debug logging. By default +// the library will log errors using the logger from [slog.Default]. Some logs +// will contain data contained in a [amqp.Delivery] or [amqp.Publishing], +// including any headers. If you want to avoid logging some of the fields you +// can use an [slog.Handler] to filter out the fields you don't want to log. +func (c *Client) WithLogger(logger *slog.Logger) *Client { + c.logger = logger.With("component", "amqprpc-client") return c } @@ -310,20 +299,27 @@ func (c *Client) runForever() { go func() { for { - c.debugLog("client: connecting...") + c.logger.Debug("connecting...") err := c.runOnce() if err == nil { - c.debugLog("client: finished gracefully") + c.logger.Debug("finished gracefully") break } if atomic.LoadInt32(&c.wantStop) == 1 { - c.debugLog("client: finished with error %s", err.Error()) + c.logger.Error("finished with error", + slog.Any("error", err), + ) + break } - c.errorLog("client: got error: %s, will reconnect in %v second(s)", err, 0.5) + c.logger.Error("got error: will reconnect", + slog.Any("error", err), + slog.String("eta", "0.5s"), + ) + time.Sleep(500 * time.Millisecond) } @@ -340,7 +336,7 @@ func (c *Client) runForever() { // amqp error if the underlying connection or socket isn't gracefully closed. // It will also block until the connection is gone. func (c *Client) runOnce() error { - c.debugLog("client: starting up...") + c.logger.Debug("starting up...") inputConn, outputConn, err := createConnections(c.url, c.name, c.dialconfig) if err != nil { @@ -407,7 +403,7 @@ func (c *Client) runOnce() error { // is closed for any reason, and when this happens the messages will be put back // in chan requests unless we have retried to many times. func (c *Client) runPublisher(ouputChan *amqp.Channel) { - c.debugLog("client: running publisher...") + c.logger.Debug("running publisher...") // Monitor the closing of this channel. We need to do this in a separate, // goroutine to ensure we won't get a deadlock inside the select below @@ -428,7 +424,7 @@ func (c *Client) runPublisher(ouputChan *amqp.Channel) { case <-onClose: // The channels for publishing responses was closed, once the // client has started again. This loop will be restarted. - c.debugLog("client: publisher stopped after channel was closed") + c.logger.Debug("publisher stopped after channel was closed") return case request := <-c.requests: // Set the ReplyTo if needed, or ensure it's empty if it's not. @@ -438,7 +434,9 @@ func (c *Client) runPublisher(ouputChan *amqp.Channel) { request.Publishing.ReplyTo = "" } - c.debugLog("client: publishing %s", request.Publishing.CorrelationId) + c.logger.Debug("publishing request", + slog.String("correlation_id", request.Publishing.CorrelationId), + ) // Setup the delivery tag for this request. nextDeliveryTag++ @@ -461,10 +459,10 @@ func (c *Client) runPublisher(ouputChan *amqp.Channel) { c.retryRequest(request, err) - c.errorLog( - "client: publisher stopped because of error: %s, request: %s", - err.Error(), - stringifyRequestForLog(request), + c.logger.Error( + "publisher stopped because of error", + slog.Any("error", err), + slogGroupFor("request", slogAttrsForRequest(request)), ) return @@ -492,10 +490,10 @@ func (c *Client) runPublisher(ouputChan *amqp.Channel) { func (c *Client) retryRequest(request *Request, err error) { if request.numRetries >= c.maxRetries { // We have already retried too many times - c.errorLog( - "client: could not publish, giving up: reason: %s, %s", - err.Error(), - stringifyRequestForLog(request), + c.logger.Error( + "could not publish, giving up", + slog.Any("error", err), + slogGroupFor("request", slogAttrsForRequest(request)), ) // We shouldn't wait for confirmations any more because they will never @@ -511,15 +509,17 @@ func (c *Client) retryRequest(request *Request, err error) { request.numRetries++ go func() { - c.debugLog("client: queuing request for retry: reason: %s, %s", err.Error(), stringifyRequestForLog(request)) + c.logger.Debug("queuing request for retry", + slog.Any("error", err), + slogGroupFor("request", slogAttrsForRequest(request)), + ) select { case c.requests <- request: case <-request.AfterTimeout(): - c.errorLog( - "client: request timed out while waiting for retry reason: %s, %s", - err.Error(), - stringifyRequestForLog(request), + c.logger.Debug("request timed out while waiting for retry", + slog.Any("error", err), + slogGroupFor("request", slogAttrsForRequest(request)), ) } }() @@ -541,11 +541,21 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch // This could happen if we stop waiting for requests to return due // to a timeout. But since returns are normally very fast that // would mean that something isn't quite right on the amqp server. - c.errorLog("client: got return for unknown request: %s", stringifyReturnForLog(ret)) + // Note: We use LogAttrs here because .Error takes a variadic + // []any slice. + c.logger.LogAttrs( + context.Background(), + slog.LevelError, + "got return for unknown request", + slogAttrsForReturn(&ret)..., + ) + continue } - c.debugLog("client: publishing is returned by server: %s", ret.CorrelationId) + c.logger.Debug("publishing is returned by server", + slog.String("correlation_id", ret.CorrelationId), + ) request.returned = &ret @@ -561,11 +571,17 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch // would mean that something isn't quite right on the amqp server. // Unfortunately there isn't any way of getting more information // than the delivery tag from a confirmation. - c.errorLog("client: got confirmation of unknown request: %d", confirm.DeliveryTag) + c.logger.Error( + "got confirmation of unknown request", + slog.Uint64("delivery_tag", confirm.DeliveryTag), + ) + continue } - c.debugLog("client: confirming request %s", request.Publishing.CorrelationId) + c.logger.Debug("confirming request", + slog.String("correlation_id", request.Publishing.CorrelationId), + ) c.confirmRequest(request) @@ -594,11 +610,6 @@ func (c *Client) runConfirmsConsumer(confirms chan amqp.Confirmation, returns ch if !request.Reply { // The request isn't expecting a reply so we need give a nil // response instead to signal that we're done. - c.debugLog( - "client: sending nil response after confirmation due to no reply wanted %s", - request.Publishing.CorrelationId, - ) - c.respondToRequest(request, nil) } } @@ -611,10 +622,10 @@ func (c *Client) respondToRequest(request *Request, response *amqp.Delivery) { case request.response <- response: return case <-time.After(chanSendWaitTime): - c.errorLog( - "client: nobody is waiting for response on: %s, response: %s", - stringifyRequestForLog(request), - stringifyDeliveryForLog(response), + c.logger.Error( + "nobody is waiting for response", + slogGroupFor("request", slogAttrsForRequest(request)), + slogGroupFor("delivery", slogAttrsForDelivery(response)), ) } } @@ -625,10 +636,10 @@ func (c *Client) respondErrorToRequest(request *Request, err error) { case request.errChan <- err: return case <-time.After(chanSendWaitTime): - c.errorLog( - "nobody is waiting for error on: %s, error: %s", - stringifyRequestForLog(request), - err.Error(), + c.logger.Error( + "nobody is waiting for error", + slog.Any("error", err), + slogGroupFor("request", slogAttrsForRequest(request)), ) } } @@ -640,7 +651,10 @@ func (c *Client) confirmRequest(request *Request) { case request.confirmed <- struct{}{}: return case <-time.After(chanSendWaitTime): - c.errorLog("nobody is waiting for confirmation on: %s", stringifyRequestForLog(request)) + c.logger.Error( + "nobody is waiting for confirmation", + slogGroupFor("request", slogAttrsForRequest(request)), + ) } } @@ -678,31 +692,39 @@ func (c *Client) runRepliesConsumer(inChan *amqp.Channel) error { } go func() { - c.debugLog("client: running replies consumer...") + c.logger.Debug("running replies consumer...") for response := range messages { request, ok := c.requestsMap.GetByCorrelationID(response.CorrelationId) if !ok { - c.errorLog( - "client: could not find where to reply. CorrelationId: %s", - stringifyDeliveryForLog(&response), + // The request has probably timed out between when it was in the + // queue and when the user stopped waiting for it. We can safely + // log this as a debug message. + c.logger.Debug( + "could not find where to reply", + slogGroupFor("response", slogAttrsForDelivery(&response)), ) continue } - c.debugLog("client: forwarding reply %s", response.CorrelationId) + c.logger.Debug("forwarding reply", + slog.String("correlation_id", response.CorrelationId), + ) responseCopy := response select { case request.response <- &responseCopy: case <-time.After(chanSendWaitTime): - c.errorLog("client: could not send to reply response chan: %s", stringifyRequestForLog(request)) + c.logger.Error( + "nobody is waiting on response on request", + slogGroupFor("request", slogAttrsForRequest(request)), + ) } } - c.debugLog("client: replies consumer is done") + c.logger.Debug("replies consumer is done") }() return nil @@ -746,17 +768,22 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) { r.startTimeout(c.timeout) timeoutChan := r.AfterTimeout() - c.debugLog("client: queuing request %s", r.Publishing.CorrelationId) + c.logger.Debug("queuing request", slog.String("correlation_id", r.Publishing.CorrelationId)) select { case c.requests <- r: // successful send. case <-timeoutChan: - c.debugLog("client: timeout while waiting for request queue %s", r.Publishing.CorrelationId) + c.logger.Debug("timeout while waiting for request queue %s", + slog.String("correlation_id", r.Publishing.CorrelationId), + ) + return nil, fmt.Errorf("%w while waiting for request queue", ErrRequestTimeout) } - c.debugLog("client: waiting for reply of %s", r.Publishing.CorrelationId) + c.logger.Debug("waiting for reply", + slog.String("correlation_id", r.Publishing.CorrelationId), + ) // We hang here until the request has been published (or when confirm-mode // is on; confirmed). @@ -764,7 +791,10 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) { case <-r.confirmed: // got confirmation. case <-timeoutChan: - c.debugLog("client: timeout while waiting for request confirmation %s", r.Publishing.CorrelationId) + c.logger.Debug("timeout while waiting for request confirmation", + slog.String("correlation_id", r.Publishing.CorrelationId), + ) + return nil, fmt.Errorf("%w while waiting for confirmation", ErrRequestTimeout) } @@ -772,15 +802,25 @@ func (c *Client) send(r *Request) (*amqp.Delivery, error) { // until a response is received and close the channel when it's read. select { case err := <-r.errChan: - c.debugLog("client: error for %s, %s", r.Publishing.CorrelationId, err.Error()) + c.logger.Debug("error for request", + slog.Any("error", err), + slog.String("correlation_id", r.Publishing.CorrelationId), + ) + return nil, err case <-timeoutChan: - c.debugLog("client: timeout for %s", r.Publishing.CorrelationId) + c.logger.Debug("timeout for request", + slog.String("correlation_id", r.Publishing.CorrelationId), + ) + return nil, fmt.Errorf("%w while waiting for response", ErrRequestTimeout) case delivery := <-r.response: - c.debugLog("client: got delivery for %s", r.Publishing.CorrelationId) + c.logger.Debug("got response", + slog.String("correlation_id", r.Publishing.CorrelationId), + ) + return delivery, nil } } diff --git a/examples/client/main.go b/examples/client/main.go index e4f853e..1235e84 100644 --- a/examples/client/main.go +++ b/examples/client/main.go @@ -3,7 +3,6 @@ package main import ( "bufio" "fmt" - "log" "os" "time" @@ -12,7 +11,6 @@ import ( func main() { c := amqprpc.NewClient("amqp://guest:guest@localhost:5672/") - c.WithErrorLogger(log.New(os.Stdout, "ERROR - ", log.LstdFlags).Printf) reader := bufio.NewReader(os.Stdin) diff --git a/examples/server/main.go b/examples/server/main.go index ec28f25..a14186e 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -3,7 +3,7 @@ package main import ( "context" "fmt" - "log" + "log/slog" "os" "os/signal" "strings" @@ -16,12 +16,8 @@ import ( ) func main() { - errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags) - s := amqprpc.NewServer("amqp://guest:guest@localhost:5672/"). - AddMiddleware(amqprpcmw.PanicRecoveryLogging(errorLogger.Printf)) - - s.WithErrorLogger(errorLogger.Printf) + AddMiddleware(amqprpcmw.PanicRecoveryLogging(slog.Default())) s.Bind(amqprpc.DirectBinding("upper", upper)) s.Bind(amqprpc.DirectBinding("beat", beat)) diff --git a/examples/tls-server/main.go b/examples/tls-server/main.go index e4d0f8b..890bd3f 100644 --- a/examples/tls-server/main.go +++ b/examples/tls-server/main.go @@ -6,9 +6,9 @@ import ( "log" "os" - amqprpc "github.com/0x4b53/amqp-rpc/v5" - amqp "github.com/rabbitmq/amqp091-go" + + amqprpc "github.com/0x4b53/amqp-rpc/v5" ) var ( @@ -24,7 +24,6 @@ func main() { s := amqprpc.NewServer(url).WithDialConfig(amqp.Config{ TLSClientConfig: cert.TLSConfig(), }) - s.WithErrorLogger(logger.Printf) s.Bind(amqprpc.DirectBinding("hello_world", handleHelloWorld)) s.Bind(amqprpc.DirectBinding("client_usage", handleClientUsage)) diff --git a/logging.go b/logging.go index b59d9ed..06daa07 100644 --- a/logging.go +++ b/logging.go @@ -1,183 +1,267 @@ package amqprpc import ( - "fmt" - "sort" - "strings" + "log/slog" amqp "github.com/rabbitmq/amqp091-go" ) -/* -LogFunc is used for logging in amqp-rpc. It makes it possible to define your own logging. - -Here is an example where the logger from the log package is used: - - debugLogger := log.New(os.Stdout, "DEBUG - ", log.LstdFlags) - errorLogger := log.New(os.Stdout, "ERROR - ", log.LstdFlags) - - server := NewServer(url) - server.WithErrorLogger(errorLogger.Printf) - server.WithDebugLogger(debugLogger.Printf) - -It can also be used with for example a Logrus logger: - - logger := logrus.New() - logger.SetLevel(logrus.DebugLevel) - logger.Formatter = &logrus.JSONFormatter{} - - s.WithErrorLogger(logger.Warnf) - s.WithDebugLogger(logger.Debugf) - - client := NewClient(url) - client.WithErrorLogger(logger.Errorf) - client.WithDebugLogger(logger.Debugf) -*/ -type LogFunc func(format string, args ...interface{}) - -func stringifyTableForLog(v amqp.Table) string { +// slogAttrsForTable returns a slice of slog.Attr from an amqp.Table. This will +// NOT filter out sensitive information. That should be done in a custom +// slog.Handler together with the users own application code. +func slogAttrsForTable(v amqp.Table) []slog.Attr { if len(v) == 0 { - return "[]" + return nil } - vals := []string{} + attrs := []slog.Attr{} for key, val := range v { if inner, ok := val.(amqp.Table); ok { - val = stringifyTableForLog(inner) - } - - strVal := fmt.Sprintf("%v", val) + attrs = append(attrs, slogGroupFor( + key, + slogAttrsForTable(inner), + )) - if strVal == "" { continue } - vals = append(vals, fmt.Sprintf("%s=%s", key, strVal)) + attrs = append(attrs, slog.Any(key, val)) } - sort.Strings(vals) - - return fmt.Sprintf("[%s]", strings.Join(vals, ", ")) + return attrs } -func stringifyDeliveryForLog(v *amqp.Delivery) string { +func slogAttrsForDelivery(v *amqp.Delivery) []slog.Attr { if v == nil { - return "[nil]" + return nil } - vals := []string{} - - if v.Exchange != "" { - vals = append(vals, fmt.Sprintf("Exchange=%s", v.Exchange)) + vals := []slog.Attr{ + slog.String("correlation_id", v.CorrelationId), + slog.String("exchange", v.Exchange), + slog.Bool("redelivered", v.Redelivered), } if v.RoutingKey != "" { - vals = append(vals, fmt.Sprintf("RoutingKey=%s", v.RoutingKey)) + vals = append(vals, slog.String("routing_key", v.RoutingKey)) + } + + if v.ContentType != "" { + vals = append(vals, slog.String("content_type", v.ContentType)) + } + + if v.ContentEncoding != "" { + vals = append(vals, slog.String("content_encoding", v.ContentEncoding)) + } + + if v.DeliveryMode != 0 { + vals = append(vals, slog.Uint64("delivery_mode", uint64(v.DeliveryMode))) + } + + if v.Priority != 0 { + vals = append(vals, slog.Uint64("priority", uint64(v.Priority))) + } + + if v.ReplyTo != "" { + vals = append(vals, slog.String("reply_to", v.ReplyTo)) + } + + if v.Expiration != "" { + vals = append(vals, slog.String("expiration", v.Expiration)) + } + + if v.MessageId != "" { + vals = append(vals, slog.String("message_id", v.MessageId)) + } + + if !v.Timestamp.IsZero() { + vals = append(vals, slog.Time("timestamp", v.Timestamp)) } if v.Type != "" { - vals = append(vals, fmt.Sprintf("Type=%s", v.Type)) + vals = append(vals, slog.String("type", v.Type)) } - if v.CorrelationId != "" { - vals = append(vals, fmt.Sprintf("CorrelationId=%s", v.CorrelationId)) + if v.UserId != "" { + vals = append(vals, slog.String("user_id", v.UserId)) } if v.AppId != "" { - vals = append(vals, fmt.Sprintf("AppId=%s", v.AppId)) + vals = append(vals, slog.String("app_id", v.AppId)) } - if v.UserId != "" { - vals = append(vals, fmt.Sprintf("UserId=%s", v.UserId)) + if v.ConsumerTag != "" { + vals = append(vals, slog.String("consumer_tag", v.ConsumerTag)) } - if len(v.Headers) > 0 { - vals = append(vals, fmt.Sprintf("Headers=%s", stringifyTableForLog(v.Headers))) + if v.DeliveryTag != 0 { + vals = append(vals, slog.Uint64("delivery_tag", v.DeliveryTag)) } - return fmt.Sprintf("[%s]", strings.Join(vals, ", ")) + if v.Headers != nil { + vals = append(vals, slogGroupFor("headers", slogAttrsForTable(v.Headers))) + } + + return vals } -func stringifyPublishingForLog(v amqp.Publishing) string { - vals := []string{} +func slogAttrsForPublishing(v *amqp.Publishing) []slog.Attr { + if v == nil { + return nil + } - if v.CorrelationId != "" { - vals = append(vals, fmt.Sprintf("CorrelationID=%s", v.CorrelationId)) + vals := []slog.Attr{ + slog.String("correlation_id", v.CorrelationId), } - if v.Type != "" { - vals = append(vals, fmt.Sprintf("Type=%s", v.Type)) + if v.ContentType != "" { + vals = append(vals, slog.String("content_type", v.ContentType)) } - if v.AppId != "" { - vals = append(vals, fmt.Sprintf("AppId=%s", v.AppId)) + if v.ContentEncoding != "" { + vals = append(vals, slog.String("content_encoding", v.ContentEncoding)) + } + + if v.DeliveryMode != 0 { + vals = append(vals, slog.Uint64("delivery_mode", uint64(v.DeliveryMode))) + } + + if v.Priority != 0 { + vals = append(vals, slog.Uint64("priority", uint64(v.Priority))) + } + + if v.ReplyTo != "" { + vals = append(vals, slog.String("reply_to", v.ReplyTo)) + } + + if v.Expiration != "" { + vals = append(vals, slog.String("expiration", v.Expiration)) + } + + if v.MessageId != "" { + vals = append(vals, slog.String("message_id", v.MessageId)) + } + + if !v.Timestamp.IsZero() { + vals = append(vals, slog.Time("timestamp", v.Timestamp)) + } + + if v.Type != "" { + vals = append(vals, slog.String("type", v.Type)) } if v.UserId != "" { - vals = append(vals, fmt.Sprintf("UserId=%s", v.UserId)) + vals = append(vals, slog.String("user_id", v.UserId)) } - if len(v.Headers) > 0 { - vals = append(vals, fmt.Sprintf("Headers=%s", stringifyTableForLog(v.Headers))) + if v.AppId != "" { + vals = append(vals, slog.String("app_id", v.AppId)) } - return fmt.Sprintf("[%s]", strings.Join(vals, ", ")) + if v.Headers != nil { + vals = append(vals, slogGroupFor("headers", slogAttrsForTable(v.Headers))) + } + + return vals } -func stringifyRequestForLog(v *Request) string { +func slogAttrsForRequest(v *Request) []slog.Attr { if v == nil { - return "[nil]" + return nil } - vals := []string{} - - if v.Exchange != "" { - vals = append(vals, fmt.Sprintf("Exchange=%s", v.Exchange)) + vals := []slog.Attr{ + slog.String("exchange", v.Exchange), + slog.Bool("Reply", v.Reply), + slogGroupFor("publishing", slogAttrsForPublishing(&v.Publishing)), } if v.RoutingKey != "" { - vals = append(vals, fmt.Sprintf("RoutingKey=%s", v.RoutingKey)) + vals = append(vals, slog.String("routing_key", v.RoutingKey)) } - vals = append(vals, fmt.Sprintf("Publishing=%s", stringifyPublishingForLog(v.Publishing))) + if v.Mandatory { + vals = append(vals, slog.Bool("mandatory", v.Mandatory)) + } + + if v.Timeout > 0 { + vals = append(vals, slog.Duration("timeout", v.Timeout)) + } - return fmt.Sprintf("[%s]", strings.Join(vals, ", ")) + return vals } -func stringifyReturnForLog(v amqp.Return) string { - vals := []string{ - fmt.Sprintf("ReplyCode=%d", v.ReplyCode), - fmt.Sprintf("ReplyText=%s", v.ReplyText), +func slogAttrsForReturn(v *amqp.Return) []slog.Attr { + if v == nil { + return nil } - if v.Exchange != "" { - vals = append(vals, fmt.Sprintf("Exchange=%s", v.Exchange)) + vals := []slog.Attr{ + slog.String("correlation_id", v.CorrelationId), + slog.Uint64("reply_code", uint64(v.ReplyCode)), + slog.String("reply_text", v.ReplyText), + slog.String("exchange", v.Exchange), } if v.RoutingKey != "" { - vals = append(vals, fmt.Sprintf("RoutingKey=%s", v.RoutingKey)) + vals = append(vals, slog.String("routing_key", v.RoutingKey)) } - if v.CorrelationId != "" { - vals = append(vals, fmt.Sprintf("CorrelationID=%s", v.CorrelationId)) + if v.ContentType != "" { + vals = append(vals, slog.String("content_type", v.ContentType)) } - if v.Type != "" { - vals = append(vals, fmt.Sprintf("Type=%s", v.Type)) + if v.ContentEncoding != "" { + vals = append(vals, slog.String("content_encoding", v.ContentEncoding)) } - if v.AppId != "" { - vals = append(vals, fmt.Sprintf("AppId=%s", v.AppId)) + if v.DeliveryMode != 0 { + vals = append(vals, slog.Uint64("delivery_mode", uint64(v.DeliveryMode))) + } + + if v.Priority != 0 { + vals = append(vals, slog.Uint64("priority", uint64(v.Priority))) + } + + if v.ReplyTo != "" { + vals = append(vals, slog.String("reply_to", v.ReplyTo)) + } + + if v.Expiration != "" { + vals = append(vals, slog.String("expiration", v.Expiration)) + } + + if v.MessageId != "" { + vals = append(vals, slog.String("message_id", v.MessageId)) + } + + if !v.Timestamp.IsZero() { + vals = append(vals, slog.Time("timestamp", v.Timestamp)) + } + + if v.Type != "" { + vals = append(vals, slog.String("type", v.Type)) } if v.UserId != "" { - vals = append(vals, fmt.Sprintf("UserId=%s", v.UserId)) + vals = append(vals, slog.String("user_id", v.UserId)) + } + + if v.AppId != "" { + vals = append(vals, slog.String("app_id", v.AppId)) } - if len(v.Headers) > 0 { - vals = append(vals, fmt.Sprintf("Headers=%s", stringifyTableForLog(v.Headers))) + if v.Headers != nil { + vals = append(vals, slogGroupFor("headers", slogAttrsForTable(v.Headers))) } - return fmt.Sprintf("[%s]", strings.Join(vals, ", ")) + return vals +} + +func slogGroupFor(key string, attrs []slog.Attr) slog.Attr { + return slog.Attr{ + Key: key, + Value: slog.GroupValue(attrs...), + } } diff --git a/logging_test.go b/logging_test.go index a4f7f67..47f0954 100644 --- a/logging_test.go +++ b/logging_test.go @@ -2,7 +2,8 @@ package amqprpc import ( "io" - "log" + "log/slog" + "slices" "testing" "time" @@ -14,11 +15,13 @@ func TestServerLogging(t *testing.T) { reader, writer := io.Pipe() go func() { - logger := log.New(writer, "TEST", log.LstdFlags) + logger := slog.New(slog.NewTextHandler(writer, &slog.HandlerOptions{ + Level: slog.LevelDebug, + })) + logger = logger.With("my_attr", "TEST") - s := NewServer(testURL) - s.WithDebugLogger(logger.Printf) - s.WithErrorLogger(logger.Printf) + s := NewServer(testURL). + WithLogger(logger) stop := startAndWait(s) stop() @@ -39,11 +42,16 @@ func TestClientLogging(t *testing.T) { reader, writer := io.Pipe() go func() { - logger := log.New(writer, "TEST", log.LstdFlags) + logger := slog.New( + slog.NewTextHandler(writer, &slog.HandlerOptions{ + Level: slog.LevelDebug, + }), + ).With( + "my_attr", "TEST", + ) c := NewClient("amqp://guest:guest@localhost:5672/") - c.WithDebugLogger(logger.Printf) - c.WithErrorLogger(logger.Printf) + c.WithLogger(logger) _, err := c.Send(NewRequest().WithRoutingKey("foobar").WithTimeout(time.Millisecond)) c.Stop() @@ -63,11 +71,12 @@ func TestClientLogging(t *testing.T) { assert.Contains(t, string(buf), "TEST", "logs are prefixed with TEST") } -func Test_stringifyForLog(t *testing.T) { +func Test_slogAttrs(t *testing.T) { headers := amqp.Table{ "foo": "bar", "nested": amqp.Table{ "baz": 13, + "apa": 13, }, } @@ -105,40 +114,114 @@ func Test_stringifyForLog(t *testing.T) { tests := []struct { name string item interface{} - want string + want []slog.Attr }{ { name: "delivery", - item: delivery, - want: "[Exchange=exchange, RoutingKey=routing_key, Type=type, CorrelationId=coorelation1, UserId=jane, Headers=[foo=bar, nested=[baz=13]]]", + item: &delivery, + want: []slog.Attr{ + slog.String("correlation_id", "coorelation1"), + slog.String("exchange", "exchange"), + slog.Bool("redelivered", false), + slog.String("routing_key", "routing_key"), + slog.String("type", "type"), + slog.String("user_id", "jane"), + slog.Group("headers", + slog.String("foo", "bar"), + slog.Group("nested", + slog.String("baz", "13"), + slog.String("apa", "13"), + ), + ), + }, }, { name: "request", - item: request, - want: "[Exchange=exchange, RoutingKey=routing_key, Publishing=[CorrelationID=coorelation1, AppId=amqprpc, UserId=jane, Headers=[foo=bar, nested=[baz=13]]]]", + item: &request, + want: []slog.Attr{ + slog.String("exchange", "exchange"), + slog.Bool("Reply", false), + slog.String("routing_key", "routing_key"), + slog.Group("publishing", + slog.String("correlation_id", "coorelation1"), + slog.String("user_id", "jane"), + slog.String("app_id", "amqprpc"), + slog.Group("headers", + slog.String("foo", "bar"), + slog.Group("nested", + slog.String("apa", "13"), + slog.String("baz", "13"), + ), + ), + ), + }, }, { name: "return", - item: ret, - want: "[ReplyCode=412, ReplyText=NO_ROUTE, Exchange=exchange, RoutingKey=routing_key, CorrelationID=coorelation1, AppId=amqprpc, UserId=jane, Headers=[foo=bar, nested=[baz=13]]]", + item: &ret, + want: []slog.Attr{ + slog.String("correlation_id", "coorelation1"), + slog.Uint64("reply_code", 412), + slog.String("reply_text", "NO_ROUTE"), + slog.String("exchange", "exchange"), + slog.String("routing_key", "routing_key"), + slog.String("app_id", "amqprpc"), + slog.String("user_id", "jane"), + slog.Group("headers", + slog.String("foo", "bar"), + slog.Group("nested", + slog.String("baz", "13"), + slog.String("apa", "13"), + ), + ), + }, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var got string + var got []slog.Attr switch v := tt.item.(type) { - case amqp.Delivery: - got = stringifyDeliveryForLog(&v) - assert.Equal(t, "[nil]", stringifyDeliveryForLog(nil)) - case amqp.Return: - got = stringifyReturnForLog(v) - case Request: - got = stringifyRequestForLog(&v) - assert.Equal(t, "[nil]", stringifyRequestForLog(nil)) + case *amqp.Delivery: + got = slogAttrsForDelivery(v) + + assert.Nil(t, slogAttrsForDelivery(nil)) + case *amqp.Return: + got = slogAttrsForReturn(v) + + assert.Nil(t, slogAttrsForDelivery(nil)) + case *Request: + got = slogAttrsForRequest(v) + + assert.Nil(t, slogAttrsForRequest(nil)) + default: + t.Fatalf("unknown type: %T", v) } - assert.Equal(t, tt.want, got) + slogAttrsSort(tt.want) + slogAttrsSort(got) + + assert.ElementsMatch(t, tt.want, got) }) } } + +func slogAttrsSort(attrs []slog.Attr) { + slices.SortFunc(attrs, func(a, b slog.Attr) int { + if a.Key < b.Key { + return -1 + } + + if a.Key > b.Key { + return 1 + } + + return 0 + }) + + for _, attr := range attrs { + if attr.Value.Kind() == slog.KindGroup { + slogAttrsSort(attr.Value.Group()) + } + } +} diff --git a/middleware/ack.go b/middleware/ack.go index 12ff55f..fe5e2dd 100644 --- a/middleware/ack.go +++ b/middleware/ack.go @@ -2,6 +2,7 @@ package middleware import ( "context" + "log/slog" amqp "github.com/rabbitmq/amqp091-go" @@ -16,9 +17,12 @@ type OnErrFunc func(err error, delivery amqp.Delivery) // returned from `Ack`. // // middleware := AckDelivery(OnAckErrorLog(log.Printf)) -func OnAckErrorLog(logFn amqprpc.LogFunc) OnErrFunc { +func OnAckErrorLog(logger *slog.Logger) OnErrFunc { return func(err error, delivery amqp.Delivery) { - logFn("could not ack delivery (%s): %v\n", delivery.CorrelationId, err) + logger.Error("could not ack delivery (%s): %v\n", + slog.Any("error", err), + slog.String("correlation_id", delivery.CorrelationId), + ) } } @@ -26,8 +30,8 @@ func OnAckErrorLog(logFn amqprpc.LogFunc) OnErrFunc { // try to send on the passed channel. If no one is consuming on the passed // channel the middleware will not block but instead log a message about missing // channel consumers. -func OnAckErrorSendOnChannel(logFn amqprpc.LogFunc, ch chan struct{}) OnErrFunc { - logErr := OnAckErrorLog(logFn) +func OnAckErrorSendOnChannel(logger *slog.Logger, ch chan struct{}) OnErrFunc { + logErr := OnAckErrorLog(logger) return func(err error, delivery amqp.Delivery) { logErr(err, delivery) @@ -35,7 +39,9 @@ func OnAckErrorSendOnChannel(logFn amqprpc.LogFunc, ch chan struct{}) OnErrFunc select { case ch <- struct{}{}: default: - logFn("ack middleware: could not send on channel, no one is consuming\n") + logger.Error("ack middleware: could not send on channel, no one is consuming", + slog.String("correlation_id", delivery.CorrelationId), + ) } } } diff --git a/middleware/ack_test.go b/middleware/ack_test.go index 7a286c1..302280d 100644 --- a/middleware/ack_test.go +++ b/middleware/ack_test.go @@ -3,7 +3,7 @@ package middleware import ( "context" "errors" - "log" + "log/slog" "sync/atomic" "testing" "time" @@ -65,7 +65,7 @@ func TestAckDelivery(t *testing.T) { // Block until ready. <-isListening - handler := AckDelivery(OnAckErrorSendOnChannel(log.Printf, ch))(tt.handler) + handler := AckDelivery(OnAckErrorSendOnChannel(slog.Default(), ch))(tt.handler) rw := amqprpc.ResponseWriter{Publishing: &amqp.Publishing{}} d := amqp.Delivery{Acknowledger: acknowledger, CorrelationId: "id-1234"} diff --git a/middleware/panic_recovery.go b/middleware/panic_recovery.go index 394b57f..e6e2bf7 100644 --- a/middleware/panic_recovery.go +++ b/middleware/panic_recovery.go @@ -2,6 +2,7 @@ package middleware import ( "context" + "log/slog" amqp "github.com/rabbitmq/amqp091-go" @@ -28,8 +29,12 @@ func PanicRecovery(onRecovery func(interface{}, context.Context, *amqprpc.Respon // PanicRecoveryLogging is a middleware that will recover any panics caused by // a handler down the middleware chain. If a panic happens the value will be // logged to the provided logFunc. -func PanicRecoveryLogging(logFunc amqprpc.LogFunc) amqprpc.ServerMiddlewareFunc { +func PanicRecoveryLogging(logger *slog.Logger) amqprpc.ServerMiddlewareFunc { return PanicRecovery(func(r interface{}, _ context.Context, _ *amqprpc.ResponseWriter, d amqp.Delivery) { - logFunc("recovered (%s): %v", d.CorrelationId, r) + logger.Error( + "panic recovered", + slog.Any("error", r), + slog.String("correlation_id", d.CorrelationId), + ) }) } diff --git a/server.go b/server.go index 76fdf5d..56c792b 100644 --- a/server.go +++ b/server.go @@ -4,7 +4,7 @@ import ( "context" "crypto/tls" "fmt" - "log" + "log/slog" "sync" "sync/atomic" "time" @@ -70,14 +70,8 @@ type Server struct { // isRunning is 1 when the server is running. isRunning int32 - // errorLog specifies an optional logger for amqp errors, unexpected behavior etc. - // If nil, logging is done via the log package's standard logger. - errorLog LogFunc - - // debugLog specifies an optional logger for debugging, this logger will - // print most of what is happening internally. - // If nil, logging is not done. - debugLog LogFunc + // logger is the logger used for logging. + logger *slog.Logger } // NewServer will return a pointer to a new Server. @@ -90,14 +84,15 @@ func NewServer(url string) *Server { dialconfig: amqp.Config{ Dial: amqp.DefaultDial(2 * time.Second), }, - errorLog: log.Printf, // use the standard logger default. //nolint:revive // Keep variables for clarity - debugLog: func(format string, args ...interface{}) {}, // don't print anything default. // We ensure to always create a channel so we can call `Restart` without // blocking. restartChan: make(chan struct{}), } + // use the standard logger default. + server.WithLogger(slog.Default()) + return &server } @@ -141,16 +136,13 @@ func (s *Server) WithTLS(tlsConfig *tls.Config) *Server { return s } -// WithErrorLogger sets the logger to use for error logging. -func (s *Server) WithErrorLogger(f LogFunc) *Server { - s.errorLog = f - - return s -} - -// WithDebugLogger sets the logger to use for debug logging. -func (s *Server) WithDebugLogger(f LogFunc) *Server { - s.debugLog = f +// WithLogger sets the logger to use for error and debug logging. By default +// the library will log errors using the logger from [slog.Default]. Some logs +// will contain data contained in a [amqp.Delivery] or [amqp.Publishing], +// including any headers. If you want to avoid logging some of the fields you +// can use an [slog.Handler] to filter out the fields you don't want to log. +func (s *Server) WithLogger(logger *slog.Logger) *Server { + s.logger = logger.With("component", "amqprpc-server") return s } @@ -220,12 +212,15 @@ func (s *Server) ListenAndServe() { // connection problems and have call Stop(). If the channel isn't // read/closed within 500ms, retry. if err != nil { - s.errorLog("server: got error: %v, will reconnect in %v second(s)", err, 0.5) + s.logger.Error("got error, will reconnect", + "error", err, + "eta", "0.5s", + ) select { case _, ok := <-s.stopChan: if !ok { - s.debugLog("server: the stopChan was triggered in a reconnect loop, exiting") + s.logger.Debug("the stopChan was triggered in a reconnect loop, exiting") break } case <-time.After(500 * time.Millisecond): @@ -239,12 +234,12 @@ func (s *Server) ListenAndServe() { // then we re-create it here. s.responses = make(chan processedRequest) - s.debugLog("server: listener restarting") + s.logger.Debug("restarting listener") continue } - s.debugLog("server: listener exiting gracefully") + s.logger.Debug("listener exiting gracefully") break } @@ -253,7 +248,7 @@ func (s *Server) ListenAndServe() { } func (s *Server) listenAndServe() (bool, error) { - s.debugLog("server: starting listener: %s", s.url) + s.logger.Debug("starting listener", slog.String("url", s.url)) // We are using two different connections here because: // "It's advisable to use separate connections for Channel.Publish and @@ -317,9 +312,9 @@ func (s *Server) listenAndServe() (bool, error) { } if shouldRestart { - s.debugLog("server: restarting server") + s.logger.Debug("restarting server") } else { - s.debugLog("server: gracefully shutting down") + s.logger.Debug("gracefully shutting down") } // 1. Tell amqp we want to shut down by canceling all the consumers. @@ -407,7 +402,10 @@ func (s *Server) runHandler( wg.Add(1) defer wg.Done() - s.debugLog("server: waiting for messages on queue '%s'", queueName) + s.logger.Debug( + "waiting for messages on queue", + slog.String("queue", queueName), + ) for delivery := range deliveries { // Add one delta to the wait group each time a delivery is handled so @@ -416,7 +414,11 @@ func (s *Server) runHandler( // a delivery is finished even though we handle them concurrently. wg.Add(1) - s.debugLog("server: got delivery on queue %v correlation id %v", queueName, delivery.CorrelationId) + s.logger.Debug( + "got delivery", + slog.String("queue", queueName), + slog.String("correlation_id", delivery.CorrelationId), + ) rw := ResponseWriter{ Publishing: &amqp.Publishing{ @@ -444,7 +446,10 @@ func (s *Server) runHandler( }(delivery) } - s.debugLog("server: stopped waiting for messages on queue '%s'", queueName) + s.logger.Debug( + "stopped waiting for messages on queue", + slog.String("queue", queueName), + ) } func (s *Server) responder(outCh *amqp.Channel, wg *sync.WaitGroup) { @@ -452,9 +457,10 @@ func (s *Server) responder(outCh *amqp.Channel, wg *sync.WaitGroup) { defer wg.Done() for response := range s.responses { - s.debugLog( - "server: publishing response to %s, correlation id: %s", - response.replyTo, response.publishing.CorrelationId, + s.logger.Debug( + "publishing response", + slog.String("reply_to", response.replyTo), + slog.String("correlation_id", response.replyTo), ) err := outCh.PublishWithContext( @@ -471,10 +477,13 @@ func (s *Server) responder(outCh *amqp.Channel, wg *sync.WaitGroup) { // We resend the response here so that other running goroutines // that have a working outCh can pick up this response. - s.errorLog( - "server: retrying publishing response to %s, reason: %s, response: %s", - response.replyTo, err.Error(), stringifyPublishingForLog(response.publishing), + s.logger.Error( + "retrying publishing response", + slog.Any("error", err), + slog.String("reply_to", response.replyTo), + slogGroupFor("publishing", slogAttrsForPublishing(&response.publishing)), ) + s.responses <- response return @@ -512,7 +521,7 @@ func (s *Server) Restart() { select { case s.restartChan <- struct{}{}: default: - s.debugLog("server: no listener on restartChan, ensure server is running") + s.logger.Debug("no listener on restartChan, ensure server is running") } } diff --git a/server_test.go b/server_test.go index ca86f8f..c5091af 100644 --- a/server_test.go +++ b/server_test.go @@ -3,7 +3,6 @@ package amqprpc import ( "context" "fmt" - "log" "testing" "time" @@ -154,8 +153,7 @@ func TestManualRestart(t *testing.T) { restartChan := make(chan struct{}) s := NewServer(testURL). - WithRestartChan(restartChan). - WithDebugLogger(log.Printf) + WithRestartChan(restartChan) s.OnStarted(func(_, _ *amqp.Connection, _, _ *amqp.Channel) { hasStarted <- struct{}{} diff --git a/testing.go b/testing.go index 91bcc8f..dc06e94 100644 --- a/testing.go +++ b/testing.go @@ -186,8 +186,6 @@ func closeConnections(names ...string) { func testServer() *Server { server := NewServer(testURL). - WithDebugLogger(testLogFunc("debug")). - WithErrorLogger(testLogFunc("ERROR")). WithDialConfig( amqp.Config{ Properties: amqp.Table{ @@ -205,8 +203,6 @@ func testServer() *Server { func testClient() *Client { return NewClient(testURL). - WithDebugLogger(testLogFunc("debug")). - WithErrorLogger(testLogFunc("ERROR")). WithDialConfig( amqp.Config{ Properties: amqp.Table{ @@ -245,12 +241,3 @@ func initTest(t *testing.T) (server *Server, client *Client, start, stop func()) return } - -func testLogFunc(prefix string) LogFunc { - startTime := time.Now() - - return func(format string, args ...interface{}) { - format = fmt.Sprintf("[%s] %s: %s\n", prefix, time.Since(startTime).String(), format) - fmt.Printf(format, args...) - } -}