Skip to content

Commit

Permalink
nsq_to_file: include topic/channel for all logging output
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Mar 13, 2019
1 parent 3eb619f commit 135e777
Showing 1 changed file with 40 additions and 34 deletions.
74 changes: 40 additions & 34 deletions apps/nsq_to_file/file_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type FileLogger struct {
logf lg.AppLogFunc
opts *Options
topic string
consumer *nsq.Consumer

out *os.File
Expand Down Expand Up @@ -50,6 +51,7 @@ func NewFileLogger(logf lg.AppLogFunc, opts *Options, topic string, cfg *nsq.Con
f := &FileLogger{
logf: logf,
opts: opts,
topic: topic,
consumer: consumer,
logChan: make(chan *nsq.Message, 1),
filenameFormat: computedFilenameFormat,
Expand Down Expand Up @@ -114,12 +116,12 @@ func (f *FileLogger) router() {
}
_, err := f.Write(m.Body)
if err != nil {
f.logf(lg.FATAL, "writing message to disk: %s", err)
f.logf(lg.FATAL, "[%s/%s] writing message to disk: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}
_, err = f.Write([]byte("\n"))
if err != nil {
f.logf(lg.FATAL, "writing newline to disk: %s", err)
f.logf(lg.FATAL, "[%s/%s] writing newline to disk: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}
output[pos] = m
Expand All @@ -131,10 +133,10 @@ func (f *FileLogger) router() {

if sync || f.consumer.IsStarved() {
if pos > 0 {
f.logf(lg.INFO, "syncing %d records to disk", pos)
f.logf(lg.INFO, "[%s/%s] syncing %d records to disk", f.topic, f.opts.Channel, pos)
err := f.Sync()
if err != nil {
f.logf(lg.FATAL, "failed syncing messages: %s", err)
f.logf(lg.FATAL, "[%s/%s] failed syncing messages: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}
for pos > 0 {
Expand Down Expand Up @@ -166,18 +168,18 @@ func (f *FileLogger) Close() {
if f.gzipWriter != nil {
err := f.gzipWriter.Close()
if err != nil {
f.logf(lg.FATAL, "failed to close GZIP writer: %s", err)
f.logf(lg.FATAL, "[%s/%s] failed to close GZIP writer: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}
}
err := f.out.Sync()
if err != nil {
f.logf(lg.FATAL, "failed to fsync output file: %s", err)
f.logf(lg.FATAL, "[%s/%s] failed to fsync output file: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}
err = f.out.Close()
if err != nil {
f.logf(lg.FATAL, "failed to close output file: %s", err)
f.logf(lg.FATAL, "[%s/%s] failed to close output file: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}

Expand All @@ -188,12 +190,12 @@ func (f *FileLogger) Close() {
dst := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(src, f.opts.WorkDir))

// Optimistic rename
f.logf(lg.INFO, "moving finished file %s to %s", src, dst)
f.logf(lg.INFO, "[%s/%s] moving finished file %s to %s", f.topic, f.opts.Channel, src, dst)
err := exclusiveRename(src, dst)
if err == nil {
return
} else if !os.IsExist(err) {
f.logf(lg.FATAL, "unable to move file from %s to %s: %s", src, dst, err)
f.logf(lg.FATAL, "[%s/%s] unable to move file from %s to %s: %s", f.topic, f.opts.Channel, src, dst, err)
os.Exit(1)
}

Expand All @@ -204,17 +206,17 @@ func (f *FileLogger) Close() {
dstTmpl := filepath.Join(dstDir, filenameTmpl)

for i := f.rev + 1; ; i++ {
f.logf(lg.WARN, "destination file already exists: %s", dst)
f.logf(lg.WARN, "[%s/%s] destination file already exists: %s", f.topic, f.opts.Channel, dst)
dst := strings.Replace(dstTmpl, "<REV>", fmt.Sprintf("-%06d", i), -1)
err := exclusiveRename(src, dst)
if err != nil {
if os.IsExist(err) {
continue // next rev
}
f.logf(lg.FATAL, "unable to rename file from %s to %s: %s", src, dst, err)
f.logf(lg.FATAL, "[%s/%s] unable to rename file from %s to %s: %s", f.topic, f.opts.Channel, src, dst, err)
os.Exit(1)
}
f.logf(lg.INFO, "renamed finished file %s to %s to avoid overwrite", src, dst)
f.logf(lg.INFO, "[%s/%s] renamed finished file %s to %s to avoid overwrite", f.topic, f.opts.Channel, src, dst)
break
}
}
Expand Down Expand Up @@ -259,20 +261,20 @@ func (f *FileLogger) needsRotation() bool {

filename := f.currentFilename()
if filename != f.filename {
f.logf(lg.INFO, "new filename %s, rotating...", filename)
f.logf(lg.INFO, "[%s/%s] new filename %s, rotating...", f.topic, f.opts.Channel, filename)
return true // rotate by filename
}

if f.opts.RotateInterval > 0 {
if s := time.Since(f.openTime); s > f.opts.RotateInterval {
f.logf(lg.INFO, "%s since last open, rotating...", s)
f.logf(lg.INFO, "[%s/%s] %s since last open, rotating...", f.topic, f.opts.Channel, s)
return true // rotate by interval
}
}

if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize {
f.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...",
f.out.Name(), f.filesize, f.opts.RotateSize)
f.logf(lg.INFO, "[%s/%s] %s currently %d bytes (> %d), rotating...",
f.topic, f.opts.Channel, f.out.Name(), f.filesize, f.opts.RotateSize)
return true // rotate by size
}

Expand All @@ -292,9 +294,12 @@ func (f *FileLogger) updateFile() {
f.openTime = time.Now()

fullPath := path.Join(f.opts.WorkDir, filename)
makeDirFromPath(f.logf, fullPath)
err := makeDirFromPath(f.logf, fullPath)
if err != nil {
f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}

var err error
var fi os.FileInfo
for ; ; f.rev++ {
absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
Expand All @@ -304,14 +309,18 @@ func (f *FileLogger) updateFile() {
// prevent conflicts on rename in the normal case
if f.opts.WorkDir != f.opts.OutputDir {
outputFileName := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(absFilename, f.opts.WorkDir))
makeDirFromPath(f.logf, outputFileName)
err := makeDirFromPath(f.logf, outputFileName)
if err != nil {
f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err)
os.Exit(1)
}

_, err := os.Stat(outputFileName)
_, err = os.Stat(outputFileName)
if err == nil {
f.logf(lg.WARN, "output file already exists: %s", outputFileName)
f.logf(lg.WARN, "[%s/%s] output file already exists: %s", f.topic, f.opts.Channel, outputFileName)
continue // next rev
} else if !os.IsNotExist(err) {
f.logf(lg.FATAL, "unable to stat output file %s: %s", outputFileName, err)
f.logf(lg.FATAL, "[%s/%s] unable to stat output file %s: %s", f.topic, f.opts.Channel, outputFileName, err)
os.Exit(1)
}
}
Expand All @@ -325,24 +334,24 @@ func (f *FileLogger) updateFile() {
f.out, err = os.OpenFile(absFilename, openFlag, 0666)
if err != nil {
if os.IsExist(err) {
f.logf(lg.WARN, "working file already exists: %s", absFilename)
f.logf(lg.WARN, "[%s/%s] working file already exists: %s", f.topic, f.opts.Channel, absFilename)
continue // next rev
}
f.logf(lg.FATAL, "unable to open %s: %s", absFilename, err)
f.logf(lg.FATAL, "[%s/%s] unable to open %s: %s", f.topic, f.opts.Channel, absFilename, err)
os.Exit(1)
}

f.logf(lg.INFO, "opening %s", absFilename)
f.logf(lg.INFO, "[%s/%s] opening %s", f.topic, f.opts.Channel, absFilename)

fi, err = f.out.Stat()
if err != nil {
f.logf(lg.FATAL, "unable to stat file %s: %s", f.out.Name(), err)
f.logf(lg.FATAL, "[%s/%s] unable to stat file %s: %s", f.topic, f.opts.Channel, f.out.Name(), err)
}
f.filesize = fi.Size()

if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize {
f.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...",
f.out.Name(), f.filesize, f.opts.RotateSize)
f.logf(lg.INFO, "[%s/%s] %s currently %d bytes (> %d), rotating...",
f.topic, f.opts.Channel, f.out.Name(), f.filesize, f.opts.RotateSize)
continue // next rev
}

Expand All @@ -357,15 +366,12 @@ func (f *FileLogger) updateFile() {
}
}

func makeDirFromPath(logf lg.AppLogFunc, path string) {
func makeDirFromPath(logf lg.AppLogFunc, path string) error {
dir, _ := filepath.Split(path)
if dir != "" {
err := os.MkdirAll(dir, 0770)
if err != nil {
logf(lg.FATAL, "unable to create dir %s: %s", dir, err)
os.Exit(1)
}
return os.MkdirAll(dir, 0770)
}
return nil
}

func exclusiveRename(src, dst string) error {
Expand Down

0 comments on commit 135e777

Please sign in to comment.