diff --git a/apps/nsq_to_file/nsq_to_file.go b/apps/nsq_to_file/nsq_to_file.go index 1ac005c6a..cc1a1c3e6 100644 --- a/apps/nsq_to_file/nsq_to_file.go +++ b/apps/nsq_to_file/nsq_to_file.go @@ -85,6 +85,7 @@ type ConsumerFileLogger struct { } type TopicDiscoverer struct { + clusterInfo *clusterinfo.ClusterInfo topics map[string]*ConsumerFileLogger termChan chan os.Signal hupChan chan os.Signal @@ -92,8 +93,9 @@ type TopicDiscoverer struct { cfg *nsq.Config } -func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer { +func newTopicDiscoverer(cfg *nsq.Config, connectTimeout time.Duration, requestTimeout time.Duration) *TopicDiscoverer { return &TopicDiscoverer{ + clusterInfo: clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)), topics: make(map[string]*ConsumerFileLogger), termChan: make(chan os.Signal), hupChan: make(chan os.Signal), @@ -406,9 +408,8 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool { return match } -func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string, - connectTimeout time.Duration, requestTimeout time.Duration) { - newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs) +func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) { + newTopics, err := t.clusterInfo.GetLookupdTopics(addrs) if err != nil { log.Printf("ERROR: could not retrieve topic list: %s", err) } @@ -441,14 +442,13 @@ func (t *TopicDiscoverer) hup() { } } -func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string, - connectTimeout time.Duration, requestTimeout time.Duration) { +func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) { ticker := time.Tick(*topicPollRate) for { select { case <-ticker: if sync { - t.syncTopics(addrs, pattern, connectTimeout, requestTimeout) + t.syncTopics(addrs, pattern) } case <-t.termChan: t.stop() @@ -501,7 +501,7 @@ func main() { cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION) cfg.MaxInFlight = *maxInFlight - discoverer := newTopicDiscoverer(cfg) + discoverer := newTopicDiscoverer(cfg, connectTimeout, requestTimeout) signal.Notify(discoverer.hupChan, syscall.SIGHUP) signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM) @@ -515,7 +515,7 @@ func main() { } topicsFromNSQLookupd = true var err error - topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs) + topics, err = discoverer.clusterInfo.GetLookupdTopics(lookupdHTTPAddrs) if err != nil { log.Fatalf("ERROR: could not retrieve topic list: %s", err) } @@ -535,5 +535,5 @@ func main() { go discoverer.startTopicRouter(logger) } - discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout) + discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern) }