From 3cbdbe48bb539d9210cceb786bfd738a5ce0d948 Mon Sep 17 00:00:00 2001 From: Matt Reiferson Date: Thu, 3 Jan 2019 14:00:30 -0800 Subject: [PATCH] nsq_to_file: add --log-level and --log-prefix --- apps/nsq_to_file/file_logger.go | 65 ++++++++++++++++------------ apps/nsq_to_file/nsq_to_file.go | 12 +++++ apps/nsq_to_file/options.go | 12 ++++- apps/nsq_to_file/topic_discoverer.go | 8 ++-- 4 files changed, 64 insertions(+), 33 deletions(-) diff --git a/apps/nsq_to_file/file_logger.go b/apps/nsq_to_file/file_logger.go index 232040726..86cde941c 100644 --- a/apps/nsq_to_file/file_logger.go +++ b/apps/nsq_to_file/file_logger.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "log" "os" "path" "path/filepath" @@ -13,6 +12,7 @@ import ( "time" "github.com/nsqio/go-nsq" + "github.com/nsqio/nsq/internal/lg" ) type FileLogger struct { @@ -112,11 +112,13 @@ func (f *FileLogger) router() { } _, err := f.Write(m.Body) if err != nil { - log.Fatalf("ERROR: writing message to disk - %s", err) + f.opts.logf(lg.FATAL, "writing message to disk: %s", err) + os.Exit(1) } _, err = f.Write([]byte("\n")) if err != nil { - log.Fatalf("ERROR: writing newline to disk - %s", err) + f.opts.logf(lg.FATAL, "writing newline to disk: %s", err) + os.Exit(1) } output[pos] = m pos++ @@ -127,10 +129,11 @@ func (f *FileLogger) router() { if sync || f.consumer.IsStarved() { if pos > 0 { - log.Printf("syncing %d records to disk", pos) + f.opts.logf(lg.INFO, "syncing %d records to disk", pos) err := f.Sync() if err != nil { - log.Fatalf("ERROR: failed syncing messages - %s", err) + f.opts.logf(lg.FATAL, "failed syncing messages: %s", err) + os.Exit(1) } for pos > 0 { pos-- @@ -161,16 +164,19 @@ func (f *FileLogger) Close() { if f.gzipWriter != nil { err := f.gzipWriter.Close() if err != nil { - log.Fatalf("ERROR: failed to close GZIP writer: %s", err) + f.opts.logf(lg.FATAL, "failed to close GZIP writer: %s", err) + os.Exit(1) } } err := f.out.Sync() if err != nil { - log.Fatalf("ERROR: failed to fsync output file: %s", err) + f.opts.logf(lg.FATAL, "failed to fsync output file: %s", err) + os.Exit(1) } err = f.out.Close() if err != nil { - log.Fatalf("ERROR: failed to close output file: %s", err) + f.opts.logf(lg.FATAL, "failed to close output file: %s", err) + os.Exit(1) } // Move file from work dir to output dir if necessary, taking care not @@ -180,13 +186,13 @@ func (f *FileLogger) Close() { dst := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(src, f.opts.WorkDir)) // Optimistic rename - log.Printf("INFO: moving finished file %s to %s", src, dst) + f.opts.logf(lg.INFO, "moving finished file %s to %s", src, dst) err := exclusiveRename(src, dst) if err == nil { return } else if !os.IsExist(err) { - log.Fatalf("ERROR: unable to move file from %s to %s: %s", src, dst, err) - return + f.opts.logf(lg.FATAL, "unable to move file from %s to %s: %s", src, dst, err) + os.Exit(1) } // Optimistic rename failed, so we need to generate a new @@ -196,17 +202,17 @@ func (f *FileLogger) Close() { dstTmpl := filepath.Join(dstDir, filenameTmpl) for i := f.rev + 1; ; i++ { - log.Printf("INFO: destination file already exists: %s", dst) + f.opts.logf(lg.WARN, "destination file already exists: %s", dst) dst := strings.Replace(dstTmpl, "", fmt.Sprintf("-%06d", i), -1) err := exclusiveRename(src, dst) if err != nil { if os.IsExist(err) { continue // next rev } - log.Fatalf("ERROR: unable to rename file from %s to %s: %s", src, dst, err) - return + f.opts.logf(lg.FATAL, "unable to rename file from %s to %s: %s", src, dst, err) + os.Exit(1) } - log.Printf("INFO: renamed finished file %s to %s to avoid overwrite", src, dst) + f.opts.logf(lg.INFO, "renamed finished file %s to %s to avoid overwrite", src, dst) break } } @@ -243,19 +249,19 @@ func (f *FileLogger) needsRotation() bool { filename := f.currentFilename() if filename != f.filename { - log.Printf("INFO: new filename %s, rotating...", filename) + f.opts.logf(lg.INFO, "new filename %s, rotating...", filename) return true // rotate by filename } if f.opts.RotateInterval > 0 { if s := time.Since(f.openTime); s > f.opts.RotateInterval { - log.Printf("INFO: %s since last open, rotating...", s) + f.opts.logf(lg.INFO, "%s since last open, rotating...", s) return true // rotate by interval } } if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize { - log.Printf("INFO: %s currently %d bytes (> %d), rotating...", + f.opts.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...", f.out.Name(), f.filesize, f.opts.RotateSize) return true // rotate by size } @@ -274,7 +280,7 @@ func (f *FileLogger) updateFile() { f.openTime = time.Now() fullPath := path.Join(f.opts.WorkDir, filename) - makeDirFromPath(fullPath) + makeDirFromPath(f.opts.logf, fullPath) f.Close() @@ -288,14 +294,15 @@ func (f *FileLogger) updateFile() { // prevent conflicts on rename in the normal case if f.opts.WorkDir != f.opts.OutputDir { outputFileName := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(absFilename, f.opts.WorkDir)) - makeDirFromPath(outputFileName) + makeDirFromPath(f.opts.logf, outputFileName) _, err := os.Stat(outputFileName) if err == nil { - log.Printf("INFO: output file already exists: %s", outputFileName) + f.opts.logf(lg.WARN, "output file already exists: %s", outputFileName) continue // next rev } else if !os.IsNotExist(err) { - log.Fatalf("ERROR: unable to stat output file %s: %s", outputFileName, err) + f.opts.logf(lg.FATAL, "unable to stat output file %s: %s", outputFileName, err) + os.Exit(1) } } @@ -308,17 +315,18 @@ func (f *FileLogger) updateFile() { f.out, err = os.OpenFile(absFilename, openFlag, 0666) if err != nil { if os.IsExist(err) { - log.Printf("INFO: working file already exists: %s", absFilename) + f.opts.logf(lg.WARN, "working file already exists: %s", absFilename) continue } - log.Fatalf("ERROR: %s Unable to open %s", err, absFilename) + f.opts.logf(lg.FATAL, "unable to open %s: %s", absFilename, err) + os.Exit(1) } - log.Printf("INFO: opening %s", absFilename) + f.opts.logf(lg.INFO, "opening %s", absFilename) fi, err = f.out.Stat() if err != nil { - log.Fatalf("ERROR: %s Unable to stat file %s", err, f.out.Name()) + f.opts.logf(lg.FATAL, "unable to stat file %s: %s", f.out.Name(), err) } f.filesize = fi.Size() if f.filesize == 0 { @@ -340,12 +348,13 @@ func (f *FileLogger) updateFile() { } } -func makeDirFromPath(path string) { +func makeDirFromPath(logf lg.AppLogFunc, path string) { dir, _ := filepath.Split(path) if dir != "" { err := os.MkdirAll(dir, 0770) if err != nil { - log.Fatalf("ERROR: %s Unable to create %s", err, dir) + logf(lg.FATAL, "unable to create dir %s: %s", dir, err) + os.Exit(1) } } } diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index 0da8d499d..5a5ec339f 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -14,6 +14,7 @@ import ( "github.com/mreiferson/go-options" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/app" + "github.com/nsqio/nsq/internal/lg" "github.com/nsqio/nsq/internal/version" ) @@ -31,6 +32,8 @@ func flagSet() *flag.FlagSet { fs := flag.NewFlagSet("nsqd", flag.ExitOnError) fs.Bool("version", false, "print version string") + fs.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal") + fs.String("log-prefix", "[nsq_to_file] ", "log message prefix") fs.String("channel", "nsq_to_file", "nsq channel") fs.Int("max-in-flight", 200, "max number of messages to allow in flight") @@ -72,6 +75,15 @@ func main() { opts := NewOptions() options.Resolve(opts, fs, nil) + logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds) + logLevel, err := lg.ParseLogLevel(opts.LogLevel, false) + if err != nil { + log.Fatal("--log-level is invalid") + } + opts.logf = func(lvl lg.LogLevel, f string, args ...interface{}) { + lg.Logf(logger, logLevel, lvl, f, args...) + } + if fs.Lookup("version").Value.(flag.Getter).Get().(bool) { fmt.Printf("nsq_to_file v%s\n", version.Binary) return diff --git a/apps/nsq_to_file/options.go b/apps/nsq_to_file/options.go index 189c5f35f..0c0818ab0 100644 --- a/apps/nsq_to_file/options.go +++ b/apps/nsq_to_file/options.go @@ -1,6 +1,10 @@ package main -import "time" +import ( + "time" + + "github.com/nsqio/nsq/internal/lg" +) type Options struct { Topics []string `flag:"topic"` @@ -15,6 +19,8 @@ type Options struct { HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout"` HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout"` + LogPrefix string `flag:"log-prefix"` + LogLevel string `flag:"log-level"` OutputDir string `flag:"output-dir"` WorkDir string `flag:"work-dir"` DatetimeFormat string `flag:"datetime-format"` @@ -26,10 +32,14 @@ type Options struct { RotateSize int64 `flag:"rotate-size"` RotateInterval time.Duration `flag:"rotate-interval"` SyncInterval time.Duration `flag:"sync-interval"` + + logf lg.AppLogFunc } func NewOptions() *Options { return &Options{ + LogPrefix: "[nsq_to_file] ", + LogLevel: "info", Channel: "nsq_to_file", MaxInFlight: 200, OutputDir: "/tmp", diff --git a/apps/nsq_to_file/topic_discoverer.go b/apps/nsq_to_file/topic_discoverer.go index f1deb6c78..fe0ffdd52 100644 --- a/apps/nsq_to_file/topic_discoverer.go +++ b/apps/nsq_to_file/topic_discoverer.go @@ -1,7 +1,6 @@ package main import ( - "log" "os" "regexp" "sync" @@ -10,6 +9,7 @@ import ( "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/clusterinfo" "github.com/nsqio/nsq/internal/http_api" + "github.com/nsqio/nsq/internal/lg" ) type TopicDiscoverer struct { @@ -41,13 +41,13 @@ func (t *TopicDiscoverer) updateTopics(topics []string) { } if !t.isTopicAllowed(topic) { - log.Printf("skipping topic %s (doesn't match pattern %s)", topic, t.opts.TopicPattern) + t.opts.logf(lg.WARN, "skipping topic %s (doesn't match pattern %s)", topic, t.opts.TopicPattern) continue } fl, err := NewFileLogger(t.opts, topic, t.cfg) if err != nil { - log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err) + t.opts.logf(lg.ERROR, "couldn't create logger for new topic %s: %s", topic, err) continue } t.topics[topic] = fl @@ -72,7 +72,7 @@ forloop: case <-ticker: newTopics, err := t.ci.GetLookupdTopics(t.opts.NSQLookupdHTTPAddrs) if err != nil { - log.Printf("ERROR: could not retrieve topic list: %s", err) + t.opts.logf(lg.ERROR, "could not retrieve topic list: %s", err) continue } t.updateTopics(newTopics)