Skip to content

Commit

Permalink
nsq_to_file: cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Aug 28, 2017
1 parent 18fc338 commit c12a4f3
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 440 deletions.
39 changes: 39 additions & 0 deletions apps/nsq_to_file/consumer_file_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package main

import (
"github.com/nsqio/go-nsq"
)

type ConsumerFileLogger struct {
F *FileLogger
C *nsq.Consumer
}

func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
if err != nil {
return nil, err
}

c, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
}

c.AddHandler(f)

err = c.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
return nil, err
}

err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
return nil, err
}

return &ConsumerFileLogger{
C: c,
F: f,
}, nil
}
280 changes: 280 additions & 0 deletions apps/nsq_to_file/file_logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
package main

import (
"compress/gzip"
"errors"
"fmt"
"io"
"log"
"os"
"path"
"path/filepath"
"strings"
"time"

"github.com/nsqio/go-nsq"
)

type FileLogger struct {
out *os.File
writer io.Writer
gzipWriter *gzip.Writer
logChan chan *nsq.Message
compressionLevel int
gzipEnabled bool
filenameFormat string

termChan chan bool
hupChan chan bool

// for rotation
lastFilename string
lastOpenTime time.Time
filesize int64
rev uint
}

func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
if gzipEnabled || *rotateSize > 0 || *rotateInterval > 0 {
if strings.Index(filenameFormat, "<REV>") == -1 {
return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
}
} else {
// remove <REV> as we don't need it
filenameFormat = strings.Replace(filenameFormat, "<REV>", "", -1)
}

hostname, err := os.Hostname()
if err != nil {
return nil, err
}
shortHostname := strings.Split(hostname, ".")[0]
identifier := shortHostname
if len(*hostIdentifier) != 0 {
identifier = strings.Replace(*hostIdentifier, "<SHORT_HOST>", shortHostname, -1)
identifier = strings.Replace(identifier, "<HOSTNAME>", hostname, -1)
}
filenameFormat = strings.Replace(filenameFormat, "<TOPIC>", topic, -1)
filenameFormat = strings.Replace(filenameFormat, "<HOST>", identifier, -1)
filenameFormat = strings.Replace(filenameFormat, "<PID>", fmt.Sprintf("%d", os.Getpid()), -1)
if gzipEnabled && !strings.HasSuffix(filenameFormat, ".gz") {
filenameFormat = filenameFormat + ".gz"
}

f := &FileLogger{
logChan: make(chan *nsq.Message, 1),
compressionLevel: compressionLevel,
filenameFormat: filenameFormat,
gzipEnabled: gzipEnabled,
termChan: make(chan bool),
hupChan: make(chan bool),
}
return f, nil
}

func (f *FileLogger) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
f.logChan <- m
return nil
}

func (f *FileLogger) router(r *nsq.Consumer) {
pos := 0
output := make([]*nsq.Message, *maxInFlight)
sync := false
ticker := time.NewTicker(time.Duration(30) * time.Second)
closing := false
closeFile := false
exit := false

for {
select {
case <-r.StopChan:
sync = true
closeFile = true
exit = true
case <-f.termChan:
ticker.Stop()
r.Stop()
sync = true
closing = true
case <-f.hupChan:
sync = true
closeFile = true
case <-ticker.C:
if f.needsFileRotate() {
if *skipEmptyFiles {
closeFile = true
} else {
f.updateFile()
}
}
sync = true
case m := <-f.logChan:
if f.needsFileRotate() {
f.updateFile()
sync = true
}
_, err := f.writer.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: writing message to disk - %s", err)
}
_, err = f.writer.Write([]byte("\n"))
if err != nil {
log.Fatalf("ERROR: writing newline to disk - %s", err)
}
output[pos] = m
pos++
if pos == cap(output) {
sync = true
}
}

if closing || sync || r.IsStarved() {
if pos > 0 {
log.Printf("syncing %d records to disk", pos)
err := f.Sync()
if err != nil {
log.Fatalf("ERROR: failed syncing messages - %s", err)
}
for pos > 0 {
pos--
m := output[pos]
m.Finish()
output[pos] = nil
}
}
sync = false
}

if closeFile {
f.Close()
closeFile = false
}
if exit {
break
}
}
}

func (f *FileLogger) Close() {
if f.out != nil {
f.out.Sync()
if f.gzipWriter != nil {
f.gzipWriter.Close()
}
f.out.Close()
f.out = nil
}
}

func (f *FileLogger) Write(p []byte) (n int, err error) {
f.filesize += int64(len(p))
return f.out.Write(p)
}

func (f *FileLogger) Sync() error {
var err error
if f.gzipWriter != nil {
f.gzipWriter.Close()
err = f.out.Sync()
f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel)
f.writer = f.gzipWriter
} else {
err = f.out.Sync()
}
return err
}

func (f *FileLogger) calculateCurrentFilename() string {
t := time.Now()
datetime := strftime(*datetimeFormat, t)
return strings.Replace(f.filenameFormat, "<DATETIME>", datetime, -1)
}

func (f *FileLogger) needsFileRotate() bool {
if f.out == nil {
return true
}

filename := f.calculateCurrentFilename()
if filename != f.lastFilename {
log.Printf("INFO: new filename %s, need rotate", filename)
return true // rotate by filename
}

if *rotateInterval > 0 {
if s := time.Since(f.lastOpenTime); s > *rotateInterval {
log.Printf("INFO: %s since last open, need rotate", s)
return true // rotate by interval
}
}

if *rotateSize > 0 && f.filesize > *rotateSize {
log.Printf("INFO: %s current %d bytes, need rotate", f.out.Name(), f.filesize)
return true // rotate by size
}
return false
}

func (f *FileLogger) updateFile() {
filename := f.calculateCurrentFilename()
if filename != f.lastFilename {
f.rev = 0 // reset revsion to 0 if it is a new filename
} else {
f.rev++
}
f.lastFilename = filename
f.lastOpenTime = time.Now()

fullPath := path.Join(*outputDir, filename)
dir, _ := filepath.Split(fullPath)
if dir != "" {
err := os.MkdirAll(dir, 0770)
if err != nil {
log.Fatalf("ERROR: %s Unable to create %s", err, dir)
}
}

f.Close()

var err error
var fi os.FileInfo
for ; ; f.rev++ {
absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
openFlag := os.O_WRONLY | os.O_CREATE
if f.gzipEnabled {
openFlag |= os.O_EXCL
} else {
openFlag |= os.O_APPEND
}
f.out, err = os.OpenFile(absFilename, openFlag, 0666)
if err != nil {
if os.IsExist(err) {
log.Printf("INFO: file already exists: %s", absFilename)
continue
}
log.Fatalf("ERROR: %s Unable to open %s", err, absFilename)
}
log.Printf("INFO: opening %s", absFilename)
fi, err = f.out.Stat()
if err != nil {
log.Fatalf("ERROR: %s Unable to stat file %s", err, f.out.Name())
}
f.filesize = fi.Size()
if f.filesize == 0 {
break // ok, new file
}
if f.needsFileRotate() {
continue // next rev
}
break // ok, don't need rotate
}

if f.gzipEnabled {
f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel)
f.writer = f.gzipWriter
} else {
f.writer = f
}
}
Loading

0 comments on commit c12a4f3

Please sign in to comment.