Skip to content

Commit

Permalink
nsq_to_file: add --log-level and --log-prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jan 4, 2019
1 parent 0d4b61d commit 3cbdbe4
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 33 deletions.
65 changes: 37 additions & 28 deletions apps/nsq_to_file/file_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ import (
"errors"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/lg"
)

type FileLogger struct {
Expand Down Expand Up @@ -112,11 +112,13 @@ func (f *FileLogger) router() {
}
_, err := f.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: writing message to disk - %s", err)
f.opts.logf(lg.FATAL, "writing message to disk: %s", err)
os.Exit(1)
}
_, err = f.Write([]byte("\n"))
if err != nil {
log.Fatalf("ERROR: writing newline to disk - %s", err)
f.opts.logf(lg.FATAL, "writing newline to disk: %s", err)
os.Exit(1)
}
output[pos] = m
pos++
Expand All @@ -127,10 +129,11 @@ func (f *FileLogger) router() {

if sync || f.consumer.IsStarved() {
if pos > 0 {
log.Printf("syncing %d records to disk", pos)
f.opts.logf(lg.INFO, "syncing %d records to disk", pos)
err := f.Sync()
if err != nil {
log.Fatalf("ERROR: failed syncing messages - %s", err)
f.opts.logf(lg.FATAL, "failed syncing messages: %s", err)
os.Exit(1)
}
for pos > 0 {
pos--
Expand Down Expand Up @@ -161,16 +164,19 @@ func (f *FileLogger) Close() {
if f.gzipWriter != nil {
err := f.gzipWriter.Close()
if err != nil {
log.Fatalf("ERROR: failed to close GZIP writer: %s", err)
f.opts.logf(lg.FATAL, "failed to close GZIP writer: %s", err)
os.Exit(1)
}
}
err := f.out.Sync()
if err != nil {
log.Fatalf("ERROR: failed to fsync output file: %s", err)
f.opts.logf(lg.FATAL, "failed to fsync output file: %s", err)
os.Exit(1)
}
err = f.out.Close()
if err != nil {
log.Fatalf("ERROR: failed to close output file: %s", err)
f.opts.logf(lg.FATAL, "failed to close output file: %s", err)
os.Exit(1)
}

// Move file from work dir to output dir if necessary, taking care not
Expand All @@ -180,13 +186,13 @@ func (f *FileLogger) Close() {
dst := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(src, f.opts.WorkDir))

// Optimistic rename
log.Printf("INFO: moving finished file %s to %s", src, dst)
f.opts.logf(lg.INFO, "moving finished file %s to %s", src, dst)
err := exclusiveRename(src, dst)
if err == nil {
return
} else if !os.IsExist(err) {
log.Fatalf("ERROR: unable to move file from %s to %s: %s", src, dst, err)
return
f.opts.logf(lg.FATAL, "unable to move file from %s to %s: %s", src, dst, err)
os.Exit(1)
}

// Optimistic rename failed, so we need to generate a new
Expand All @@ -196,17 +202,17 @@ func (f *FileLogger) Close() {
dstTmpl := filepath.Join(dstDir, filenameTmpl)

for i := f.rev + 1; ; i++ {
log.Printf("INFO: destination file already exists: %s", dst)
f.opts.logf(lg.WARN, "destination file already exists: %s", dst)
dst := strings.Replace(dstTmpl, "<REV>", fmt.Sprintf("-%06d", i), -1)
err := exclusiveRename(src, dst)
if err != nil {
if os.IsExist(err) {
continue // next rev
}
log.Fatalf("ERROR: unable to rename file from %s to %s: %s", src, dst, err)
return
f.opts.logf(lg.FATAL, "unable to rename file from %s to %s: %s", src, dst, err)
os.Exit(1)
}
log.Printf("INFO: renamed finished file %s to %s to avoid overwrite", src, dst)
f.opts.logf(lg.INFO, "renamed finished file %s to %s to avoid overwrite", src, dst)
break
}
}
Expand Down Expand Up @@ -243,19 +249,19 @@ func (f *FileLogger) needsRotation() bool {

filename := f.currentFilename()
if filename != f.filename {
log.Printf("INFO: new filename %s, rotating...", filename)
f.opts.logf(lg.INFO, "new filename %s, rotating...", filename)
return true // rotate by filename
}

if f.opts.RotateInterval > 0 {
if s := time.Since(f.openTime); s > f.opts.RotateInterval {
log.Printf("INFO: %s since last open, rotating...", s)
f.opts.logf(lg.INFO, "%s since last open, rotating...", s)
return true // rotate by interval
}
}

if f.opts.RotateSize > 0 && f.filesize > f.opts.RotateSize {
log.Printf("INFO: %s currently %d bytes (> %d), rotating...",
f.opts.logf(lg.INFO, "%s currently %d bytes (> %d), rotating...",
f.out.Name(), f.filesize, f.opts.RotateSize)
return true // rotate by size
}
Expand All @@ -274,7 +280,7 @@ func (f *FileLogger) updateFile() {
f.openTime = time.Now()

fullPath := path.Join(f.opts.WorkDir, filename)
makeDirFromPath(fullPath)
makeDirFromPath(f.opts.logf, fullPath)

f.Close()

Expand All @@ -288,14 +294,15 @@ func (f *FileLogger) updateFile() {
// prevent conflicts on rename in the normal case
if f.opts.WorkDir != f.opts.OutputDir {
outputFileName := filepath.Join(f.opts.OutputDir, strings.TrimPrefix(absFilename, f.opts.WorkDir))
makeDirFromPath(outputFileName)
makeDirFromPath(f.opts.logf, outputFileName)

_, err := os.Stat(outputFileName)
if err == nil {
log.Printf("INFO: output file already exists: %s", outputFileName)
f.opts.logf(lg.WARN, "output file already exists: %s", outputFileName)
continue // next rev
} else if !os.IsNotExist(err) {
log.Fatalf("ERROR: unable to stat output file %s: %s", outputFileName, err)
f.opts.logf(lg.FATAL, "unable to stat output file %s: %s", outputFileName, err)
os.Exit(1)
}
}

Expand All @@ -308,17 +315,18 @@ func (f *FileLogger) updateFile() {
f.out, err = os.OpenFile(absFilename, openFlag, 0666)
if err != nil {
if os.IsExist(err) {
log.Printf("INFO: working file already exists: %s", absFilename)
f.opts.logf(lg.WARN, "working file already exists: %s", absFilename)
continue
}
log.Fatalf("ERROR: %s Unable to open %s", err, absFilename)
f.opts.logf(lg.FATAL, "unable to open %s: %s", absFilename, err)
os.Exit(1)
}

log.Printf("INFO: opening %s", absFilename)
f.opts.logf(lg.INFO, "opening %s", absFilename)

fi, err = f.out.Stat()
if err != nil {
log.Fatalf("ERROR: %s Unable to stat file %s", err, f.out.Name())
f.opts.logf(lg.FATAL, "unable to stat file %s: %s", f.out.Name(), err)
}
f.filesize = fi.Size()
if f.filesize == 0 {
Expand All @@ -340,12 +348,13 @@ func (f *FileLogger) updateFile() {
}
}

func makeDirFromPath(path string) {
func makeDirFromPath(logf lg.AppLogFunc, path string) {
dir, _ := filepath.Split(path)
if dir != "" {
err := os.MkdirAll(dir, 0770)
if err != nil {
log.Fatalf("ERROR: %s Unable to create %s", err, dir)
logf(lg.FATAL, "unable to create dir %s: %s", dir, err)
os.Exit(1)
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/mreiferson/go-options"
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/app"
"github.com/nsqio/nsq/internal/lg"
"github.com/nsqio/nsq/internal/version"
)

Expand All @@ -31,6 +32,8 @@ func flagSet() *flag.FlagSet {
fs := flag.NewFlagSet("nsqd", flag.ExitOnError)

fs.Bool("version", false, "print version string")
fs.String("log-level", "info", "set log verbosity: debug, info, warn, error, or fatal")
fs.String("log-prefix", "[nsq_to_file] ", "log message prefix")

fs.String("channel", "nsq_to_file", "nsq channel")
fs.Int("max-in-flight", 200, "max number of messages to allow in flight")
Expand Down Expand Up @@ -72,6 +75,15 @@ func main() {
opts := NewOptions()
options.Resolve(opts, fs, nil)

logger := log.New(os.Stderr, opts.LogPrefix, log.Ldate|log.Ltime|log.Lmicroseconds)
logLevel, err := lg.ParseLogLevel(opts.LogLevel, false)
if err != nil {
log.Fatal("--log-level is invalid")
}
opts.logf = func(lvl lg.LogLevel, f string, args ...interface{}) {
lg.Logf(logger, logLevel, lvl, f, args...)
}

if fs.Lookup("version").Value.(flag.Getter).Get().(bool) {
fmt.Printf("nsq_to_file v%s\n", version.Binary)
return
Expand Down
12 changes: 11 additions & 1 deletion apps/nsq_to_file/options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package main

import "time"
import (
"time"

"github.com/nsqio/nsq/internal/lg"
)

type Options struct {
Topics []string `flag:"topic"`
Expand All @@ -15,6 +19,8 @@ type Options struct {
HTTPClientConnectTimeout time.Duration `flag:"http-client-connect-timeout"`
HTTPClientRequestTimeout time.Duration `flag:"http-client-request-timeout"`

LogPrefix string `flag:"log-prefix"`
LogLevel string `flag:"log-level"`
OutputDir string `flag:"output-dir"`
WorkDir string `flag:"work-dir"`
DatetimeFormat string `flag:"datetime-format"`
Expand All @@ -26,10 +32,14 @@ type Options struct {
RotateSize int64 `flag:"rotate-size"`
RotateInterval time.Duration `flag:"rotate-interval"`
SyncInterval time.Duration `flag:"sync-interval"`

logf lg.AppLogFunc
}

func NewOptions() *Options {
return &Options{
LogPrefix: "[nsq_to_file] ",
LogLevel: "info",
Channel: "nsq_to_file",
MaxInFlight: 200,
OutputDir: "/tmp",
Expand Down
8 changes: 4 additions & 4 deletions apps/nsq_to_file/topic_discoverer.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"log"
"os"
"regexp"
"sync"
Expand All @@ -10,6 +9,7 @@ import (
"github.com/nsqio/go-nsq"
"github.com/nsqio/nsq/internal/clusterinfo"
"github.com/nsqio/nsq/internal/http_api"
"github.com/nsqio/nsq/internal/lg"
)

type TopicDiscoverer struct {
Expand Down Expand Up @@ -41,13 +41,13 @@ func (t *TopicDiscoverer) updateTopics(topics []string) {
}

if !t.isTopicAllowed(topic) {
log.Printf("skipping topic %s (doesn't match pattern %s)", topic, t.opts.TopicPattern)
t.opts.logf(lg.WARN, "skipping topic %s (doesn't match pattern %s)", topic, t.opts.TopicPattern)
continue
}

fl, err := NewFileLogger(t.opts, topic, t.cfg)
if err != nil {
log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
t.opts.logf(lg.ERROR, "couldn't create logger for new topic %s: %s", topic, err)
continue
}
t.topics[topic] = fl
Expand All @@ -72,7 +72,7 @@ forloop:
case <-ticker:
newTopics, err := t.ci.GetLookupdTopics(t.opts.NSQLookupdHTTPAddrs)
if err != nil {
log.Printf("ERROR: could not retrieve topic list: %s", err)
t.opts.logf(lg.ERROR, "could not retrieve topic list: %s", err)
continue
}
t.updateTopics(newTopics)
Expand Down

0 comments on commit 3cbdbe4

Please sign in to comment.