Skip to content

Commit

Permalink
Unique keys (#51)
Browse files Browse the repository at this point in the history
* Prefix entity namespaces

* Update SDK version

* Add cluster name as an ID attribute

* Bump version

* Pull SDK updates

* Fix tests

* Fix formatting
  • Loading branch information
camdencheek authored Apr 26, 2019
1 parent 96a24c6 commit 246f607
Show file tree
Hide file tree
Showing 21 changed files with 413 additions and 100 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.0.0 - 2019-04-22
### Changed
- Changed the entity namespaces to be kafka-scoped
- ClusterName is a required argument to better enforce uniqueness

## 1.1.1 - 2019-02-04
### Changed
- Updated Definition file protocol version to 2
Expand Down
6 changes: 5 additions & 1 deletion kafka-config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ instances:
- name: kafka-metrics
command: metrics
arguments:
# A cluster name is required to uniquely identify this collection result in Insights
cluster_name: "testcluster1"

# In order to collect broker and topic metrics a Zookeeper connection needs to be specified.
# The "zookeeper_hosts" field is a JSON array; each entry in the array contains connection information for a Zookeeper
# node.
Expand Down Expand Up @@ -81,7 +84,7 @@ instances:
- name: kafka-inventory
command: inventory
arguments:

cluster_name: "testcluster2"
# More information about the zookeeper connection can be found above
zookeeper_hosts: <JSON Array of Zookeeper Hosts of the form '[{"host": "localhost", "port": 2181}]'>
zookeeper_auth_scheme: <Auth scheme for Zookeeper>
Expand All @@ -103,6 +106,7 @@ instances:
- name: kafka-consumer-offsets
command: consumer_offset
arguments:
cluster_name: "testcluster3"
# More information about the zookeeper connection can be found above
zookeeper_hosts: <JSON Array of Zookeeper Hosts of the form '[{"host": "localhost", "port": 2181}]'>
zookeeper_auth_scheme: <Auth scheme for Zookeeper>
Expand Down
1 change: 1 addition & 0 deletions src/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
// ArgumentList is the raw arguments passed into the integration via yaml or CLI args
type ArgumentList struct {
sdkArgs.DefaultArgumentList
ClusterName string `default:"" help:"A user-defined name to uniquely identify the cluster"`
ZookeeperHosts string `default:"[]" help:"JSON array of ZooKeeper hosts with the following fields: host, port. Port defaults to 2181"`
ZookeeperAuthScheme string `default:"" help:"ACL scheme for authenticating ZooKeeper connection."`
ZookeeperAuthSecret string `default:"" help:"Authentication string for ZooKeeper."`
Expand Down
2 changes: 2 additions & 0 deletions src/args/kafka_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var GlobalArgs *KafkaArguments
// to allow arguments to be consumed easier.
type KafkaArguments struct {
sdkArgs.DefaultArgumentList
ClusterName string
ZookeeperHosts []*ZookeeperHost
ZookeeperAuthScheme string
ZookeeperAuthSecret string
Expand Down Expand Up @@ -99,6 +100,7 @@ func ParseArgs(a ArgumentList) (*KafkaArguments, error) {

parsedArgs := &KafkaArguments{
DefaultArgumentList: a.DefaultArgumentList,
ClusterName: a.ClusterName,
ZookeeperHosts: zookeeperHosts,
ZookeeperAuthScheme: a.ZookeeperAuthScheme,
ZookeeperAuthSecret: a.ZookeeperAuthSecret,
Expand Down
3 changes: 2 additions & 1 deletion src/brokercollect/broker_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ func createBroker(brokerID int, zkConn zookeeper.Connection, i *integration.Inte
}

// Create broker entity
brokerEntity, err := i.Entity(host, "broker")
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
brokerEntity, err := i.Entity(host, "ka-broker", clusterIDAttr)
if err != nil {
log.Error("Unable to create entity for broker ID %d: %s", brokerID, err)
return nil, err
Expand Down
19 changes: 11 additions & 8 deletions src/brokercollect/broker_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/newrelic/infra-integrations-sdk/data/inventory"
"github.com/newrelic/infra-integrations-sdk/data/metric"
"github.com/newrelic/infra-integrations-sdk/integration"
"github.com/newrelic/infra-integrations-sdk/jmx"
"github.com/newrelic/nri-kafka/src/jmxwrapper"
"github.com/newrelic/nri-kafka/src/testutils"
"github.com/newrelic/nri-kafka/src/zookeeper"
Expand Down Expand Up @@ -107,13 +108,13 @@ func TestCreateBroker_Normal(t *testing.T) {
t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.JMXPort, b.JMXPort)
}
if expectedBroker.KafkaPort != b.KafkaPort {
t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort)
t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort)
}
if b.Entity.Metadata.Name != b.Host {
t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, expectedBroker.Entity.Metadata.Name)
t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, b.Entity.Metadata.Name)
}
if b.Entity.Metadata.Namespace != "broker" {
t.Errorf("Expected entity name '%s' got '%s'", "broker", expectedBroker.Entity.Metadata.Name)
if b.Entity.Metadata.Namespace != "ka-broker" {
t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", b.Entity.Metadata.Namespace)
}
}

Expand All @@ -127,7 +128,7 @@ func TestPopulateBrokerInventory(t *testing.T) {
}
i, _ := integration.New("kafka", "1.0.0")

testBroker.Entity, _ = i.Entity("brokerHost", "broker")
testBroker.Entity, _ = i.Entity("brokerHost", "ka-broker")

if err := populateBrokerInventory(testBroker); err != nil {
t.Errorf("Unexpected error: %s", err.Error())
Expand Down Expand Up @@ -162,7 +163,9 @@ func TestPopulateBrokerMetrics_JMXOpenError(t *testing.T) {
testutils.SetupJmxTesting()
errorText := "jmx error"

jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New(errorText) }
jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error {
return errors.New(errorText)
}
testBroker := &broker{
Host: "kafkabroker",
JMXPort: 9999,
Expand All @@ -171,7 +174,7 @@ func TestPopulateBrokerMetrics_JMXOpenError(t *testing.T) {
}
i, _ := integration.New("kafka", "1.0.0")

testBroker.Entity, _ = i.Entity(testBroker.Host, "broker")
testBroker.Entity, _ = i.Entity(testBroker.Host, "ka-broker")

err := collectBrokerMetrics(testBroker, []string{})
if err == nil {
Expand All @@ -193,7 +196,7 @@ func TestPopulateBrokerMetrics_Normal(t *testing.T) {
}
i, _ := integration.New("kafka", "1.0.0")

testBroker.Entity, _ = i.Entity(testBroker.Host, "broker")
testBroker.Entity, _ = i.Entity(testBroker.Host, "ka-broker")

populateBrokerMetrics(testBroker)

Expand Down
3 changes: 2 additions & 1 deletion src/conoffsetcollect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func Collect(zkConn zookeeper.Connection, kafkaIntegration *integration.Integrat

// setMetrics adds the metrics from an array of partitionOffsets to the integration
func setMetrics(consumerGroup string, offsetData []*partitionOffsets, kafkaIntegration *integration.Integration) error {
groupEntity, err := kafkaIntegration.Entity(consumerGroup, "consumerGroup")
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
groupEntity, err := kafkaIntegration.Entity(consumerGroup, "ka-consumerGroup", clusterIDAttr)
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions src/conoffsetcollect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ func TestCollect(t *testing.T) {
mockClient := connection.MockClient{}
mockBroker := connection.MockBroker{}

args.GlobalArgs = &args.KafkaArguments{}
args.GlobalArgs = &args.KafkaArguments{
ClusterName: "testcluster",
}
args.GlobalArgs.ConsumerGroups = map[string]map[string][]int32{
"testGroup": {
"testTopic": {
Expand Down Expand Up @@ -73,7 +75,8 @@ func Test_setMetrics(t *testing.T) {
err := setMetrics("testGroup", offsetData, i)

assert.Nil(t, err)
resultEntity, err := i.Entity("testGroup", "consumerGroup")
clusterIDAttr := integration.NewIDAttribute("clusterName", "testcluster")
resultEntity, err := i.Entity("testGroup", "ka-consumerGroup", clusterIDAttr)
assert.Nil(t, err)
assert.Equal(t, 8, len(resultEntity.Metrics[0].Metrics))

Expand Down
2 changes: 1 addition & 1 deletion src/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

const (
integrationName = "com.newrelic.kafka"
integrationVersion = "1.1.1"
integrationVersion = "2.0.0"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions src/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func CollectTopicSubMetrics(entity *integration.Entity, entityType string,
beanModifier func(string, string) BeanModifier) {

// need to title case the type so it matches the metric set of the parent entity
titleEntityType := strings.Title(entity.Metadata.Namespace)
titleEntityType := strings.Title(strings.TrimPrefix(entity.Metadata.Namespace, "ka-"))

for _, topicName := range topicList {
topicSample := entity.NewMetricSet("Kafka"+titleEntityType+"Sample",
metric.Attribute{Key: "displayName", Value: entity.Metadata.Name},
metric.Attribute{Key: "entityName", Value: fmt.Sprintf("%s:%s", entity.Metadata.Namespace, entity.Metadata.Name)},
metric.Attribute{Key: "entityName", Value: fmt.Sprintf("%s:%s", strings.TrimPrefix(entity.Metadata.Namespace, "ka-"), entity.Metadata.Name)},
metric.Attribute{Key: "topic", Value: topicName},
)

Expand Down
5 changes: 3 additions & 2 deletions src/prodconcollect/producer_consumer_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func ConsumerWorker(consumerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *in
}

// Create an entity for the consumer
consumerEntity, err := i.Entity(jmxInfo.Name, "consumer")
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
consumerEntity, err := i.Entity(jmxInfo.Name, "ka-consumer", clusterIDAttr)
if err != nil {
log.Error("Unable to create entity for Consumer %s: %s", jmxInfo.Name, err.Error())
continue
Expand Down Expand Up @@ -103,7 +104,7 @@ func ProducerWorker(producerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *in
}

// Create the producer entity
producerEntity, err := i.Entity(jmxInfo.Name, "producer")
producerEntity, err := i.Entity(jmxInfo.Name, "ka-producer")
if err != nil {
log.Error("Unable to create entity for Producer %s: %s", jmxInfo.Name, err.Error())
continue
Expand Down
9 changes: 7 additions & 2 deletions src/prodconcollect/producer_consumer_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/newrelic/infra-integrations-sdk/integration"
"github.com/newrelic/infra-integrations-sdk/jmx"
"github.com/newrelic/nri-kafka/src/args"
"github.com/newrelic/nri-kafka/src/jmxwrapper"
"github.com/newrelic/nri-kafka/src/testutils"
Expand Down Expand Up @@ -85,7 +86,9 @@ func TestConsumerWorker(t *testing.T) {

func TestConsumerWorker_JmxOpenFuncErr(t *testing.T) {
testutils.SetupJmxTesting()
jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New("test") }
jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error {
return errors.New("test")
}
consumerChan := make(chan *args.JMXHost, 10)
var wg sync.WaitGroup
i, err := integration.New("kafka", "1.0.0")
Expand Down Expand Up @@ -135,7 +138,9 @@ func TestProducerWorker(t *testing.T) {

func TestProducerWorker_JmxOpenFuncErr(t *testing.T) {
testutils.SetupJmxTesting()
jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New("test") }
jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error {
return errors.New("test")
}
producerChan := make(chan *args.JMXHost, 10)
var wg sync.WaitGroup
i, err := integration.New("kafka", "1.0.0")
Expand Down
3 changes: 2 additions & 1 deletion src/testutils/test_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package testutils

import (
"github.com/newrelic/infra-integrations-sdk/jmx"
"github.com/newrelic/nri-kafka/src/args"
"github.com/newrelic/nri-kafka/src/jmxwrapper"
)
Expand All @@ -14,7 +15,7 @@ func SetupTestArgs() {

// SetupJmxTesting sets all JMX wrapper variables to basic shells
func SetupJmxTesting() {
jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return nil }
jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error { return nil }
jmxwrapper.JMXClose = func() {}
jmxwrapper.JMXQuery = func(query string, timeout int) (map[string]interface{}, error) { return map[string]interface{}{}, nil }
}
5 changes: 3 additions & 2 deletions src/topiccollect/topic_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,14 @@ func GetTopics(zkConn zookeeper.Connection) ([]string, error) {
}

// FeedTopicPool sends Topic structs down the topicChan for workers to collect and build Topic structs
func FeedTopicPool(topicChan chan<- *Topic, integration *integration.Integration, collectedTopics []string) {
func FeedTopicPool(topicChan chan<- *Topic, i *integration.Integration, collectedTopics []string) {
defer close(topicChan)

if args.GlobalArgs.CollectBrokerTopicData {
for _, topicName := range collectedTopics {
// create topic entity
topicEntity, err := integration.Entity(topicName, "topic")
clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName)
topicEntity, err := i.Entity(topicName, "ka-topic", clusterIDAttr)
if err != nil {
log.Error("Unable to create an entity for topic %s", topicName)
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 246f607

Please sign in to comment.