Make flushDaemon stoppable and testable
Refactor the flushDaemon function into a separate struct. Instead of
starting the daemon in the init function, it is now started during
createFiles, where it is actually used. Add a function to stop the
daemon for testing. Test daemon flushing.
katexochen committed Feb 10, 2022
1 parent 6d93ff0 commit b8f32b4
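
For orientation, here is a standalone sketch (not klog's actual source) of the stoppable, ticker-driven daemon pattern this commit introduces. The type and method names mirror the diff below; the flush callback, the timings, and the main function are placeholders added for illustration.

// sketch.go -- standalone illustration of the pattern, not klog itself.
package main

import (
	"fmt"
	"time"
)

// flushDaemon periodically invokes a flush callback until stopped.
type flushDaemon struct {
	interval time.Duration
	flush    func()
	stopC    chan struct{}
	stopDone chan struct{}
}

func newFlushDaemon(interval time.Duration, flush func()) *flushDaemon {
	return &flushDaemon{interval: interval, flush: flush}
}

// run starts the ticker goroutine; calling it on a running daemon is a no-op.
func (f *flushDaemon) run() {
	if f.stopC != nil {
		return
	}
	f.stopC = make(chan struct{}, 1)
	f.stopDone = make(chan struct{}, 1)
	go func() {
		ticker := time.NewTicker(f.interval)
		for {
			select {
			case <-ticker.C:
				f.flush()
			case <-f.stopC:
				ticker.Stop()
				f.stopDone <- struct{}{}
				return
			}
		}
	}()
}

// stop signals the goroutine and waits for it to exit.
func (f *flushDaemon) stop() {
	if f.stopC == nil {
		return
	}
	f.stopC <- struct{}{}
	<-f.stopDone
	f.stopC, f.stopDone = nil, nil
}

func main() {
	d := newFlushDaemon(10*time.Millisecond, func() { fmt.Println("flush") })
	d.run()                           // in klog this happens lazily in createFiles
	time.Sleep(50 * time.Millisecond) // let a few ticks fire
	d.stop()                          // tests can now shut the daemon down cleanly
}
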
Showing 2 changed files with 91 additions and 13 deletions.
66 changes: 62 additions & 4 deletions klog.go
@@ -417,7 +417,7 @@ func init() {
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
go logging.flushDaemon()
logging.flushD = newFlushDaemon(flushInterval, logging.lockAndFlushAll)
}

// InitFlags is for explicitly initializing the flags.
@@ -471,6 +471,8 @@ type loggingT struct {
mu sync.Mutex
// file holds writer for each of the log types.
file [numSeverity]flushSyncWriter
// flushD holds a flushDaemon that frequently flushes log file buffers.
flushD *flushDaemon
// pcs is used in V to avoid an allocation when computing the caller's PC.
pcs [1]uintptr
// vmap is a cache of the V Level for each V() call site, identified by PC.
@@ -1270,6 +1272,9 @@ const bufferSize = 256 * 1024
// createFiles creates all the log files for severity from sev down to infoLog.
// l.mu is held.
func (l *loggingT) createFiles(sev severity) error {
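// Start the flush daemon lazily, the first time log files are created.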
if l.flushD != nil {
l.flushD.run()
}
now := time.Now()
// Files are created in decreasing severity order, so as soon as we find one
// has already been created, we can stop.
@@ -1290,12 +1295,65 @@ func (l *loggingT) createFiles(sev severity) error {
const flushInterval = 5 * time.Second

// flushDaemon periodically flushes the log file buffers.
func (l *loggingT) flushDaemon() {
for range time.NewTicker(flushInterval).C {
l.lockAndFlushAll()
type flushDaemon struct {
interval time.Duration
flush func()
stopC chan struct{}
stopDone chan struct{}
}

// newFlushDaemon returns a new flushDaemon.
func newFlushDaemon(interval time.Duration, flush func()) *flushDaemon {
return &flushDaemon{
interval: interval,
flush: flush,
}
}

// run starts a goroutine that periodically calls the daemon's flush function.
// Calling run on an already running daemon will have no effect.
func (f *flushDaemon) run() {
if f.isRunning() {
return
}

f.stopC = make(chan struct{}, 1)
f.stopDone = make(chan struct{}, 1)

go func() {
ticker := time.NewTicker(f.interval)
for {
select {
case <-ticker.C:
f.flush()
case <-f.stopC:
ticker.Stop()
f.stopDone <- struct{}{}
return
}
}
}()
}

// stop stops the running flushDaemon and waits until the daemon has shut down.
// Calling stop on a daemon that isn't running will have no effect.
func (f *flushDaemon) stop() {
if !f.isRunning() {
return
}

f.stopC <- struct{}{}
<-f.stopDone

f.stopC = nil
f.stopDone = nil
}

// isRunning returns true if the flush daemon is running.
func (f *flushDaemon) isRunning() bool {
return f.stopC != nil
}

// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
l.mu.Lock()
38 changes: 29 additions & 9 deletions klog_test.go
@@ -379,10 +379,9 @@ func TestSetOutputDataRace(t *testing.T) {
setFlags()
defer logging.swap(logging.newBuffers())
var wg sync.WaitGroup
var daemons []*flushDaemon
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
}()
daemons = append(daemons, newFlushDaemon(time.Second, logging.lockAndFlushAll))
}
for i := 1; i <= 50; i++ {
wg.Add(1)
@@ -392,9 +391,7 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
}()
daemons = append(daemons, newFlushDaemon(time.Second, logging.lockAndFlushAll))
}
for i := 1; i <= 50; i++ {
wg.Add(1)
@@ -404,11 +401,12 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
go func() {
logging.flushDaemon()
}()
daemons = append(daemons, newFlushDaemon(time.Second, logging.lockAndFlushAll))
}
wg.Wait()
for _, d := range daemons {
d.stop()
}
}

func TestLogToOutput(t *testing.T) {
@@ -1985,3 +1983,25 @@ func (s *structWithLock) addWithDefer() {
defer s.m.Unlock()
s.n++
}

func TestFlushDaemon(t *testing.T) {
for sev := infoLog; sev < fatalLog; sev++ {
var flushed int
spyFunc := func() { flushed += 1 }
testLog := loggingT{flushD: newFlushDaemon(10*time.Millisecond, spyFunc)}

// Logging via testLog calls createFiles, which should start the daemon.
testLog.print(sev, nil, nil, "x")

if !testLog.flushD.isRunning() {
t.Error("expected flushD to be running")
}

time.Sleep(150 * time.Millisecond)
testLog.flushD.stop()

if flushed < 10 {
t.Errorf("expected syncBuffer to be flushed at least 10 times, but was %d times", flushed)
}
}
}
