Skip to content

Commit

Permalink
Merge pull request #935 from jxskiss/master
Browse files Browse the repository at this point in the history
nsq_to_file: --topic-pattern causes connection leaks
  • Loading branch information
mreiferson authored Aug 28, 2017
2 parents e2aa615 + 165f14a commit 18fc338
Showing 1 changed file with 10 additions and 10 deletions.
20 changes: 10 additions & 10 deletions apps/nsq_to_file/nsq_to_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ type ConsumerFileLogger struct {
}

type TopicDiscoverer struct {
clusterInfo *clusterinfo.ClusterInfo
topics map[string]*ConsumerFileLogger
termChan chan os.Signal
hupChan chan os.Signal
wg sync.WaitGroup
cfg *nsq.Config
}

func newTopicDiscoverer(cfg *nsq.Config) *TopicDiscoverer {
func newTopicDiscoverer(cfg *nsq.Config, connectTimeout time.Duration, requestTimeout time.Duration) *TopicDiscoverer {
return &TopicDiscoverer{
clusterInfo: clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)),
topics: make(map[string]*ConsumerFileLogger),
termChan: make(chan os.Signal),
hupChan: make(chan os.Signal),
Expand Down Expand Up @@ -406,9 +408,8 @@ func (t *TopicDiscoverer) allowTopicName(pattern string, name string) bool {
return match
}

func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
newTopics, err := clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(addrs)
func (t *TopicDiscoverer) syncTopics(addrs []string, pattern string) {
newTopics, err := t.clusterInfo.GetLookupdTopics(addrs)
if err != nil {
log.Printf("ERROR: could not retrieve topic list: %s", err)
}
Expand Down Expand Up @@ -441,14 +442,13 @@ func (t *TopicDiscoverer) hup() {
}
}

func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string,
connectTimeout time.Duration, requestTimeout time.Duration) {
func (t *TopicDiscoverer) watch(addrs []string, sync bool, pattern string) {
ticker := time.Tick(*topicPollRate)
for {
select {
case <-ticker:
if sync {
t.syncTopics(addrs, pattern, connectTimeout, requestTimeout)
t.syncTopics(addrs, pattern)
}
case <-t.termChan:
t.stop()
Expand Down Expand Up @@ -501,7 +501,7 @@ func main() {
cfg.UserAgent = fmt.Sprintf("nsq_to_file/%s go-nsq/%s", version.Binary, nsq.VERSION)
cfg.MaxInFlight = *maxInFlight

discoverer := newTopicDiscoverer(cfg)
discoverer := newTopicDiscoverer(cfg, connectTimeout, requestTimeout)

signal.Notify(discoverer.hupChan, syscall.SIGHUP)
signal.Notify(discoverer.termChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -515,7 +515,7 @@ func main() {
}
topicsFromNSQLookupd = true
var err error
topics, err = clusterinfo.New(nil, http_api.NewClient(nil, connectTimeout, requestTimeout)).GetLookupdTopics(lookupdHTTPAddrs)
topics, err = discoverer.clusterInfo.GetLookupdTopics(lookupdHTTPAddrs)
if err != nil {
log.Fatalf("ERROR: could not retrieve topic list: %s", err)
}
Expand All @@ -535,5 +535,5 @@ func main() {
go discoverer.startTopicRouter(logger)
}

discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern, connectTimeout, requestTimeout)
discoverer.watch(lookupdHTTPAddrs, topicsFromNSQLookupd, *topicPattern)
}

0 comments on commit 18fc338

Please sign in to comment.