Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved memberlist TCPTransport logs #86

Merged
merged 1 commit into from
Dec 1, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions kv/memberlist/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewTCPTransport(config TCPTransportConfig, logger log.Logger) (*TCPTranspor
var ok bool
t := TCPTransport{
cfg: config,
logger: logger,
logger: log.With(logger, "component", "memberlist TCPTransport"),
packetCh: make(chan *memberlist.Packet),
connCh: make(chan net.Conn),
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func (t *TCPTransport) tcpListen(tcpLn net.Listener) {
loopDelay = maxDelay
}

level.Error(t.logger).Log("msg", "TCPTransport: Error accepting TCP connection", "err", err)
level.Error(t.logger).Log("msg", "Error accepting TCP connection", "err", err)
time.Sleep(loopDelay)
continue
}
Expand All @@ -231,7 +231,7 @@ func (t *TCPTransport) debugLog() log.Logger {
}

func (t *TCPTransport) handleConnection(conn net.Conn) {
t.debugLog().Log("msg", "TCPTransport: New connection", "addr", conn.RemoteAddr())
t.debugLog().Log("msg", "New connection", "addr", conn.RemoteAddr())

closeConn := true
defer func() {
Expand All @@ -244,7 +244,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
msgType := []byte{0}
_, err := io.ReadFull(conn, msgType)
if err != nil {
level.Error(t.logger).Log("msg", "TCPTransport: failed to read message type", "err", err)
level.Warn(t.logger).Log("msg", "failed to read message type", "err", err, "remote", conn.RemoteAddr())
return
}

Expand All @@ -259,33 +259,33 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
t.receivedPackets.Inc()

// before reading packet, read the address
b := []byte{0}
_, err := io.ReadFull(conn, b)
addrLengthBuf := []byte{0}
_, err := io.ReadFull(conn, addrLengthBuf)
if err != nil {
t.receivedPacketsErrors.Inc()
level.Error(t.logger).Log("msg", "TCPTransport: error while reading address:", "err", err)
level.Warn(t.logger).Log("msg", "error while reading node address length from packet", "err", err, "remote", conn.RemoteAddr())
return
}

addrBuf := make([]byte, b[0])
addrBuf := make([]byte, addrLengthBuf[0])
_, err = io.ReadFull(conn, addrBuf)
if err != nil {
t.receivedPacketsErrors.Inc()
level.Error(t.logger).Log("msg", "TCPTransport: error while reading address:", "err", err)
level.Warn(t.logger).Log("msg", "error while reading node address from packet", "err", err, "remote", conn.RemoteAddr())
return
}

// read the rest to buffer -- this is the "packet" itself
buf, err := io.ReadAll(conn)
if err != nil {
t.receivedPacketsErrors.Inc()
level.Error(t.logger).Log("msg", "TCPTransport: error while reading packet data:", "err", err)
level.Warn(t.logger).Log("msg", "error while reading packet data", "err", err, "remote", conn.RemoteAddr())
return
}

if len(buf) < md5.Size {
t.receivedPacketsErrors.Inc()
level.Error(t.logger).Log("msg", "TCPTransport: not enough data received", "length", len(buf))
level.Warn(t.logger).Log("msg", "not enough data received", "data_length", len(buf), "remote", conn.RemoteAddr())
return
}

Expand All @@ -296,10 +296,10 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {

if !bytes.Equal(receivedDigest, expectedDigest[:]) {
t.receivedPacketsErrors.Inc()
level.Warn(t.logger).Log("msg", "TCPTransport: packet digest mismatch", "expected", fmt.Sprintf("%x", expectedDigest), "received", fmt.Sprintf("%x", receivedDigest))
level.Warn(t.logger).Log("msg", "packet digest mismatch", "expected", fmt.Sprintf("%x", expectedDigest), "received", fmt.Sprintf("%x", receivedDigest), "data_length", len(buf), "remote", conn.RemoteAddr())
}

t.debugLog().Log("msg", "TCPTransport: Received packet", "addr", addr(addrBuf), "size", len(buf), "hash", fmt.Sprintf("%x", receivedDigest))
t.debugLog().Log("msg", "Received packet", "addr", addr(addrBuf), "size", len(buf), "hash", fmt.Sprintf("%x", receivedDigest))

t.receivedPacketsBytes.Add(float64(len(buf)))

Expand All @@ -310,7 +310,7 @@ func (t *TCPTransport) handleConnection(conn net.Conn) {
}
} else {
t.unknownConnections.Inc()
level.Error(t.logger).Log("msg", "TCPTransport: unknown message type", "msgType", msgType)
level.Error(t.logger).Log("msg", "unknown message type", "msgType", msgType, "remote", conn.RemoteAddr())
}
}

Expand Down Expand Up @@ -414,7 +414,7 @@ func (t *TCPTransport) WriteTo(b []byte, addr string) (time.Time, error) {
if err != nil {
t.sentPacketsErrors.Inc()

level.Warn(t.logger).Log("msg", "TCPTransport: WriteTo failed", "addr", addr, "err", err)
level.Warn(t.logger).Log("msg", "WriteTo failed", "addr", addr, "err", err)

// WriteTo is used to send "UDP" packets. Since we use TCP, we can detect more errors,
// but memberlist library doesn't seem to cope with that very well. That is why we return nil instead.
Expand Down