Skip to content
This repository has been archived by the owner on Dec 14, 2020. It is now read-only.

Improved debug logging. #204

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
OutgoingWindow: s.outgoingWindow,
HandleMax: s.handleMax,
}
debug(1, "TX: %s", begin)
s.txFrame(begin, nil)

// wait for response
Expand All @@ -143,8 +142,6 @@ func (c *Client) NewSession(opts ...SessionOption) (*Session, error) {
return nil, c.conn.getErr()
case fr = <-s.rx:
}
debug(1, "RX: %s", fr.body)

begin, ok := fr.body.(*performBegin)
if !ok {
_ = s.Close(context.Background()) // deallocate session on error
Expand Down Expand Up @@ -564,8 +561,6 @@ func (s *Session) mux(remoteBegin *performBegin) {

// incoming frame for link
case fr := <-s.rx:
debug(1, "RX(Session): %s", fr.body)

switch body := fr.body.(type) {
// Disposition frames can reference transfers from more than one
// link. Send this frame to all of them.
Expand Down Expand Up @@ -660,7 +655,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
debug(1, "TX: %s", resp)
s.txFrame(resp, nil)
}

Expand Down Expand Up @@ -712,7 +706,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
NextOutgoingID: nextOutgoingID,
OutgoingWindow: s.outgoingWindow,
}
debug(1, "TX(Session): %s", flow)
s.txFrame(flow, nil)
remoteOutgoingWindow = s.incomingWindow
}
Expand Down Expand Up @@ -762,8 +755,6 @@ func (s *Session) mux(remoteBegin *performBegin) {
settlementByDeliveryID[deliveryID] = fr.done
fr.done = nil
}

debug(2, "TX(Session): %s", fr)
s.txFrame(fr, fr.done)

// "Upon sending a transfer, the sending endpoint will increment
Expand All @@ -780,13 +771,11 @@ func (s *Session) mux(remoteBegin *performBegin) {
fr.IncomingWindow = s.incomingWindow
fr.NextOutgoingID = nextOutgoingID
fr.OutgoingWindow = s.outgoingWindow
debug(1, "TX(Session): %s", fr)
s.txFrame(fr, nil)
remoteOutgoingWindow = s.incomingWindow
case *performTransfer:
panic("transfer frames must use txTransfer")
default:
debug(1, "TX(Session): %s", fr)
s.txFrame(fr, nil)
}
}
Expand Down Expand Up @@ -936,7 +925,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
}

// send Attach frame
debug(1, "TX: %s", attach)
s.txFrame(attach, nil)

// wait for response
Expand All @@ -946,7 +934,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
return nil, s.err
case fr = <-l.rx:
}
debug(3, "RX: %s", fr)
resp, ok := fr.(*performAttach)
if !ok {
return nil, errorErrorf("unexpected attach response: %#v", fr)
Expand Down Expand Up @@ -979,7 +966,6 @@ func attachLink(s *Session, r *Receiver, opts []LinkOption) (*link, error) {
Handle: l.handle,
Closed: true,
}
debug(1, "TX: %s", fr)
s.txFrame(fr, nil)

if detach.Error == nil {
Expand Down Expand Up @@ -1105,8 +1091,6 @@ Loop:

// send data
case tr := <-outgoingTransfers:
debug(3, "TX(link): %s", tr)

// Ensure the session mux is not blocked
for {
select {
Expand Down Expand Up @@ -1156,7 +1140,6 @@ func (l *link) muxFlow() error {
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
debug(3, "TX: %s", fr)

// Update credit. This must happen before entering loop below
// because incoming messages handled while waiting to transmit
Expand Down Expand Up @@ -1323,7 +1306,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {
switch fr := fr.(type) {
// message frame
case *performTransfer:
debug(3, "RX: %s", fr)
if isSender {
// Senders should never receive transfer frames, but handle it just in case.
l.closeWithError(&Error{
Expand All @@ -1337,7 +1319,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {

// flow control frame
case *performFlow:
debug(3, "RX: %s", fr)
if isSender {
linkCredit := *fr.LinkCredit - l.deliveryCount
if fr.DeliveryCount != nil {
Expand Down Expand Up @@ -1365,12 +1346,10 @@ func (l *link) muxHandleFrame(fr frameBody) error {
DeliveryCount: &deliveryCount,
LinkCredit: &linkCredit, // max number of messages
}
debug(1, "TX: %s", resp)
l.session.txFrame(resp, nil)

// remote side is closing links
case *performDetach:
debug(1, "RX: %s", fr)
// don't currently support link detach and reattach
if !fr.Closed {
return errorErrorf("non-closing detach not supported: %+v", fr)
Expand All @@ -1382,8 +1361,6 @@ func (l *link) muxHandleFrame(fr frameBody) error {
return errorWrapf(&DetachError{fr.Error}, "received detach frame")

case *performDisposition:
debug(3, "RX: %s", fr)

// Unblock receivers waiting for message disposition
if l.receiver != nil {
l.receiver.inFlight.remove(fr.First, fr.Last, nil)
Expand All @@ -1406,11 +1383,9 @@ func (l *link) muxHandleFrame(fr frameBody) error {
Last: fr.Last,
Settled: true,
}
debug(1, "TX: %s", resp)
l.session.txFrame(resp, nil)

default:
debug(1, "RX: %s", fr)
fmt.Printf("Unexpected frame: %s\n", fr)
}

Expand Down Expand Up @@ -2006,8 +1981,6 @@ func (r *Receiver) sendDisposition(first uint32, last *uint32, state interface{}
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,
State: state,
}

debug(1, "TX: %s", fr)
return r.link.session.txFrame(fr, nil)
}

Expand Down
8 changes: 3 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ func (c *conn) mux() {

// new frame from connReader
case fr := <-c.rxFrame:
debugFrame(c, "RX", &fr)
var (
session *Session
ok bool
Expand Down Expand Up @@ -619,7 +620,6 @@ func (c *conn) connWriter() {
case <-c.done:
// send close
cls := &performClose{}
debug(1, "TX: %s", cls)
_ = c.writeFrame(frame{
type_: frameTypeAMQP,
body: cls,
Expand All @@ -632,6 +632,7 @@ func (c *conn) connWriter() {
// writeFrame writes a frame to the network, may only be used
// by connWriter after initial negotiation.
func (c *conn) writeFrame(fr frame) error {
debugFrame(c, "TX", &fr)
if c.connectTimeout != 0 {
_ = c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
Expand Down Expand Up @@ -807,7 +808,6 @@ func (c *conn) openAMQP() stateFunc {
IdleTimeout: c.idleTimeout,
Properties: c.properties,
}
debug(1, "TX: %s", open)
c.err = c.writeFrame(frame{
type_: frameTypeAMQP,
body: open,
Expand All @@ -828,7 +828,6 @@ func (c *conn) openAMQP() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", o)

// update peer settings
if o.MaxFrameSize > 0 {
Expand Down Expand Up @@ -860,7 +859,6 @@ func (c *conn) negotiateSASL() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", sm)

// return first match in c.saslHandlers based on order received
for _, mech := range sm.Mechanisms {
Expand Down Expand Up @@ -891,7 +889,6 @@ func (c *conn) saslOutcome() stateFunc {
c.err = errorErrorf("unexpected frame type %T", fr.body)
return nil
}
debug(1, "RX: %s", so)

// check if auth succeeded
if so.Code != codeSASLOK {
Expand All @@ -916,6 +913,7 @@ func (c *conn) readFrame() (frame, error) {
var fr frame
select {
case fr = <-c.rxFrame:
debugFrame(c, "RX", &fr)
return fr, nil
case err := <-c.connErr:
return fr, err
Expand Down
3 changes: 2 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ package amqp

// dummy functions used when debugging is not enabled

func debug(_ int, _ string, _ ...interface{}) {}
func debug(_ int, _ string, _ ...interface{}) {}
func debugFrame(c *conn, prefix string, fr *frame) {}
17 changes: 17 additions & 0 deletions log_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,20 @@ func debug(level int, format string, v ...interface{}) {
logger.Printf(format, v...)
}
}

func debugFrame(c *conn, prefix string, fr *frame) {
if debugLevel == 0 { // Fast exit for no logging
return
}
// Set level by frame type
level := 1 // Normal frames at 1
switch fr.body.(type) {
case *performTransfer: // High-volume messages
level = 2
case *performFlow, *performDisposition: // Noisy flow and acknowledgment
level = 3
}
if level <= debugLevel {
logger.Printf("%p %s[%d]: %s", c, prefix, fr.channel, fr.body)
}
}
2 changes: 0 additions & 2 deletions sasl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func ConnSASLPlain(username, password string) ConnOption {
InitialResponse: []byte("\x00" + username + "\x00" + password),
Hostname: "",
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
Expand Down Expand Up @@ -77,7 +76,6 @@ func ConnSASLAnonymous() ConnOption {
Mechanism: saslMechanismANONYMOUS,
InitialResponse: []byte("anonymous"),
}
debug(1, "TX: %s", init)
c.err = c.writeFrame(frame{
type_: frameTypeSASL,
body: init,
Expand Down
Loading