Skip to content

Commit

Permalink
Merge pull request nsqio#275 from crazyweave/master
Browse files Browse the repository at this point in the history
Added support for setting logger for each log level
  • Loading branch information
ploxiln authored Jan 11, 2020
2 parents d7acddb + dc6633b commit 0abe494
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 30 deletions.
48 changes: 38 additions & 10 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type Conn struct {

delegate ConnDelegate

logger logger
logger []logger
logLvl LogLevel
logFmt string
logFmt []string
logGuard sync.RWMutex

r io.Reader
Expand Down Expand Up @@ -103,6 +103,9 @@ func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
msgResponseChan: make(chan *msgResponse),
exitChan: make(chan int),
drainReady: make(chan int),

logger: make([]logger, LogLevelMax+1),
logFmt: make([]string, LogLevelMax+1),
}
}

Expand All @@ -122,19 +125,44 @@ func (c *Conn) SetLogger(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()

c.logger = l
c.logLvl = lvl
c.logFmt = format
if c.logFmt == "" {
c.logFmt = "(%s)"
for level := range c.logger {
c.logger[level] = l
c.logFmt[level] = format
if c.logFmt[level] == "" {
c.logFmt[level] = "(%s)"
}
}
c.logLvl = lvl
}

func (c *Conn) SetLoggerForLevel(l logger, lvl LogLevel, format string) {
c.logGuard.Lock()
defer c.logGuard.Unlock()

c.logger[lvl] = l
c.logFmt[lvl] = format
}

// SetLoggerLevel sets the package logging level.
func (c *Conn) SetLoggerLevel(lvl LogLevel) {
c.logGuard.Lock()
defer c.logGuard.Unlock()

c.logLvl = lvl
}

func (c *Conn) getLogger(lvl LogLevel) (logger, LogLevel, string) {
c.logGuard.RLock()
defer c.logGuard.RUnlock()

return c.logger[lvl], c.logLvl, c.logFmt[lvl]
}

func (c *Conn) getLogger() (logger, LogLevel, string) {
func (c *Conn) getLogLevel() LogLevel {
c.logGuard.RLock()
defer c.logGuard.RUnlock()

return c.logger, c.logLvl, c.logFmt
return c.logLvl
}

// Connect dials and bootstraps the nsqd connection
Expand Down Expand Up @@ -718,7 +746,7 @@ func (c *Conn) onMessageTouch(m *Message) {
}

func (c *Conn) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl, logFmt := c.getLogger()
logger, logLvl, logFmt := c.getLogger(lvl)

if logger == nil {
return
Expand Down
47 changes: 36 additions & 11 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type Consumer struct {

mtx sync.RWMutex

logger logger
logger []logger
logLvl LogLevel
logGuard sync.RWMutex

Expand Down Expand Up @@ -166,7 +166,7 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error
channel: channel,
config: *config,

logger: log.New(os.Stderr, "", log.Flags()),
logger: make([]logger, LogLevelMax+1),
logLvl: LogLevelInfo,
maxInFlight: int32(config.MaxInFlight),

Expand All @@ -183,6 +183,13 @@ func NewConsumer(topic string, channel string, config *Config) (*Consumer, error
StopChan: make(chan int),
exitChan: make(chan int),
}

// Set default logger for all log levels
l := log.New(os.Stderr, "", log.Flags())
for index := range r.logger {
r.logger[index] = l
}

r.wg.Add(1)
go r.rdyLoop()
return r, nil
Expand Down Expand Up @@ -219,22 +226,40 @@ func (r *Consumer) SetLogger(l logger, lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()

r.logger = l
for level := range r.logger {
r.logger[level] = l
}
r.logLvl = lvl
}

// SetLoggerForLevel assigns the same logger for specified `level`.
func (r *Consumer) SetLoggerForLevel(l logger, lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()

r.logger[lvl] = l
}

// SetLoggerLevel sets the package logging level.
func (r *Consumer) SetLoggerLevel(lvl LogLevel) {
r.logGuard.Lock()
defer r.logGuard.Unlock()

r.logLvl = lvl
}

func (r *Consumer) getLogger() (logger, LogLevel) {
func (r *Consumer) getLogger(lvl LogLevel) (logger, LogLevel) {
r.logGuard.RLock()
defer r.logGuard.RUnlock()

return r.logger, r.logLvl
return r.logger[lvl], r.logLvl
}

func (r *Consumer) getLogLevel() LogLevel {
r.logGuard.RLock()
defer r.logGuard.RUnlock()

return r.logLvl
}

// SetBehaviorDelegate takes a type implementing one or more
Expand Down Expand Up @@ -530,12 +555,12 @@ func (r *Consumer) ConnectToNSQD(addr string) error {

atomic.StoreInt32(&r.connectedFlag, 1)

logger, logLvl := r.getLogger()

conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
conn.SetLogger(logger, logLvl,
fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))

conn.SetLoggerLevel(r.getLogLevel())
for index := range r.logger {
conn.SetLoggerForLevel(r.logger[index], LogLevel(index),
fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))
}
r.mtx.Lock()
_, pendingOk := r.pendingConnections[addr]
_, ok := r.connections[addr]
Expand Down Expand Up @@ -1156,7 +1181,7 @@ func (r *Consumer) exit() {
}

func (r *Consumer) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl := r.getLogger()
logger, logLvl := r.getLogger(lvl)

if logger == nil {
return
Expand Down
1 change: 1 addition & 0 deletions delegates.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
LogLevelInfo
LogLevelWarning
LogLevelError
LogLevelMax = iota - 1 // convenience - match highest log level
)

// String returns the string form for a given LogLevel
Expand Down
52 changes: 43 additions & 9 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
type producerConn interface {
String() string
SetLogger(logger, LogLevel, string)
SetLoggerLevel(LogLevel)
SetLoggerForLevel(logger, LogLevel, string)
Connect() (*IdentifyResponse, error)
Close() error
WriteCommand(*Command) error
Expand All @@ -28,7 +30,7 @@ type Producer struct {
conn producerConn
config Config

logger logger
logger []logger
logLvl LogLevel
logGuard sync.RWMutex

Expand Down Expand Up @@ -80,14 +82,20 @@ func NewProducer(addr string, config *Config) (*Producer, error) {
addr: addr,
config: *config,

logger: log.New(os.Stderr, "", log.Flags()),
logger: make([]logger, int(LogLevelMax+1)),
logLvl: LogLevelInfo,

transactionChan: make(chan *ProducerTransaction),
exitChan: make(chan int),
responseChan: make(chan []byte),
errorChan: make(chan []byte),
}

// Set default logger for all log levels
l := log.New(os.Stderr, "", log.Flags())
for index, _ := range p.logger {
p.logger[index] = l
}
return p, nil
}

Expand Down Expand Up @@ -119,15 +127,40 @@ func (w *Producer) SetLogger(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()

w.logger = l
for level := range w.logger {
w.logger[level] = l
}
w.logLvl = lvl
}

func (w *Producer) getLogger() (logger, LogLevel) {
// SetLoggerForLevel assigns the same logger for specified `level`.
func (w *Producer) SetLoggerForLevel(l logger, lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()

w.logger[lvl] = l
}

// SetLoggerLevel sets the package logging level.
func (w *Producer) SetLoggerLevel(lvl LogLevel) {
w.logGuard.Lock()
defer w.logGuard.Unlock()

w.logLvl = lvl
}

func (w *Producer) getLogger(lvl LogLevel) (logger, LogLevel) {
w.logGuard.RLock()
defer w.logGuard.RUnlock()

return w.logger[lvl], w.logLvl
}

func (w *Producer) getLogLevel() LogLevel {
w.logGuard.RLock()
defer w.logGuard.RUnlock()

return w.logger, w.logLvl
return w.logLvl
}

// String returns the address of the Producer
Expand Down Expand Up @@ -273,10 +306,11 @@ func (w *Producer) connect() error {

w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)

logger, logLvl := w.getLogger()

w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
w.conn.SetLoggerLevel(w.getLogLevel())
for index := range w.logger {
w.conn.SetLoggerForLevel(w.logger[index], LogLevel(index), fmt.Sprintf("%3d (%%s)", w.id))
}

_, err := w.conn.Connect()
if err != nil {
Expand Down Expand Up @@ -369,7 +403,7 @@ func (w *Producer) transactionCleanup() {
}

func (w *Producer) log(lvl LogLevel, line string, args ...interface{}) {
logger, logLvl := w.getLogger()
logger, logLvl := w.getLogger(lvl)

if logger == nil {
return
Expand Down
4 changes: 4 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func (m *mockProducerConn) String() string {

func (m *mockProducerConn) SetLogger(logger logger, level LogLevel, prefix string) {}

func (m *mockProducerConn) SetLoggerLevel(lvl LogLevel) {}

func (m *mockProducerConn) SetLoggerForLevel(logger logger, level LogLevel, format string) {}

func (m *mockProducerConn) Connect() (*IdentifyResponse, error) {
return &IdentifyResponse{}, nil
}
Expand Down

0 comments on commit 0abe494

Please sign in to comment.