Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Commit

Permalink
Improved debug logging.
Browse files Browse the repository at this point in the history
Added connection id (pointer) and channel for debugging multi-connection/session programs.

Centralized scattered debug logic to fix missing/duplicate log entries.

Removed (Session) and (Link) decorations - these can be deduced from the frame type.

Don't print null fields in log output - reduce distracting clutter.

Signed-off-by: Alan Conway <aconway@redhat.com>
  • Loading branch information
alanconway committed Dec 5, 2019
1 parent ae5d459 commit 237c721
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 206 deletions.
27 changes: 0 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
OutgoingWindow: s.outgoingWindow,
HandleMax: s.handleMax,
}
debug(1, "TX: %s", begin)
s.txFrame(begin, nil)

// wait for response
Expand All @@ -143,8 +142,6 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
return nil, c.conn.getErr()
case fr = <-s.rx:
}
debug(1, "RX: %s", fr.body)

begin, ok := fr.body.(*performBegin)
if !ok {
_ = s.Close(context.Background()) // deallocate session on error
Expand Down Expand Up @@ -564,8 +561,6 @@ func (s *Session) mux(remoteBegin *performBegin) {

// incoming frame for link
case fr := <-s.rx:
debug(1, "RX(Session): %s", fr.body)

switch body := fr.body.(type) {
// Disposition frames can reference transfers from more than one
// link. Send this frame to all of them.
Expand Down Expand Up @@ -660,7 +655,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
debug(1, "TX: %s", resp)
s.txFrame(resp, nil)
}

Expand Down Expand Up @@ -712,7 +706,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
debug(1, "TX(Session): %s", flow)
s.txFrame(flow, nil)
remoteOutgoingWindow = s.incomingWindow
}
Expand Down Expand Up @@ -762,8 +755,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
settlementByDeliveryID[deliveryID] = fr.done
fr.done = nil
}

debug(2, "TX(Session): %s", fr)
s.txFrame(fr, fr.done)

// "Upon sending a transfer, the sending endpoint will increment
Expand All @@ -780,13 +771,11 @@ func (s *Session) mux(remoteBegin *performBegin) {
fr.IncomingWindow = s.incomingWindow
fr.NextOutgoingID = nextOutgoingID
fr.OutgoingWindow = s.outgoingWindow
debug(1, "TX(Session): %s", fr)
s.txFrame(fr, nil)
remoteOutgoingWindow = s.incomingWindow
case *performTransfer:
panic("transfer frames must use txTransfer")
default:
debug(1, "TX(Session): %s", fr)
s.txFrame(fr, nil)
}
}
Expand Down Expand Up @@ -936,7 +925,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}

// send Attach frame
debug(1, "TX: %s", attach)
s.txFrame(attach, nil)

// wait for response
Expand All @@ -946,7 +934,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
return nil, s.err
case fr = <-l.rx:
}
debug(3, "RX: %s", fr)
resp, ok := fr.(*performAttach)
if !ok {
return nil, errorErrorf("unexpected attach response: %#v", fr)
Expand Down Expand Up @@ -979,7 +966,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
Handle: l.handle,
Closed: true,
}
debug(1, "TX: %s", fr)
s.txFrame(fr, nil)

if detach.Error == nil {
Expand Down Expand Up @@ -1105,8 +1091,6 @@ Loop:

// send data
case tr := <-outgoingTransfers:
debug(3, "TX(link): %s", tr)

// Ensure the session mux is not blocked
for {
select {
Expand Down Expand Up @@ -1156,7 +1140,6 @@ func (l *link) muxFlow() error {
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
debug(3, "TX: %s", fr)

// Update credit. This must happen before entering loop below
// because incoming messages handled while waiting to transmit
Expand Down Expand Up @@ -1323,7 +1306,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {
switch fr := fr.(type) {
// message frame
case *performTransfer:
debug(3, "RX: %s", fr)
if isSender {
// Senders should never receive transfer frames, but handle it just in case.
l.closeWithError(&Error{
Expand All @@ -1337,7 +1319,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {

// flow control frame
case *performFlow:
debug(3, "RX: %s", fr)
if isSender {
linkCredit := *fr.LinkCredit - l.deliveryCount
if fr.DeliveryCount != nil {
Expand Down Expand Up @@ -1365,12 +1346,10 @@ func (l *link) muxHandleFrame(fr frameBody) error {
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
debug(1, "TX: %s", resp)
l.session.txFrame(resp, nil)

// remote side is closing links
case *performDetach:
debug(1, "RX: %s", fr)
// don't currently support link detach and reattach
if !fr.Closed {
return errorErrorf("non-closing detach not supported: %+v", fr)
Expand All @@ -1382,8 +1361,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {
return errorWrapf(&DetachError{fr.Error}, "received detach frame")

case *performDisposition:
debug(3, "RX: %s", fr)

// Unblock receivers waiting for message disposition
if l.receiver != nil {
l.receiver.inFlight.remove(fr.First, fr.Last, nil)
Expand All @@ -1406,11 +1383,9 @@ func (l *link) muxHandleFrame(fr frameBody) error {
Last: fr.Last,
Settled: true,
}
debug(1, "TX: %s", resp)
l.session.txFrame(resp, nil)

default:
debug(1, "RX: %s", fr)
fmt.Printf("Unexpected frame: %s\n", fr)
}

Expand Down Expand Up @@ -2006,8 +1981,6 @@ func (r *Receiver) sendDisposition(first uint32, last *uint32, state interface{}
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,
State: state,
}

debug(1, "TX: %s", fr)
return r.link.session.txFrame(fr, nil)
}

Expand Down
8 changes: 3 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (c *conn) mux() {

// new frame from connReader
case fr := <-c.rxFrame:
debugFrame(c, "RX", &fr)
var (
session *Session
ok bool
Expand Down Expand Up @@ -619,7 +620,6 @@ func (c *conn) connWriter() {
case <-c.done:
// send close
cls := &performClose{}
debug(1, "TX: %s", cls)
_ = c.writeFrame(frame{
type_: frameTypeAMQP,
body: cls,
Expand All @@ -632,6 +632,7 @@ func (c *conn) connWriter() {
// writeFrame writes a frame to the network, may only be used
// by connWriter after initial negotiation.
func (c *conn) writeFrame(fr frame) error {
debugFrame(c, "TX", &fr)
if c.connectTimeout != 0 {
_ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
Expand Down Expand Up @@ -807,7 +808,6 @@ func (c *conn) openAMQP() stateFunc {
IdleTimeout: c.idleTimeout,
Properties: c.properties,
}
debug(1, "TX: %s", open)
c.err = c.writeFrame(frame{
type_: frameTypeAMQP,
body: open,
Expand All @@ -828,7 +828,6 @@ func (c *conn) openAMQP() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", o)

// update peer settings
if o.MaxFrameSize > 0 {
Expand Down Expand Up @@ -860,7 +859,6 @@ func (c *conn) negotiateSASL() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", sm)

// return first match in c.saslHandlers based on order received
for _, mech := range sm.Mechanisms {
Expand Down Expand Up @@ -891,7 +889,6 @@ func (c *conn) saslOutcome() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", so)

// check if auth succeeded
if so.Code != codeSASLOK {
Expand All @@ -916,6 +913,7 @@ func (c *conn) readFrame() (frame, error) {
var fr frame
select {
case fr = <-c.rxFrame:
debugFrame(c, "RX", &fr)
return fr, nil
case err := <-c.connErr:
return fr, err
Expand Down
3 changes: 2 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package amqp

// dummy functions used when debugging is not enabled

func debug(_ int, _ string, _ ...interface{}) {}
func debug(_ int, _ string, _ ...interface{}) {}
func debugFrame(c *conn, prefix string, fr *frame) {}
17 changes: 17 additions & 0 deletions log_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,20 @@ func debug(level int, format string, v ...interface{}) {
logger.Printf(format, v...)
}
}

func debugFrame(c *conn, prefix string, fr *frame) {
if debugLevel == 0 { // Fast exit for no logging
return
}
// Set level by frame type
level := 1 // Normal frames at 1
switch fr.body.(type) {
case *performTransfer: // High-volume messages
level = 2
case *performFlow, *performDisposition: // Noisy flow and acknowledgment
level = 3
}
if level <= debugLevel {
logger.Printf("%p %s[%d]: %s", c, prefix, fr.channel, fr.body)
}
}
2 changes: 0 additions & 2 deletions sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func ConnSASLPlain(username, password string) ConnOption {
InitialResponse: []byte("\x00" + username + "\x00" + password),
Hostname: "",
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
Expand Down Expand Up @@ -77,7 +76,6 @@ func ConnSASLAnonymous() ConnOption {
Mechanism: saslMechanismANONYMOUS,
InitialResponse: []byte("anonymous"),
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
Expand Down
Loading

0 comments on commit 237c721

Please sign in to comment.