From f3a46bc15df524f5c7087e8adff8698c37a9b38a Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 25 Feb 2020 10:49:19 +0800 Subject: [PATCH] WriterSyncer support Close method --- internal/ztest/writer.go | 7 +++++++ logger.go | 6 ++++++ sink.go | 2 -- writer_test.go | 4 ++++ zapcore/core.go | 7 +++++++ zapcore/sampler_test.go | 1 + zapcore/tee.go | 8 ++++++++ zapcore/write_syncer.go | 36 ++++++++++++++++++++++++++++++++---- zapcore/write_syncer_test.go | 9 +++++++++ zaptest/logger.go | 4 ++++ zaptest/observer/observer.go | 4 ++++ 11 files changed, 82 insertions(+), 6 deletions(-) diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index 9fdd5805e..2c1a3fb89 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -45,6 +45,13 @@ func (s *Syncer) Sync() error { return s.err } +// Close records that it was called, then returns the user-supplied error (if +// any). +func (s *Syncer) Close() error { + s.called = true + return s.err +} + // Called reports whether the Sync method was called. func (s *Syncer) Called() bool { return s.called diff --git a/logger.go b/logger.go index cd6e19551..719c2d1c4 100644 --- a/logger.go +++ b/logger.go @@ -243,6 +243,12 @@ func (log *Logger) Sync() error { return log.core.Sync() } +// Close calls the underlying Core's Close method, flushing any buffered log +// entries. Applications should take care to call Close before exiting. +func (log *Logger) Close() error { + return log.core.Close() +} + // Core returns the Logger's underlying zapcore.Core. func (log *Logger) Core() zapcore.Core { return log.core diff --git a/sink.go b/sink.go index ff0becfe5..03a9206a9 100644 --- a/sink.go +++ b/sink.go @@ -23,7 +23,6 @@ package zap import ( "errors" "fmt" - "io" "net/url" "os" "strings" @@ -55,7 +54,6 @@ func resetSinkRegistry() { // Sink defines the interface to write to and close logger destinations. type Sink interface { zapcore.WriteSyncer - io.Closer } type nopCloserSink struct{ zapcore.WriteSyncer } diff --git a/writer_test.go b/writer_test.go index 0dc8312b4..3d77ad277 100644 --- a/writer_test.go +++ b/writer_test.go @@ -152,6 +152,10 @@ func (w *testWriter) Sync() error { return nil } +func (w *testWriter) Close() error { + return nil +} + func TestOpenWithErroringSinkFactory(t *testing.T) { defer resetSinkRegistry() diff --git a/zapcore/core.go b/zapcore/core.go index a1ef8b034..98abc6399 100644 --- a/zapcore/core.go +++ b/zapcore/core.go @@ -42,6 +42,8 @@ type Core interface { Write(Entry, []Field) error // Sync flushes buffered logs (if any). Sync() error + // Close cleans all WriterSync resources (if any). + Close() error } type nopCore struct{} @@ -53,6 +55,7 @@ func (n nopCore) With([]Field) Core { return n } func (nopCore) Check(_ Entry, ce *CheckedEntry) *CheckedEntry { return ce } func (nopCore) Write(Entry, []Field) error { return nil } func (nopCore) Sync() error { return nil } +func (nopCore) Close() error { return nil } // NewCore creates a Core that writes logs to a WriteSyncer. func NewCore(enc Encoder, ws WriteSyncer, enab LevelEnabler) Core { @@ -104,6 +107,10 @@ func (c *ioCore) Sync() error { return c.out.Sync() } +func (c *ioCore) Close() error { + return c.out.Close() +} + func (c *ioCore) clone() *ioCore { return &ioCore{ LevelEnabler: c.LevelEnabler, diff --git a/zapcore/sampler_test.go b/zapcore/sampler_test.go index 9ba278b0b..2eb659338 100644 --- a/zapcore/sampler_test.go +++ b/zapcore/sampler_test.go @@ -150,6 +150,7 @@ func (c *countingCore) Write(Entry, []Field) error { func (c *countingCore) With([]Field) Core { return c } func (*countingCore) Enabled(Level) bool { return true } func (*countingCore) Sync() error { return nil } +func (*countingCore) Close() error { return nil } func TestSamplerConcurrent(t *testing.T) { const ( diff --git a/zapcore/tee.go b/zapcore/tee.go index 07a32eef9..93ac0e7dc 100644 --- a/zapcore/tee.go +++ b/zapcore/tee.go @@ -79,3 +79,11 @@ func (mc multiCore) Sync() error { } return err } + +func (mc multiCore) Close() error { + var err error + for i := range mc { + err = multierr.Append(err, mc[i].Close()) + } + return err +} diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index a31f1a765..97b2a17b5 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -22,6 +22,7 @@ package zapcore import ( "bufio" + "context" "io" "sync" "time" @@ -33,12 +34,13 @@ import ( // that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer. type WriteSyncer interface { io.Writer + Close() error Sync() error } // AddSync converts an io.Writer to a WriteSyncer. It attempts to be // intelligent: if the concrete type of the io.Writer implements WriteSyncer, -// we'll use the existing Sync method. If it doesn't, we'll add a no-op Sync. +// we'll use the existing Sync and Close method. If it doesn't, we'll add a no-op Sync and Close. func AddSync(w io.Writer) WriteSyncer { switch w := w.(type) { case WriteSyncer: @@ -77,9 +79,17 @@ func (s *lockedWriteSyncer) Sync() error { return err } +func (s *lockedWriteSyncer) Close() error { + s.Lock() + err := s.ws.Close() + s.Unlock() + return err +} + type bufferWriterSyncer struct { - bufferWriter *bufio.Writer ws WriteSyncer + bufferWriter *bufio.Writer + cancel context.CancelFunc } // defaultBufferSize sizes the buffer associated with each WriterSync. @@ -107,18 +117,24 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy flushInterval = defaultFlushInterval } + ctx, cancel := context.WithCancel(context.Background()) + // bufio is not goroutine safe, so add lock writer here ws = Lock(&bufferWriterSyncer{ bufferWriter: bufio.NewWriterSize(ws, bufferSize), + cancel: cancel, }) // flush buffer every interval // we do not need exit this goroutine explicitly go func() { - for range time.NewTicker(flushInterval).C { + select { + case <-time.NewTicker(flushInterval).C: if err := ws.Sync(); err != nil { return } + case <-ctx.Done(): + return } }() @@ -126,7 +142,6 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { - // there are some logic internal for bufio.Writer here: // 1. when the buffer is enough, data would not be flushed. // 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. @@ -146,6 +161,11 @@ func (s *bufferWriterSyncer) Sync() error { return s.bufferWriter.Flush() } +func (s *bufferWriterSyncer) Close() error { + s.cancel() + return s.Sync() +} + type writerWrapper struct { io.Writer } @@ -154,6 +174,10 @@ func (w writerWrapper) Sync() error { return nil } +func (w writerWrapper) Close() error { + return nil +} + type multiWriteSyncer []WriteSyncer // NewMultiWriteSyncer creates a WriteSyncer that duplicates its writes @@ -192,3 +216,7 @@ func (ws multiWriteSyncer) Sync() error { } return err } + +func (ws multiWriteSyncer) Close() error { + return ws.Sync() +} diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 257a0eb74..33f93643a 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -85,6 +85,15 @@ func TestBufferWriter(t *testing.T) { requireWriteWorks(t, ws) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) + + t.Run("cancel context", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { diff --git a/zaptest/logger.go b/zaptest/logger.go index 1e2451c26..f95304e4f 100644 --- a/zaptest/logger.go +++ b/zaptest/logger.go @@ -138,3 +138,7 @@ func (w testingWriter) Write(p []byte) (n int, err error) { func (w testingWriter) Sync() error { return nil } + +func (w testingWriter) Close() error { + return nil +} diff --git a/zaptest/observer/observer.go b/zaptest/observer/observer.go index 78f5be45d..f698fa96e 100644 --- a/zaptest/observer/observer.go +++ b/zaptest/observer/observer.go @@ -165,3 +165,7 @@ func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) erro func (co *contextObserver) Sync() error { return nil } + +func (co *contextObserver) Close() error { + return nil +}