Skip to content

Commit

Permalink
WriterSyncer support Close method
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Feb 25, 2020
1 parent d8eab9f commit f3a46bc
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 6 deletions.
7 changes: 7 additions & 0 deletions internal/ztest/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ func (s *Syncer) Sync() error {
return s.err
}

// Close records that it was called, then returns the user-supplied error (if
// any).
func (s *Syncer) Close() error {
s.called = true
return s.err
}

// Called reports whether the Sync method was called.
func (s *Syncer) Called() bool {
return s.called
Expand Down
6 changes: 6 additions & 0 deletions logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,12 @@ func (log *Logger) Sync() error {
return log.core.Sync()
}

// Close calls the underlying Core's Close method, flushing any buffered log
// entries. Applications should take care to call Close before exiting.
func (log *Logger) Close() error {
return log.core.Close()
}

// Core returns the Logger's underlying zapcore.Core.
func (log *Logger) Core() zapcore.Core {
return log.core
Expand Down
2 changes: 0 additions & 2 deletions sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package zap
import (
"errors"
"fmt"
"io"
"net/url"
"os"
"strings"
Expand Down Expand Up @@ -55,7 +54,6 @@ func resetSinkRegistry() {
// Sink defines the interface to write to and close logger destinations.
type Sink interface {
zapcore.WriteSyncer
io.Closer
}

type nopCloserSink struct{ zapcore.WriteSyncer }
Expand Down
4 changes: 4 additions & 0 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (w *testWriter) Sync() error {
return nil
}

func (w *testWriter) Close() error {
return nil
}

func TestOpenWithErroringSinkFactory(t *testing.T) {
defer resetSinkRegistry()

Expand Down
7 changes: 7 additions & 0 deletions zapcore/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type Core interface {
Write(Entry, []Field) error
// Sync flushes buffered logs (if any).
Sync() error
// Close cleans all WriterSync resources (if any).
Close() error
}

type nopCore struct{}
Expand All @@ -53,6 +55,7 @@ func (n nopCore) With([]Field) Core { return n }
func (nopCore) Check(_ Entry, ce *CheckedEntry) *CheckedEntry { return ce }
func (nopCore) Write(Entry, []Field) error { return nil }
func (nopCore) Sync() error { return nil }
func (nopCore) Close() error { return nil }

// NewCore creates a Core that writes logs to a WriteSyncer.
func NewCore(enc Encoder, ws WriteSyncer, enab LevelEnabler) Core {
Expand Down Expand Up @@ -104,6 +107,10 @@ func (c *ioCore) Sync() error {
return c.out.Sync()
}

func (c *ioCore) Close() error {
return c.out.Close()
}

func (c *ioCore) clone() *ioCore {
return &ioCore{
LevelEnabler: c.LevelEnabler,
Expand Down
1 change: 1 addition & 0 deletions zapcore/sampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (c *countingCore) Write(Entry, []Field) error {
func (c *countingCore) With([]Field) Core { return c }
func (*countingCore) Enabled(Level) bool { return true }
func (*countingCore) Sync() error { return nil }
func (*countingCore) Close() error { return nil }

func TestSamplerConcurrent(t *testing.T) {
const (
Expand Down
8 changes: 8 additions & 0 deletions zapcore/tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,11 @@ func (mc multiCore) Sync() error {
}
return err
}

func (mc multiCore) Close() error {
var err error
for i := range mc {
err = multierr.Append(err, mc[i].Close())
}
return err
}
36 changes: 32 additions & 4 deletions zapcore/write_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package zapcore

import (
"bufio"
"context"
"io"
"sync"
"time"
Expand All @@ -33,12 +34,13 @@ import (
// that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer.
type WriteSyncer interface {
io.Writer
Close() error
Sync() error
}

// AddSync converts an io.Writer to a WriteSyncer. It attempts to be
// intelligent: if the concrete type of the io.Writer implements WriteSyncer,
// we'll use the existing Sync method. If it doesn't, we'll add a no-op Sync.
// we'll use the existing Sync and Close method. If it doesn't, we'll add a no-op Sync and Close.
func AddSync(w io.Writer) WriteSyncer {
switch w := w.(type) {
case WriteSyncer:
Expand Down Expand Up @@ -77,9 +79,17 @@ func (s *lockedWriteSyncer) Sync() error {
return err
}

func (s *lockedWriteSyncer) Close() error {
s.Lock()
err := s.ws.Close()
s.Unlock()
return err
}

type bufferWriterSyncer struct {
bufferWriter *bufio.Writer
ws WriteSyncer
bufferWriter *bufio.Writer
cancel context.CancelFunc
}

// defaultBufferSize sizes the buffer associated with each WriterSync.
Expand Down Expand Up @@ -107,26 +117,31 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy
flushInterval = defaultFlushInterval
}

ctx, cancel := context.WithCancel(context.Background())

// bufio is not goroutine safe, so add lock writer here
ws = Lock(&bufferWriterSyncer{
bufferWriter: bufio.NewWriterSize(ws, bufferSize),
cancel: cancel,
})

// flush buffer every interval
// we do not need exit this goroutine explicitly
go func() {
for range time.NewTicker(flushInterval).C {
select {
case <-time.NewTicker(flushInterval).C:
if err := ws.Sync(); err != nil {
return
}
case <-ctx.Done():
return
}
}()

return ws
}

func (s *bufferWriterSyncer) Write(bs []byte) (int, error) {

// there are some logic internal for bufio.Writer here:
// 1. when the buffer is enough, data would not be flushed.
// 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up.
Expand All @@ -146,6 +161,11 @@ func (s *bufferWriterSyncer) Sync() error {
return s.bufferWriter.Flush()
}

func (s *bufferWriterSyncer) Close() error {
s.cancel()
return s.Sync()
}

type writerWrapper struct {
io.Writer
}
Expand All @@ -154,6 +174,10 @@ func (w writerWrapper) Sync() error {
return nil
}

func (w writerWrapper) Close() error {
return nil
}

type multiWriteSyncer []WriteSyncer

// NewMultiWriteSyncer creates a WriteSyncer that duplicates its writes
Expand Down Expand Up @@ -192,3 +216,7 @@ func (ws multiWriteSyncer) Sync() error {
}
return err
}

func (ws multiWriteSyncer) Close() error {
return ws.Sync()
}
9 changes: 9 additions & 0 deletions zapcore/write_syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ func TestBufferWriter(t *testing.T) {
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("cancel context", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})
}

func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions zaptest/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,7 @@ func (w testingWriter) Write(p []byte) (n int, err error) {
func (w testingWriter) Sync() error {
return nil
}

func (w testingWriter) Close() error {
return nil
}
4 changes: 4 additions & 0 deletions zaptest/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,7 @@ func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) erro
func (co *contextObserver) Sync() error {
return nil
}

func (co *contextObserver) Close() error {
return nil
}

0 comments on commit f3a46bc

Please sign in to comment.