From c12a4f31e16f2ae46859cbe954b22718b2b120e8 Mon Sep 17 00:00:00 2001
From: Matt Reiferson
Date: Mon, 28 Aug 2017 08:44:55 -0700
Subject: [PATCH] nsq_to_file: cleanup

---
 apps/nsq_to_file/consumer_file_logger.go |  39 ++
 apps/nsq_to_file/file_logger.go          | 280 ++++++++++++++
 apps/nsq_to_file/nsq_to_file.go          | 456 +----------------------
 apps/nsq_to_file/topic_discoverer.go     |  99 +++++
 4 files changed, 434 insertions(+), 440 deletions(-)
 create mode 100644 apps/nsq_to_file/consumer_file_logger.go
 create mode 100644 apps/nsq_to_file/file_logger.go
 create mode 100644 apps/nsq_to_file/topic_discoverer.go

diff --git a/apps/nsq_to_file/consumer_file_logger.go b/apps/nsq_to_file/consumer_file_logger.go
new file mode 100644
index 000000000..f7a043425
--- /dev/null
+++ b/apps/nsq_to_file/consumer_file_logger.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+    "github.com/nsqio/go-nsq"
+)
+
+type ConsumerFileLogger struct {
+    F *FileLogger
+    C *nsq.Consumer
+}
+
+func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
+    f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
+    if err != nil {
+        return nil, err
+    }
+
+    c, err := nsq.NewConsumer(topic, *channel, cfg)
+    if err != nil {
+        return nil, err
+    }
+
+    c.AddHandler(f)
+
+    err = c.ConnectToNSQDs(nsqdTCPAddrs)
+    if err != nil {
+        return nil, err
+    }
+
+    err = c.ConnectToNSQLookupds(lookupdHTTPAddrs)
+    if err != nil {
+        return nil, err
+    }
+
+    return &ConsumerFileLogger{
+        C: c,
+        F: f,
+    }, nil
+}
diff --git a/apps/nsq_to_file/file_logger.go b/apps/nsq_to_file/file_logger.go
new file mode 100644
index 000000000..6c13c60c1
--- /dev/null
+++ b/apps/nsq_to_file/file_logger.go
@@ -0,0 +1,280 @@
+package main
+
+import (
+    "compress/gzip"
+    "errors"
+    "fmt"
+    "io"
+    "log"
+    "os"
+    "path"
+    "path/filepath"
+    "strings"
+    "time"
+
+    "github.com/nsqio/go-nsq"
+)
+
+type FileLogger struct {
+    out              *os.File
+    writer           io.Writer
+    gzipWriter       *gzip.Writer
+    logChan          chan *nsq.Message
+    compressionLevel int
+    gzipEnabled      bool
+    filenameFormat   string
+
+    termChan chan bool
+    hupChan  chan bool
+
+    // for rotation
+    lastFilename string
+    lastOpenTime time.Time
+    filesize     int64
+    rev          uint
+}
+
+func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
+    if gzipEnabled || *rotateSize > 0 || *rotateInterval > 0 {
+        if strings.Index(filenameFormat, "<REV>") == -1 {
+            return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
+        }
+    } else {
+        // remove <REV> as we don't need it
+        filenameFormat = strings.Replace(filenameFormat, "<REV>", "", -1)
+    }
+
+    hostname, err := os.Hostname()
+    if err != nil {
+        return nil, err
+    }
+    shortHostname := strings.Split(hostname, ".")[0]
+    identifier := shortHostname
+    if len(*hostIdentifier) != 0 {
+        identifier = strings.Replace(*hostIdentifier, "<SHORT_HOST>", shortHostname, -1)
+        identifier = strings.Replace(identifier, "<HOSTNAME>", hostname, -1)
+    }
+    filenameFormat = strings.Replace(filenameFormat, "<TOPIC>", topic, -1)
+    filenameFormat = strings.Replace(filenameFormat, "<HOST>", identifier, -1)
+    filenameFormat = strings.Replace(filenameFormat, "<PID>", fmt.Sprintf("%d", os.Getpid()), -1)
+    if gzipEnabled && !strings.HasSuffix(filenameFormat, ".gz") {
+        filenameFormat = filenameFormat + ".gz"
+    }
+
+    f := &FileLogger{
+        logChan:          make(chan *nsq.Message, 1),
+        compressionLevel: compressionLevel,
+        filenameFormat:   filenameFormat,
+        gzipEnabled:      gzipEnabled,
+        termChan:         make(chan bool),
+        hupChan:          make(chan bool),
+    }
+    return f, nil
+}
+
+func (f *FileLogger) HandleMessage(m *nsq.Message) error {
+    m.DisableAutoResponse()
+    f.logChan <- m
+    return nil
+}
+
+func (f *FileLogger) router(r *nsq.Consumer) {
+    pos := 0
+    output := make([]*nsq.Message, *maxInFlight)
+    sync := false
+    ticker := time.NewTicker(time.Duration(30) * time.Second)
+    closing := false
+    closeFile := false
+    exit := false
+
+    for {
+        select {
+        case <-r.StopChan:
+            sync = true
+            closeFile = true
+            exit = true
+        case <-f.termChan:
+            ticker.Stop()
+            r.Stop()
+            sync = true
+            closing = true
+        case <-f.hupChan:
+            sync = true
+            closeFile = true
+        case <-ticker.C:
+            if f.needsFileRotate() {
+                if *skipEmptyFiles {
+                    closeFile = true
+                } else {
+                    f.updateFile()
+                }
+            }
+            sync = true
+        case m := <-f.logChan:
+            if f.needsFileRotate() {
+                f.updateFile()
+                sync = true
+            }
+            _, err := f.writer.Write(m.Body)
+            if err != nil {
+                log.Fatalf("ERROR: writing message to disk - %s", err)
+            }
+            _, err = f.writer.Write([]byte("\n"))
+            if err != nil {
+                log.Fatalf("ERROR: writing newline to disk - %s", err)
+            }
+            output[pos] = m
+            pos++
+            if pos == cap(output) {
+                sync = true
+            }
+        }
+
+        if closing || sync || r.IsStarved() {
+            if pos > 0 {
+                log.Printf("syncing %d records to disk", pos)
+                err := f.Sync()
+                if err != nil {
+                    log.Fatalf("ERROR: failed syncing messages - %s", err)
+                }
+                for pos > 0 {
+                    pos--
+                    m := output[pos]
+                    m.Finish()
+                    output[pos] = nil
+                }
+            }
+            sync = false
+        }
+
+        if closeFile {
+            f.Close()
+            closeFile = false
+        }
+        if exit {
+            break
+        }
+    }
+}
+
+func (f *FileLogger) Close() {
+    if f.out != nil {
+        f.out.Sync()
+        if f.gzipWriter != nil {
+            f.gzipWriter.Close()
+        }
+        f.out.Close()
+        f.out = nil
+    }
+}
+
+func (f *FileLogger) Write(p []byte) (n int, err error) {
+    f.filesize += int64(len(p))
+    return f.out.Write(p)
+}
+
+func (f *FileLogger) Sync() error {
+    var err error
+    if f.gzipWriter != nil {
+        f.gzipWriter.Close()
+        err = f.out.Sync()
+        f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel)
+        f.writer = f.gzipWriter
+    } else {
+        err = f.out.Sync()
+    }
+    return err
+}
+
+func (f *FileLogger) calculateCurrentFilename() string {
+    t := time.Now()
+    datetime := strftime(*datetimeFormat, t)
+    return strings.Replace(f.filenameFormat, "<DATETIME>", datetime, -1)
+}
+
+func (f *FileLogger) needsFileRotate() bool {
+    if f.out == nil {
+        return true
+    }
+
+    filename := f.calculateCurrentFilename()
+    if filename != f.lastFilename {
+        log.Printf("INFO: new filename %s, need rotate", filename)
+        return true // rotate by filename
+    }
+
+    if *rotateInterval > 0 {
+        if s := time.Since(f.lastOpenTime); s > *rotateInterval {
+            log.Printf("INFO: %s since last open, need rotate", s)
+            return true // rotate by interval
+        }
+    }
+
+    if *rotateSize > 0 && f.filesize > *rotateSize {
+        log.Printf("INFO: %s current %d bytes, need rotate", f.out.Name(), f.filesize)
+        return true // rotate by size
+    }
+    return false
+}
+
+func (f *FileLogger) updateFile() {
+    filename := f.calculateCurrentFilename()
+    if filename != f.lastFilename {
+        f.rev = 0 // reset revsion to 0 if it is a new filename
+    } else {
+        f.rev++
+    }
+    f.lastFilename = filename
+    f.lastOpenTime = time.Now()
+
+    fullPath := path.Join(*outputDir, filename)
+    dir, _ := filepath.Split(fullPath)
+    if dir != "" {
+        err := os.MkdirAll(dir, 0770)
+        if err != nil {
+            log.Fatalf("ERROR: %s Unable to create %s", err, dir)
+        }
+    }
+
+    f.Close()
+
+    var err error
+    var fi os.FileInfo
+    for ; ; f.rev++ {
+        absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
+        openFlag := os.O_WRONLY | os.O_CREATE
+        if f.gzipEnabled {
+            openFlag 
|= os.O_EXCL + } else { + openFlag |= os.O_APPEND + } + f.out, err = os.OpenFile(absFilename, openFlag, 0666) + if err != nil { + if os.IsExist(err) { + log.Printf("INFO: file already exists: %s", absFilename) + continue + } + log.Fatalf("ERROR: %s Unable to open %s", err, absFilename) + } + log.Printf("INFO: opening %s", absFilename) + fi, err = f.out.Stat() + if err != nil { + log.Fatalf("ERROR: %s Unable to stat file %s", err, f.out.Name()) + } + f.filesize = fi.Size() + if f.filesize == 0 { + break // ok, new file + } + if f.needsFileRotate() { + continue // next rev + } + break // ok, don't need rotate + } + + if f.gzipEnabled { + f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel) + f.writer = f.gzipWriter + } else { + f.writer = f + } +} diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index cc1a1c3e6..7ba473699 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -3,26 +3,16 @@ package main import ( - "compress/gzip" - "errors" "flag" "fmt" - "io" "log" "os" "os/signal" - "path" - "path/filepath" - "regexp" - "strings" - "sync" "syscall" "time" "github.com/nsqio/go-nsq" "github.com/nsqio/nsq/internal/app" - "github.com/nsqio/nsq/internal/clusterinfo" - "github.com/nsqio/nsq/internal/http_api" "github.com/nsqio/nsq/internal/version" ) @@ -59,297 +49,6 @@ func init() { flag.Var(&topics, "topic", "nsq topic (may be given multiple times)") } -type FileLogger struct { - out *os.File - writer io.Writer - gzipWriter *gzip.Writer - logChan chan *nsq.Message - compressionLevel int - gzipEnabled bool - filenameFormat string - - ExitChan chan int - termChan chan bool - hupChan chan bool - - // for rotation - lastFilename string - lastOpenTime time.Time - filesize int64 - rev uint -} - -type ConsumerFileLogger struct { - F *FileLogger - C *nsq.Consumer -} - -type TopicDiscoverer struct { - clusterInfo *clusterinfo.ClusterInfo - topics map[string]*ConsumerFileLogger - termChan chan os.Signal - hupChan chan os.Signal - wg sync.WaitGroup - cfg *nsq.Config -} - -func newTopicDiscoverer(cfg *nsq.Config, connectTimeout time.Duration, requestTimeout time.Duration) *TopicDiscoverer { - return &TopicDiscoverer{ - clusterInfo: clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)), - topics: make(map[string]*ConsumerFileLogger), - termChan: make(chan os.Signal), - hupChan: make(chan os.Signal), - cfg: cfg, - } -} - -func (f *FileLogger) HandleMessage(m *nsq.Message) error { - m.DisableAutoResponse() - f.logChan <- m - return nil -} - -func (f *FileLogger) router(r *nsq.Consumer) { - pos := 0 - output := make([]*nsq.Message, *maxInFlight) - sync := false - ticker := time.NewTicker(time.Duration(30) * time.Second) - closing := false - closeFile := false - exit := false - - for { - select { - case <-r.StopChan: - sync = true - closeFile = true - exit = true - case <-f.termChan: - ticker.Stop() - r.Stop() - sync = true - closing = true - case <-f.hupChan: - sync = true - closeFile = true - case <-ticker.C: - if f.needsFileRotate() { - if *skipEmptyFiles { - closeFile = true - } else { - f.updateFile() - } - } - sync = true - case m := <-f.logChan: - if f.needsFileRotate() { - f.updateFile() - sync = true - } - _, err := f.writer.Write(m.Body) - if err != nil { - log.Fatalf("ERROR: writing message to disk - %s", err) - } - _, err = f.writer.Write([]byte("\n")) - if err != nil { - log.Fatalf("ERROR: writing newline to disk - %s", err) - } - output[pos] = m - pos++ - if pos == cap(output) { - sync = true - } - } - - 
if closing || sync || r.IsStarved() {
-            if pos > 0 {
-                log.Printf("syncing %d records to disk", pos)
-                err := f.Sync()
-                if err != nil {
-                    log.Fatalf("ERROR: failed syncing messages - %s", err)
-                }
-                for pos > 0 {
-                    pos--
-                    m := output[pos]
-                    m.Finish()
-                    output[pos] = nil
-                }
-            }
-            sync = false
-        }
-
-        if closeFile {
-            f.Close()
-            closeFile = false
-        }
-        if exit {
-            close(f.ExitChan)
-            break
-        }
-    }
-}
-
-func (f *FileLogger) Close() {
-    if f.out != nil {
-        f.out.Sync()
-        if f.gzipWriter != nil {
-            f.gzipWriter.Close()
-        }
-        f.out.Close()
-        f.out = nil
-    }
-}
-
-func (f *FileLogger) Write(p []byte) (n int, err error) {
-    f.filesize += int64(len(p))
-    return f.out.Write(p)
-}
-
-func (f *FileLogger) Sync() error {
-    var err error
-    if f.gzipWriter != nil {
-        f.gzipWriter.Close()
-        err = f.out.Sync()
-        f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel)
-        f.writer = f.gzipWriter
-    } else {
-        err = f.out.Sync()
-    }
-    return err
-}
-
-func (f *FileLogger) calculateCurrentFilename() string {
-    t := time.Now()
-    datetime := strftime(*datetimeFormat, t)
-    return strings.Replace(f.filenameFormat, "<DATETIME>", datetime, -1)
-}
-
-func (f *FileLogger) needsFileRotate() bool {
-    if f.out == nil {
-        return true
-    }
-
-    filename := f.calculateCurrentFilename()
-    if filename != f.lastFilename {
-        log.Printf("INFO: new filename %s, need rotate", filename)
-        return true // rotate by filename
-    }
-
-    if *rotateInterval > 0 {
-        if s := time.Since(f.lastOpenTime); s > *rotateInterval {
-            log.Printf("INFO: %s since last open, need rotate", s)
-            return true // rotate by interval
-        }
-    }
-
-    if *rotateSize > 0 && f.filesize > *rotateSize {
-        log.Printf("INFO: %s current %d bytes, need rotate", f.out.Name(), f.filesize)
-        return true // rotate by size
-    }
-    return false
-}
-
-func (f *FileLogger) updateFile() {
-    filename := f.calculateCurrentFilename()
-    if filename != f.lastFilename {
-        f.rev = 0 // reset revsion to 0 if it is a new filename
-    } else {
-        f.rev++
-    }
-    f.lastFilename = filename
-    f.lastOpenTime = time.Now()
-
-    fullPath := path.Join(*outputDir, filename)
-    dir, _ := filepath.Split(fullPath)
-    if dir != "" {
-        err := os.MkdirAll(dir, 0770)
-        if err != nil {
-            log.Fatalf("ERROR: %s Unable to create %s", err, dir)
-        }
-    }
-
-    f.Close()
-
-    var err error
-    var fi os.FileInfo
-    for ; ; f.rev++ {
-        absFilename := strings.Replace(fullPath, "<REV>", fmt.Sprintf("-%06d", f.rev), -1)
-        openFlag := os.O_WRONLY | os.O_CREATE
-        if f.gzipEnabled {
-            openFlag |= os.O_EXCL
-        } else {
-            openFlag |= os.O_APPEND
-        }
-        f.out, err = os.OpenFile(absFilename, openFlag, 0666)
-        if err != nil {
-            if os.IsExist(err) {
-                log.Printf("INFO: file already exists: %s", absFilename)
-                continue
-            }
-            log.Fatalf("ERROR: %s Unable to open %s", err, absFilename)
-        }
-        log.Printf("INFO: opening %s", absFilename)
-        fi, err = f.out.Stat()
-        if err != nil {
-            log.Fatalf("ERROR: %s Unable to stat file %s", err, f.out.Name())
-        }
-        f.filesize = fi.Size()
-        if f.filesize == 0 {
-            break // ok, new file
-        }
-        if f.needsFileRotate() {
-            continue // next rev
-        }
-        break // ok, don't need rotate
-    }
-
-    if f.gzipEnabled {
-        f.gzipWriter, _ = gzip.NewWriterLevel(f, f.compressionLevel)
-        f.writer = f.gzipWriter
-    } else {
-        f.writer = f
-    }
-}
-
-func NewFileLogger(gzipEnabled bool, compressionLevel int, filenameFormat, topic string) (*FileLogger, error) {
-    if gzipEnabled || *rotateSize > 0 || *rotateInterval > 0 {
-        if strings.Index(filenameFormat, "<REV>") == -1 {
-            return nil, errors.New("missing <REV> in --filename-format when gzip or rotation enabled")
-        }
-    } else {
-        // remove <REV> as we don't need it
-        filenameFormat = strings.Replace(filenameFormat, "<REV>", "", -1)
-    }
-
-    hostname, err := os.Hostname()
-    if err != nil {
-        return nil, err
-    }
-    shortHostname := strings.Split(hostname, ".")[0]
-    identifier := shortHostname
-    if len(*hostIdentifier) != 0 {
-        identifier = strings.Replace(*hostIdentifier, "<SHORT_HOST>", shortHostname, -1)
-        identifier = strings.Replace(identifier, "<HOSTNAME>", hostname, -1)
-    }
-    filenameFormat = strings.Replace(filenameFormat, "<TOPIC>", topic, -1)
-    filenameFormat = strings.Replace(filenameFormat, "<HOST>", identifier, -1)
-    filenameFormat = strings.Replace(filenameFormat, "<PID>", fmt.Sprintf("%d", os.Getpid()), -1)
-    if gzipEnabled && !strings.HasSuffix(filenameFormat, ".gz") {
-        filenameFormat = filenameFormat + ".gz"
-    }
-
-    f := &FileLogger{
-        logChan:          make(chan *nsq.Message, 1),
-        compressionLevel: compressionLevel,
-        filenameFormat:   filenameFormat,
-        gzipEnabled:      gzipEnabled,
-        ExitChan:         make(chan int),
-        termChan:         make(chan bool),
-        hupChan:          make(chan bool),
-    }
-    return f, nil
-}
-
 func hasArg(s string) bool {
     argExist := false
     flag.Visit(func(f *flag.Flag) {
@@ -360,106 +59,6 @@ func hasArg(s string) bool {
     return argExist
 }
 
-func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
-    f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
-    if err != nil {
-        return nil, err
-    }
-
-    consumer, err := nsq.NewConsumer(topic, *channel, cfg)
-    if err != nil {
-        return nil, err
-    }
-
-    consumer.AddHandler(f)
-
-    err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
-    if err != nil {
-        log.Fatal(err)
-    }
-
-    err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
-    if err != nil {
-        log.Fatal(err)
-    }
-
-    return &ConsumerFileLogger{
-        C: consumer,
-        F: f,
-    }, nil
-}
-
-func (t *TopicDiscoverer) startTopicRouter(logger *ConsumerFileLogger) {
-    t.wg.Add(1)
-    defer t.wg.Done()
-    go logger.F.router(logger.C)
-    <-logger.F.ExitChan
-}
-
-func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
-    if pattern == "" {
-        return true
-    }
-    match, err := regexp.MatchString(pattern, name)
-    if err != nil {
-        return false
-    }
-
-    return match
-}
-
-func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
-    newTopics, err := t.clusterInfo.GetLookupdTopics(addrs)
-    if err != nil {
-        log.Printf("ERROR: could not retrieve topic list: %s", err)
-    }
-    for _, topic := range newTopics {
-        if _, ok := t.topics[topic]; !ok {
-            if !t.allowTopicName(pattern, topic) {
-                log.Println("Skipping topic ", topic, "as it didn't match required pattern:", pattern)
-                continue
-            }
-            logger, err := newConsumerFileLogger(topic, t.cfg)
-            if err != nil {
-                log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err)
-                continue
-            }
-            t.topics[topic] = logger
-            go t.startTopicRouter(logger)
-        }
-    }
-}
-
-func (t *TopicDiscoverer) stop() {
-    for _, topic := range t.topics {
-        topic.F.termChan <- true
-    }
-}
-
-func (t *TopicDiscoverer) hup() {
-    for _, topic := range t.topics {
-        topic.F.hupChan <- true
-    }
-}
-
-func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
-    ticker := time.Tick(*topicPollRate)
-    for {
-        select {
-        case <-ticker:
-            if sync {
-                t.syncTopics(addrs, pattern)
-            }
-        case <-t.termChan:
-            t.stop()
-            t.wg.Wait()
-            return
-        case <-t.hupChan:
-            t.hup()
-        }
-    }
-}
-
 func main() {
     cfg := nsq.NewConfig()
 
@@ -475,18 +74,14 @@ func main() {
         log.Fatal("--channel is required")
     }
 
-    connectTimeout := *httpConnectTimeout
-    if int64(connectTimeout) <= 0 {
+    if 
*httpConnectTimeout <= 0 { log.Fatal("--http-client-connect-timeout should be positive") } - requestTimeout := *httpRequestTimeout - if int64(requestTimeout) <= 0 { + if *httpRequestTimeout <= 0 { log.Fatal("--http-client-request-timeout should be positive") } - var topicsFromNSQLookupd bool - if len(nsqdTCPAddrs) == 0 && len(lookupdHTTPAddrs) == 0 { log.Fatal("--nsqd-tcp-address or --lookupd-http-address required.") } @@ -498,42 +93,23 @@ func main() { log.Fatalf("invalid --gzip-level value (%d), should be 1-9", *gzipLevel) } - cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION) - cfg.MaxInFlight = *maxInFlight - - discoverer := newTopicDiscoverer(cfg, connectTimeout, requestTimeout) - - signal.Notify(discoverer.hupChan, syscall.SIGHUP) - signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM) + if len(topics) == 0 && len(*topicPattern) == 0 { + log.Fatal("--topic or --topic-pattern required") + } - if len(topics) < 1 { - if len(*topicPattern) < 1 { - log.Fatal("use --topic to list at least one topic to subscribe to or specify --topic-pattern to subscribe to matching topics (use \"--topic-pattern .*\" to subscribe to all topics)") - } - if len(lookupdHTTPAddrs) < 1 { - log.Fatal("use --topic to list at least one topic to subscribe to or specify at least one --lookupd-http-address to subscribe to all its topics") - } - topicsFromNSQLookupd = true - var err error - topics, err = discoverer.clusterInfo.GetLookupdTopics(lookupdHTTPAddrs) - if err != nil { - log.Fatalf("ERROR: could not retrieve topic list: %s", err) - } + if len(topics) == 0 && len(lookupdHTTPAddrs) == 0 { + log.Fatal("--lookupd-http-address must be specified when no --topic specified") } - for _, topic := range topics { - if !discoverer.allowTopicName(*topicPattern, topic) { - log.Println("Skipping topic", topic, "as it didn't match required pattern:", *topicPattern) - continue - } + cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION) + cfg.MaxInFlight = *maxInFlight - logger, err := newConsumerFileLogger(topic, cfg) - if err != nil { - log.Fatalf("ERROR: couldn't create logger for topic %s: %s", topic, err) - } - discoverer.topics[topic] = logger - go discoverer.startTopicRouter(logger) - } + hupChan := make(chan os.Signal) + termChan := make(chan os.Signal) + signal.Notify(hupChan, syscall.SIGHUP) + signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) - discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern) + discoverer := newTopicDiscoverer(cfg, hupChan, termChan, *httpConnectTimeout, *httpRequestTimeout) + discoverer.updateTopics(topics, *topicPattern) + discoverer.poller(lookupdHTTPAddrs, len(topics) == 0, *topicPattern) } diff --git a/apps/nsq_to_file/topic_discoverer.go b/apps/nsq_to_file/topic_discoverer.go new file mode 100644 index 000000000..f7c86b3d1 --- /dev/null +++ b/apps/nsq_to_file/topic_discoverer.go @@ -0,0 +1,99 @@ +package main + +import ( + "log" + "os" + "regexp" + "sync" + "time" + + "github.com/nsqio/go-nsq" + "github.com/nsqio/nsq/internal/clusterinfo" + "github.com/nsqio/nsq/internal/http_api" +) + +type TopicDiscoverer struct { + ci *clusterinfo.ClusterInfo + topics map[string]*ConsumerFileLogger + hupChan chan os.Signal + termChan chan os.Signal + wg sync.WaitGroup + cfg *nsq.Config +} + +func newTopicDiscoverer(cfg *nsq.Config, + hupChan chan os.Signal, termChan chan os.Signal, + connectTimeout time.Duration, requestTimeout time.Duration) *TopicDiscoverer { + return &TopicDiscoverer{ + ci: 
clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)), + topics: make(map[string]*ConsumerFileLogger), + hupChan: hupChan, + termChan: termChan, + cfg: cfg, + } +} + +func (t *TopicDiscoverer) updateTopics(topics []string, pattern string) { + for _, topic := range topics { + if _, ok := t.topics[topic]; ok { + continue + } + + if !allowTopicName(pattern, topic) { + log.Printf("skipping topic %s (doesn't match pattern %s)", topic, pattern) + continue + } + + cfl, err := newConsumerFileLogger(topic, t.cfg) + if err != nil { + log.Printf("ERROR: couldn't create logger for new topic %s: %s", topic, err) + continue + } + t.topics[topic] = cfl + + t.wg.Add(1) + go func(cfl *ConsumerFileLogger) { + cfl.F.router(cfl.C) + t.wg.Done() + }(cfl) + } +} + +func (t *TopicDiscoverer) poller(addrs []string, sync bool, pattern string) { + var ticker <-chan time.Time + if sync { + ticker = time.Tick(*topicPollRate) + } + for { + select { + case <-ticker: + newTopics, err := t.ci.GetLookupdTopics(addrs) + if err != nil { + log.Printf("ERROR: could not retrieve topic list: %s", err) + continue + } + t.updateTopics(newTopics, pattern) + case <-t.termChan: + for _, cfl := range t.topics { + close(cfl.F.termChan) + } + break + case <-t.hupChan: + for _, cfl := range t.topics { + cfl.F.hupChan <- true + } + } + } + t.wg.Wait() +} + +func allowTopicName(pattern string, name string) bool { + if pattern == "" { + return true + } + match, err := regexp.MatchString(pattern, name) + if err != nil { + return false + } + return match +}