diff --git a/apps/nsq_to_file/file_logger.go b/apps/nsq_to_file/file_logger.go index dc26cceb4..4ac6d13d6 100644 --- a/apps/nsq_to_file/file_logger.go +++ b/apps/nsq_to_file/file_logger.go @@ -18,6 +18,7 @@ import ( type FileLogger struct { logf lg.AppLogFunc opts *Options + topic string consumer *nsq.Consumer out *os.File @@ -50,6 +51,7 @@ func NewFileLogger(logf lg.AppLogFunc, opts *Options, topic string, cfg *nsq.Con f := &FileLogger{ logf: logf, opts: opts, + topic: topic, consumer: consumer, logChan: make(chan *nsq.Message, 1), filenameFormat: computedFilenameFormat, @@ -114,12 +116,12 @@ func (f *FileLogger) router() { } _, err := f.Write(m.Body) if err != nil { - f.logf(lg.FATAL, "writing message to disk: %s", err) + f.logf(lg.FATAL, "[%s/%s] writing message to disk: %s", f.topic, f.opts.Channel, err) os.Exit(1) } _, err = f.Write([]byte("\n")) if err != nil { - f.logf(lg.FATAL, "writing newline to disk: %s", err) + f.logf(lg.FATAL, "[%s/%s] writing newline to disk: %s", f.topic, f.opts.Channel, err) os.Exit(1) } output[pos] = m @@ -131,10 +133,10 @@ func (f *FileLogger) router() { if sync || f.consumer.IsStarved() { if pos > 0 { - f.logf(lg.INFO, "syncing %d records to disk", pos) + f.logf(lg.INFO, "[%s/%s] syncing %d records to disk", f.topic, f.opts.Channel, pos) err := f.Sync() if err != nil { - f.logf(lg.FATAL, "failed syncing messages: %s", err) + f.logf(lg.FATAL, "[%s/%s] failed syncing messages: %s", f.topic, f.opts.Channel, err) os.Exit(1) } for pos > 0 { @@ -166,18 +168,18 @@ func (f *FileLogger) Close() { if f.gzipWriter != nil { err := f.gzipWriter.Close() if err != nil { - f.logf(lg.FATAL, "failed to close GZIP writer: %s", err) + f.logf(lg.FATAL, "[%s/%s] failed to close GZIP writer: %s", f.topic, f.opts.Channel, err) os.Exit(1) } } err := f.out.Sync() if err != nil { - f.logf(lg.FATAL, "failed to fsync output file: %s", err) + f.logf(lg.FATAL, "[%s/%s] failed to fsync output file: %s", f.topic, f.opts.Channel, err) os.Exit(1) } err = f.out.Close() if err != nil { - f.logf(lg.FATAL, "failed to close output file: %s", err) + f.logf(lg.FATAL, "[%s/%s] failed to close output file: %s", f.topic, f.opts.Channel, err) os.Exit(1) } @@ -188,12 +190,12 @@ func (f *FileLogger) Close() { dst := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(src, f.opts.WorkDir)) // Optimistic rename - f.logf(lg.INFO, "moving finished file %s to %s", src, dst) + f.logf(lg.INFO, "[%s/%s] moving finished file %s to %s", f.topic, f.opts.Channel, src, dst) err := exclusiveRename(src, dst) if err == nil { return } else if !os.IsExist(err) { - f.logf(lg.FATAL, "unable to move file from %s to %s: %s", src, dst, err) + f.logf(lg.FATAL, "[%s/%s] unable to move file from %s to %s: %s", f.topic, f.opts.Channel, src, dst, err) os.Exit(1) } @@ -204,17 +206,17 @@ func (f *FileLogger) Close() { dstTmpl := filepath.Join(dstDir, filenameTmpl) for i := f.rev + 1; ; i++ { - f.logf(lg.WARN, "destination file already exists: %s", dst) + f.logf(lg.WARN, "[%s/%s] destination file already exists: %s", f.topic, f.opts.Channel, dst) dst := strings.Replace(dstTmpl, "", fmt.Sprintf("-%06d", i), -1) err := exclusiveRename(src, dst) if err != nil { if os.IsExist(err) { continue // next rev } - f.logf(lg.FATAL, "unable to rename file from %s to %s: %s", src, dst, err) + f.logf(lg.FATAL, "[%s/%s] unable to rename file from %s to %s: %s", f.topic, f.opts.Channel, src, dst, err) os.Exit(1) } - f.logf(lg.INFO, "renamed finished file %s to %s to avoid overwrite", src, dst) + f.logf(lg.INFO, "[%s/%s] renamed finished file %s to %s to avoid overwrite", f.topic, f.opts.Channel, src, dst) break } } @@ -259,20 +261,20 @@ func (f *FileLogger) needsRotation() bool { filename := f.currentFilename() if filename != f.filename { - f.logf(lg.INFO, "new filename %s, rotating...", filename) + f.logf(lg.INFO, "[%s/%s] new filename %s, rotating...", f.topic, f.opts.Channel, filename) return true // rotate by filename } if f.opts.RotateInterval > 0 { if s := time.Since(f.openTime); s > f.opts.RotateInterval { - f.logf(lg.INFO, "%s since last open, rotating...", s) + f.logf(lg.INFO, "[%s/%s] %s since last open, rotating...", f.topic, f.opts.Channel, s) return true // rotate by interval } } if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize { - f.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...", - f.out.Name(), f.filesize, f.opts.RotateSize) + f.logf(lg.INFO, "[%s/%s] %s currently %d bytes (> %d), rotating...", + f.topic, f.opts.Channel, f.out.Name(), f.filesize, f.opts.RotateSize) return true // rotate by size } @@ -292,9 +294,12 @@ func (f *FileLogger) updateFile() { f.openTime = time.Now() fullPath := path.Join(f.opts.WorkDir, filename) - makeDirFromPath(f.logf, fullPath) + err := makeDirFromPath(f.logf, fullPath) + if err != nil { + f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err) + os.Exit(1) + } - var err error var fi os.FileInfo for ; ; f.rev++ { absFilename := strings.Replace(fullPath, "", fmt.Sprintf("-%06d", f.rev), -1) @@ -304,14 +309,18 @@ func (f *FileLogger) updateFile() { // prevent conflicts on rename in the normal case if f.opts.WorkDir != f.opts.OutputDir { outputFileName := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(absFilename, f.opts.WorkDir)) - makeDirFromPath(f.logf, outputFileName) + err := makeDirFromPath(f.logf, outputFileName) + if err != nil { + f.logf(lg.FATAL, "[%s/%s] unable to create dir: %s", f.topic, f.opts.Channel, err) + os.Exit(1) + } - _, err := os.Stat(outputFileName) + _, err = os.Stat(outputFileName) if err == nil { - f.logf(lg.WARN, "output file already exists: %s", outputFileName) + f.logf(lg.WARN, "[%s/%s] output file already exists: %s", f.topic, f.opts.Channel, outputFileName) continue // next rev } else if !os.IsNotExist(err) { - f.logf(lg.FATAL, "unable to stat output file %s: %s", outputFileName, err) + f.logf(lg.FATAL, "[%s/%s] unable to stat output file %s: %s", f.topic, f.opts.Channel, outputFileName, err) os.Exit(1) } } @@ -325,24 +334,24 @@ func (f *FileLogger) updateFile() { f.out, err = os.OpenFile(absFilename, openFlag, 0666) if err != nil { if os.IsExist(err) { - f.logf(lg.WARN, "working file already exists: %s", absFilename) + f.logf(lg.WARN, "[%s/%s] working file already exists: %s", f.topic, f.opts.Channel, absFilename) continue // next rev } - f.logf(lg.FATAL, "unable to open %s: %s", absFilename, err) + f.logf(lg.FATAL, "[%s/%s] unable to open %s: %s", f.topic, f.opts.Channel, absFilename, err) os.Exit(1) } - f.logf(lg.INFO, "opening %s", absFilename) + f.logf(lg.INFO, "[%s/%s] opening %s", f.topic, f.opts.Channel, absFilename) fi, err = f.out.Stat() if err != nil { - f.logf(lg.FATAL, "unable to stat file %s: %s", f.out.Name(), err) + f.logf(lg.FATAL, "[%s/%s] unable to stat file %s: %s", f.topic, f.opts.Channel, f.out.Name(), err) } f.filesize = fi.Size() if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize { - f.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...", - f.out.Name(), f.filesize, f.opts.RotateSize) + f.logf(lg.INFO, "[%s/%s] %s currently %d bytes (> %d), rotating...", + f.topic, f.opts.Channel, f.out.Name(), f.filesize, f.opts.RotateSize) continue // next rev } @@ -357,15 +366,12 @@ func (f *FileLogger) updateFile() { } } -func makeDirFromPath(logf lg.AppLogFunc, path string) { +func makeDirFromPath(logf lg.AppLogFunc, path string) error { dir, _ := filepath.Split(path) if dir != "" { - err := os.MkdirAll(dir, 0770) - if err != nil { - logf(lg.FATAL, "unable to create dir %s: %s", dir, err) - os.Exit(1) - } + return os.MkdirAll(dir, 0770) } + return nil } func exclusiveRename(src, dst string) error {