Skip to content

Commit

Permalink
Refactor logging in pgsql module
Browse files Browse the repository at this point in the history
Guard debug logging statements with "isDebug" checks. And switch the module over to using named loggers.

Fixes elastic#12150
  • Loading branch information
andrewkroh committed May 9, 2019
1 parent c9ffceb commit 52a05dd
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Prevent duplicate packet loss error messages in HTTP events. {pull}10709[10709]
- Avoid reporting unknown MongoDB opcodes more than once. {pull}10878[10878]
- Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100]
- Improved debug logging efficiency in PGQSL module. {issue}12150[12150]

*Winlogbeat*

Expand Down
98 changes: 47 additions & 51 deletions packetbeat/protos/pgsql/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand All @@ -34,7 +33,7 @@ var (
)

func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
debugf("pgsqlMessageParser, off=%v", s.parseOffset)
pgsql.debugf("pgsqlMessageParser, off=%v", s.parseOffset)

var ok, complete bool

Expand All @@ -46,22 +45,22 @@ func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
case pgsqlExtendedQueryState:
ok, complete = pgsql.parseMessageExtendedQuery(s)
default:
logp.Critical("Pgsql invalid parser state")
pgsql.log.Error("Pgsql invalid parser state")
}

detailedf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
pgsql.detailf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
ok, complete, s.parseOffset)

return ok, complete
}

func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageStart")
pgsql.detailf("parseMessageStart")

m := s.message

for len(s.data[s.parseOffset:]) >= 5 {
isSpecial, length, command := isSpecialPgsqlCommand(s.data[s.parseOffset:])
isSpecial, length, command := pgsql.isSpecialCommand(s.data[s.parseOffset:])
if !isSpecial {
return pgsql.parseCommand(s)
}
Expand All @@ -71,7 +70,7 @@ func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {

// check buffer available
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data 1")
pgsql.detailf("Wait for more data 1")
return true, false
}

Expand Down Expand Up @@ -103,7 +102,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
m := s.message

// one byte reply to SSLRequest
detailedf("Reply for SSLRequest %c", typ)
pgsql.detailf("Reply for SSLRequest %c", typ)
m.start = s.parseOffset
s.parseOffset++
m.end = s.parseOffset
Expand All @@ -118,15 +117,15 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

detailedf("Pgsql type %c, length=%d", typ, length)
pgsql.detailf("Pgsql type %c, length=%d", typ, length)

switch typ {
case 'Q':
Expand All @@ -147,7 +146,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
return pgsql.parseExtResp(s, length)
default:
if !pgsqlValidType(typ) {
detailedf("invalid frame type: '%c'", typ)
pgsql.detailf("invalid frame type: '%c'", typ)
return false, false
}
return pgsql.parseSkipMessage(s, length)
Expand All @@ -172,7 +171,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo
m.query = query

m.toExport = true
detailedf("Simple Query: %s", m.query)
pgsql.detailf("Simple Query: %s", m.query)
return true, true
}

Expand All @@ -184,12 +183,12 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool,
m.isOK = true
m.toExport = true

err := pgsqlFieldsParser(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
err := pgsql.parseFields(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
if err != nil {
detailedf("fields parse failed with: %v", err)
pgsql.detailf("parseFields failed with: %v", err)
return false, false
}
detailedf("Fields: %s", m.fields)
pgsql.detailf("Fields: %s", m.fields)

s.parseOffset++ //type
s.parseOffset += length //length
Expand Down Expand Up @@ -218,7 +217,7 @@ func (pgsql *pgsqlPlugin) parseEmptyQueryResponse(s *pgsqlStream) (bool, bool) {

m := s.message

detailedf("EmptyQueryResponse")
pgsql.detailf("EmptyQueryResponse")
m.start = s.parseOffset
m.isOK = true
m.isRequest = false
Expand All @@ -245,7 +244,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)

s.parseOffset += length
m.end = s.parseOffset
Expand All @@ -269,7 +268,7 @@ func (pgsql *pgsqlPlugin) parseReadyForQuery(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, bool) {
// ErrorResponse
detailedf("ErrorResponse")
pgsql.detailf("ErrorResponse")

m := s.message
m.start = s.parseOffset
Expand All @@ -278,7 +277,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,
m.toExport = true

s.parseOffset++ //type
pgsqlErrorParser(s, s.data[s.parseOffset+4:s.parseOffset+length])
pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length])

s.parseOffset += length //length
m.end = s.parseOffset
Expand All @@ -289,7 +288,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {
// Ready for query -> Parse for an extended query request
detailedf("Parse")
pgsql.detailf("Parse")

m := s.message
m.start = s.parseOffset
Expand All @@ -303,11 +302,11 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

query, err := common.ReadString(s.data[m.start+6:])
if err != nil {
detailedf("Invalid extended query request")
pgsql.detailf("Invalid extended query request")
return false, false
}
m.query = query
detailedf("Parse in an extended query request: %s", m.query)
pgsql.detailf("Parse in an extended query request: %s", m.query)

// Ignore SET statement
if strings.HasPrefix(m.query, "SET ") {
Expand All @@ -319,7 +318,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) {
// Sync -> Parse completion for an extended query response
detailedf("ParseCompletion")
pgsql.detailf("ParseCompletion")

m := s.message
m.start = s.parseOffset
Expand All @@ -329,7 +328,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool)

s.parseOffset++ //type
s.parseOffset += length
detailedf("Parse completion in an extended query response")
pgsql.detailf("Parse completion in an extended query response")
s.parseState = pgsqlGetDataState
return pgsql.parseMessageData(s)
}
Expand All @@ -349,7 +348,7 @@ func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bo
return true, true
}

func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
func (pgsql *pgsqlPlugin) parseFields(s *pgsqlStream, buf []byte) error {
m := s.message

if len(buf) < 2 {
Expand All @@ -359,7 +358,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("Row Description field count=%d", fieldCount)

fields := []string{}
fieldsFormat := []byte{}
Expand Down Expand Up @@ -400,8 +398,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
format := common.BytesNtohs(buf[off : off+2])
off += 2
fieldsFormat = append(fieldsFormat, byte(format))

detailedf("Field name=%s, format=%d", fieldName, format)
}

if off < len(buf) {
Expand All @@ -411,13 +407,13 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
m.fields = fields
m.fieldsFormat = fieldsFormat
if m.numberOfFields != fieldCount {
logp.Err("Missing fields from RowDescription. Expected %d. Received %d",
pgsql.log.Errorf("Missing fields from RowDescription. Expected %d. Received %d",
fieldCount, m.numberOfFields)
}
return nil
}

func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
func (pgsql *pgsqlPlugin) parseError(s *pgsqlStream, buf []byte) {
m := s.message
off := 0
for off < len(buf) {
Expand All @@ -430,7 +426,7 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
// read field value(string)
val, err := common.ReadString(buf[off+1:])
if err != nil {
logp.Err("Failed to read the column field")
pgsql.log.Error("Failed to read the column field")
break
}
off += len(val) + 2
Expand All @@ -444,11 +440,11 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
m.errorSeverity = val
}
}
detailedf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
pgsql.detailf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
}

func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageData")
pgsql.detailf("parseMessageData")

// The response to queries that return row sets contains:
// RowDescription
Expand All @@ -466,12 +462,12 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

Expand All @@ -491,17 +487,17 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {

name, err := pgsqlString(s.data[s.parseOffset+4:], length-4)
if err != nil {
detailedf("pgsql string invalid")
pgsql.detailf("pgsql string invalid")
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)
s.parseOffset += length
m.end = s.parseOffset
m.size = uint64(m.end - m.start)
s.parseState = pgsqlStartState

detailedf("Rows: %s", m.rows)
pgsql.detailf("Rows: %s", m.rows)

return true, true
case '2':
Expand All @@ -515,7 +511,7 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
return pgsql.parseRowDescription(s, length)
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected data message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected data message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
Expand All @@ -530,7 +526,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("DataRow field count=%d", fieldCount)
pgsql.detailf("DataRow field count=%d", fieldCount)

rows := []string{}
rowLength := 0
Expand All @@ -545,7 +541,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
off += 4

if columnLength > 0 && columnLength > len(buf[off:]) {
logp.Err("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
pgsql.log.Errorf("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
columnLength, len(buf[off:]), i)
return errInvalidLength
}
Expand All @@ -568,7 +564,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
rowLength += len(columnValue)
}

detailedf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
pgsql.detailf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
}

if off < len(buf) {
Expand All @@ -584,7 +580,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
}

func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageExtendedQuery")
pgsql.detailf("parseMessageExtendedQuery")

// An extended query request contains:
// Parse
Expand All @@ -603,12 +599,12 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

Expand Down Expand Up @@ -647,7 +643,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, true
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected extended query message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected extended query message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
Expand All @@ -656,7 +652,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, false
}

func isSpecialPgsqlCommand(data []byte) (bool, int, int) {
func (pgsql *pgsqlPlugin) isSpecialCommand(data []byte) (bool, int, int) {
if len(data) < 8 {
// 8 bytes required
return false, 0, 0
Expand All @@ -670,15 +666,15 @@ func isSpecialPgsqlCommand(data []byte) (bool, int, int) {

if length == 16 && code == 80877102 {
// Cancel Request
logp.Debug("pgsqldetailed", "Cancel Request, length=%d", length)
pgsql.debugf("Cancel Request, length=%d", length)
return true, length, cancelRequest
} else if length == 8 && code == 80877103 {
// SSL Request
logp.Debug("pgsqldetailed", "SSL Request, length=%d", length)
pgsql.debugf("SSL Request, length=%d", length)
return true, length, sslRequest
} else if code == 196608 {
// Startup Message
logp.Debug("pgsqldetailed", "Startup Message, length=%d", length)
pgsql.debugf("Startup Message, length=%d", length)
return true, length, startupMessage
}
return false, 0, 0
Expand Down
Loading

0 comments on commit 52a05dd

Please sign in to comment.