Skip to content

Commit

Permalink
Added hard coded topic limit (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
Corbin Phelps authored Oct 1, 2018
1 parent 2741b86 commit 0962b92
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 0.2.2 - 2018-10-01
### Added
- Hardcoded limit for topic collection

## 0.2.1 - 2018-09-25
### Changed
- Source Type on all metrics with `attr=Count` to `Rate`
Expand Down
19 changes: 18 additions & 1 deletion src/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

const (
integrationName = "com.newrelic.kafka"
integrationVersion = "0.2.1"
integrationVersion = "0.2.2"
)

func main() {
Expand Down Expand Up @@ -57,6 +57,9 @@ func coreCollection(zkConn zookeeper.Connection, kafkaIntegration *integration.I
collectedTopics, err := tc.GetTopics(zkConn)
ExitOnErr(err)

// Enforce hard limits on Topics
collectedTopics = enforceTopicLimit(collectedTopics)

// Setup wait group
var wg sync.WaitGroup

Expand Down Expand Up @@ -90,3 +93,17 @@ func ExitOnErr(err error) {
os.Exit(1)
}
}

// maxTopics is the maximum amount of Topics that can be collect.
// If there are more than this number of Topics then collection of
// Topics will fail.
const maxTopics = 300

func enforceTopicLimit(collectedTopics []string) []string {
if length := len(collectedTopics); length > maxTopics {
log.Error("There are %d topics in collection, the maximum amount of topics to collect is %d. Use the topic whitelist configuration parameter to limit collection size.", length, maxTopics)
return make([]string, 0)
}

return collectedTopics
}
38 changes: 38 additions & 0 deletions src/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package main

import (
"reflect"
"testing"
)

func Test_enforceTopicLimit(t *testing.T) {
overLimit := make([]string, maxTopics+1)

for i := 0; i < maxTopics+1; i++ {
overLimit[i] = "topic"
}

testCases := []struct {
name string
topics []string
want []string
}{
{
"Over Limit",
overLimit,
[]string{},
},
{
"Under Limit",
[]string{"topic"},
[]string{"topic"},
},
}

for _, tc := range testCases {
out := enforceTopicLimit(tc.topics)
if !reflect.DeepEqual(out, tc.want) {
t.Errorf("Test Case %s Failed: Expected '%+v' got '%+v'", tc.name, tc.want, out)
}
}
}

0 comments on commit 0962b92

Please sign in to comment.