diff --git a/CHANGELOG.md b/CHANGELOG.md index 29447e3f..8977030a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## 2.0.0 - 2019-04-22 +### Changed +- Changed the entity namespaces to be kafka-scoped +- ClusterName is a required argument to better enforce uniquene + ## 1.1.1 - 2019-02-04 ### Changed - Updated Definition file protocol version to 2 diff --git a/kafka-config.yml.sample b/kafka-config.yml.sample index ccf3faf8..c5efdd7c 100644 --- a/kafka-config.yml.sample +++ b/kafka-config.yml.sample @@ -4,6 +4,9 @@ instances: - name: kafka-metrics command: metrics arguments: + # A cluster name is required to uniquely identify this collection result in Insights + cluster_name: "testcluster1" + # In order to collect broker and topic metrics a Zookeeper connection needs to be specified. # The "zookeeper_hosts" field is a JSON array, each entry in the array connection information for a Zookeeper # node. @@ -81,7 +84,7 @@ instances: - name: kafka-inventory command: inventory arguments: - + cluster_name: "testcluster2" # More information about the zookeeper connection can be found above zookeeper_hosts: zookeeper_auth_scheme: @@ -103,6 +106,7 @@ instances: - name: kafka-consumer-offsets command: consumer_offset arguments: + cluster_name: "testcluster3" # More information about the zookeeper connection can be found above zookeeper_hosts: zookeeper_auth_scheme: diff --git a/src/args/args.go b/src/args/args.go index eddbdff3..7faac706 100644 --- a/src/args/args.go +++ b/src/args/args.go @@ -15,6 +15,7 @@ const ( // ArgumentList is the raw arguments passed into the integration via yaml or CLI args type ArgumentList struct { sdkArgs.DefaultArgumentList + ClusterName string `default:"" help:"A user-defined name to uniquely identify the cluster"` ZookeeperHosts string `default:"[]" help:"JSON array of ZooKeeper hosts with the following fields: host, port. Port defaults to 2181"` ZookeeperAuthScheme string `default:"" help:"ACL scheme for authenticating ZooKeeper connection."` ZookeeperAuthSecret string `default:"" help:"Authentication string for ZooKeeper."` diff --git a/src/args/kafka_args.go b/src/args/kafka_args.go index b684cf5e..3ff60da3 100644 --- a/src/args/kafka_args.go +++ b/src/args/kafka_args.go @@ -16,6 +16,7 @@ var GlobalArgs *KafkaArguments // to allow arguments to be consumed easier. type KafkaArguments struct { sdkArgs.DefaultArgumentList + ClusterName string ZookeeperHosts []*ZookeeperHost ZookeeperAuthScheme string ZookeeperAuthSecret string @@ -99,6 +100,7 @@ func ParseArgs(a ArgumentList) (*KafkaArguments, error) { parsedArgs := &KafkaArguments{ DefaultArgumentList: a.DefaultArgumentList, + ClusterName: a.ClusterName, ZookeeperHosts: zookeeperHosts, ZookeeperAuthScheme: a.ZookeeperAuthScheme, ZookeeperAuthSecret: a.ZookeeperAuthSecret, diff --git a/src/brokercollect/broker_collection.go b/src/brokercollect/broker_collection.go index 7c8d0333..93a73723 100644 --- a/src/brokercollect/broker_collection.go +++ b/src/brokercollect/broker_collection.go @@ -121,7 +121,8 @@ func createBroker(brokerID int, zkConn zookeeper.Connection, i *integration.Inte } // Create broker entity - brokerEntity, err := i.Entity(host, "broker") + clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) + brokerEntity, err := i.Entity(host, "ka-broker", clusterIDAttr) if err != nil { log.Error("Unable to create entity for broker ID %d: %s", brokerID, err) return nil, err diff --git a/src/brokercollect/broker_collection_test.go b/src/brokercollect/broker_collection_test.go index 4fc036d2..bfcf6881 100644 --- a/src/brokercollect/broker_collection_test.go +++ b/src/brokercollect/broker_collection_test.go @@ -12,6 +12,7 @@ import ( "github.com/newrelic/infra-integrations-sdk/data/inventory" "github.com/newrelic/infra-integrations-sdk/data/metric" "github.com/newrelic/infra-integrations-sdk/integration" + "github.com/newrelic/infra-integrations-sdk/jmx" "github.com/newrelic/nri-kafka/src/jmxwrapper" "github.com/newrelic/nri-kafka/src/testutils" "github.com/newrelic/nri-kafka/src/zookeeper" @@ -107,13 +108,13 @@ func TestCreateBroker_Normal(t *testing.T) { t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.JMXPort, b.JMXPort) } if expectedBroker.KafkaPort != b.KafkaPort { - t.Errorf("Expected JMX Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort) + t.Errorf("Expected Kafka Port '%d' got '%d'", expectedBroker.KafkaPort, b.KafkaPort) } if b.Entity.Metadata.Name != b.Host { - t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, expectedBroker.Entity.Metadata.Name) + t.Errorf("Expected entity name '%s' got '%s'", expectedBroker.Host, b.Entity.Metadata.Name) } - if b.Entity.Metadata.Namespace != "broker" { - t.Errorf("Expected entity name '%s' got '%s'", "broker", expectedBroker.Entity.Metadata.Name) + if b.Entity.Metadata.Namespace != "ka-broker" { + t.Errorf("Expected entity name '%s' got '%s'", "ka-broker", b.Entity.Metadata.Namespace) } } @@ -127,7 +128,7 @@ func TestPopulateBrokerInventory(t *testing.T) { } i, _ := integration.New("kafka", "1.0.0") - testBroker.Entity, _ = i.Entity("brokerHost", "broker") + testBroker.Entity, _ = i.Entity("brokerHost", "ka-broker") if err := populateBrokerInventory(testBroker); err != nil { t.Errorf("Unexpected error: %s", err.Error()) @@ -162,7 +163,9 @@ func TestPopulateBrokerMetrics_JMXOpenError(t *testing.T) { testutils.SetupJmxTesting() errorText := "jmx error" - jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New(errorText) } + jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error { + return errors.New(errorText) + } testBroker := &broker{ Host: "kafkabroker", JMXPort: 9999, @@ -171,7 +174,7 @@ func TestPopulateBrokerMetrics_JMXOpenError(t *testing.T) { } i, _ := integration.New("kafka", "1.0.0") - testBroker.Entity, _ = i.Entity(testBroker.Host, "broker") + testBroker.Entity, _ = i.Entity(testBroker.Host, "ka-broker") err := collectBrokerMetrics(testBroker, []string{}) if err == nil { @@ -193,7 +196,7 @@ func TestPopulateBrokerMetrics_Normal(t *testing.T) { } i, _ := integration.New("kafka", "1.0.0") - testBroker.Entity, _ = i.Entity(testBroker.Host, "broker") + testBroker.Entity, _ = i.Entity(testBroker.Host, "ka-broker") populateBrokerMetrics(testBroker) diff --git a/src/conoffsetcollect/collect.go b/src/conoffsetcollect/collect.go index 18c0a35a..46bd3c4f 100644 --- a/src/conoffsetcollect/collect.go +++ b/src/conoffsetcollect/collect.go @@ -69,7 +69,8 @@ func Collect(zkConn zookeeper.Connection, kafkaIntegration *integration.Integrat // setMetrics adds the metrics from an array of partitionOffsets to the integration func setMetrics(consumerGroup string, offsetData []*partitionOffsets, kafkaIntegration *integration.Integration) error { - groupEntity, err := kafkaIntegration.Entity(consumerGroup, "consumerGroup") + clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) + groupEntity, err := kafkaIntegration.Entity(consumerGroup, "ka-consumerGroup", clusterIDAttr) if err != nil { return err } diff --git a/src/conoffsetcollect/collect_test.go b/src/conoffsetcollect/collect_test.go index ea07a9e6..5b1cc26e 100644 --- a/src/conoffsetcollect/collect_test.go +++ b/src/conoffsetcollect/collect_test.go @@ -26,7 +26,9 @@ func TestCollect(t *testing.T) { mockClient := connection.MockClient{} mockBroker := connection.MockBroker{} - args.GlobalArgs = &args.KafkaArguments{} + args.GlobalArgs = &args.KafkaArguments{ + ClusterName: "testcluster", + } args.GlobalArgs.ConsumerGroups = map[string]map[string][]int32{ "testGroup": { "testTopic": { @@ -73,7 +75,8 @@ func Test_setMetrics(t *testing.T) { err := setMetrics("testGroup", offsetData, i) assert.Nil(t, err) - resultEntity, err := i.Entity("testGroup", "consumerGroup") + clusterIDAttr := integration.NewIDAttribute("clusterName", "testcluster") + resultEntity, err := i.Entity("testGroup", "ka-consumerGroup", clusterIDAttr) assert.Nil(t, err) assert.Equal(t, 8, len(resultEntity.Metrics[0].Metrics)) diff --git a/src/kafka.go b/src/kafka.go index ffafce22..1d891902 100644 --- a/src/kafka.go +++ b/src/kafka.go @@ -16,7 +16,7 @@ import ( const ( integrationName = "com.newrelic.kafka" - integrationVersion = "1.1.1" + integrationVersion = "2.0.0" ) func main() { diff --git a/src/metrics/metrics.go b/src/metrics/metrics.go index e947861d..16b34c22 100644 --- a/src/metrics/metrics.go +++ b/src/metrics/metrics.go @@ -39,12 +39,12 @@ func CollectTopicSubMetrics(entity *integration.Entity, entityType string, beanModifier func(string, string) BeanModifier) { // need to title case the type so it matches the metric set of the parent entity - titleEntityType := strings.Title(entity.Metadata.Namespace) + titleEntityType := strings.Title(strings.TrimPrefix(entity.Metadata.Namespace, "ka-")) for _, topicName := range topicList { topicSample := entity.NewMetricSet("Kafka"+titleEntityType+"Sample", metric.Attribute{Key: "displayName", Value: entity.Metadata.Name}, - metric.Attribute{Key: "entityName", Value: fmt.Sprintf("%s:%s", entity.Metadata.Namespace, entity.Metadata.Name)}, + metric.Attribute{Key: "entityName", Value: fmt.Sprintf("%s:%s", strings.TrimPrefix(entity.Metadata.Namespace, "ka-"), entity.Metadata.Name)}, metric.Attribute{Key: "topic", Value: topicName}, ) diff --git a/src/prodconcollect/producer_consumer_collection.go b/src/prodconcollect/producer_consumer_collection.go index 201abba7..f7efb8d8 100644 --- a/src/prodconcollect/producer_consumer_collection.go +++ b/src/prodconcollect/producer_consumer_collection.go @@ -49,7 +49,8 @@ func ConsumerWorker(consumerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *in } // Create an entity for the consumer - consumerEntity, err := i.Entity(jmxInfo.Name, "consumer") + clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) + consumerEntity, err := i.Entity(jmxInfo.Name, "ka-consumer", clusterIDAttr) if err != nil { log.Error("Unable to create entity for Consumer %s: %s", jmxInfo.Name, err.Error()) continue @@ -103,7 +104,7 @@ func ProducerWorker(producerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *in } // Create the producer entity - producerEntity, err := i.Entity(jmxInfo.Name, "producer") + producerEntity, err := i.Entity(jmxInfo.Name, "ka-producer") if err != nil { log.Error("Unable to create entity for Producer %s: %s", jmxInfo.Name, err.Error()) continue diff --git a/src/prodconcollect/producer_consumer_collection_test.go b/src/prodconcollect/producer_consumer_collection_test.go index 6ed7d911..6f70739f 100644 --- a/src/prodconcollect/producer_consumer_collection_test.go +++ b/src/prodconcollect/producer_consumer_collection_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/newrelic/infra-integrations-sdk/integration" + "github.com/newrelic/infra-integrations-sdk/jmx" "github.com/newrelic/nri-kafka/src/args" "github.com/newrelic/nri-kafka/src/jmxwrapper" "github.com/newrelic/nri-kafka/src/testutils" @@ -85,7 +86,9 @@ func TestConsumerWorker(t *testing.T) { func TestConsumerWorker_JmxOpenFuncErr(t *testing.T) { testutils.SetupJmxTesting() - jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New("test") } + jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error { + return errors.New("test") + } consumerChan := make(chan *args.JMXHost, 10) var wg sync.WaitGroup i, err := integration.New("kafka", "1.0.0") @@ -135,7 +138,9 @@ func TestProducerWorker(t *testing.T) { func TestProducerWorker_JmxOpenFuncErr(t *testing.T) { testutils.SetupJmxTesting() - jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return errors.New("test") } + jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error { + return errors.New("test") + } producerChan := make(chan *args.JMXHost, 10) var wg sync.WaitGroup i, err := integration.New("kafka", "1.0.0") diff --git a/src/testutils/test_setup.go b/src/testutils/test_setup.go index e9c69c4e..2a2abcc0 100644 --- a/src/testutils/test_setup.go +++ b/src/testutils/test_setup.go @@ -2,6 +2,7 @@ package testutils import ( + "github.com/newrelic/infra-integrations-sdk/jmx" "github.com/newrelic/nri-kafka/src/args" "github.com/newrelic/nri-kafka/src/jmxwrapper" ) @@ -14,7 +15,7 @@ func SetupTestArgs() { // SetupJmxTesting sets all JMX wrapper variables to basic shells func SetupJmxTesting() { - jmxwrapper.JMXOpen = func(hostname, port, username, password string) error { return nil } + jmxwrapper.JMXOpen = func(hostname, port, username, password string, options ...jmx.Option) error { return nil } jmxwrapper.JMXClose = func() {} jmxwrapper.JMXQuery = func(query string, timeout int) (map[string]interface{}, error) { return map[string]interface{}{}, nil } } diff --git a/src/topiccollect/topic_collection.go b/src/topiccollect/topic_collection.go index b73d2c82..aedb62a1 100644 --- a/src/topiccollect/topic_collection.go +++ b/src/topiccollect/topic_collection.go @@ -67,13 +67,14 @@ func GetTopics(zkConn zookeeper.Connection) ([]string, error) { } // FeedTopicPool sends Topic structs down the topicChan for workers to collect and build Topic structs -func FeedTopicPool(topicChan chan<- *Topic, integration *integration.Integration, collectedTopics []string) { +func FeedTopicPool(topicChan chan<- *Topic, i *integration.Integration, collectedTopics []string) { defer close(topicChan) if args.GlobalArgs.CollectBrokerTopicData { for _, topicName := range collectedTopics { // create topic entity - topicEntity, err := integration.Entity(topicName, "topic") + clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) + topicEntity, err := i.Entity(topicName, "ka-topic", clusterIDAttr) if err != nil { log.Error("Unable to create an entity for topic %s", topicName) } diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/args/args.go b/vendor/github.com/newrelic/infra-integrations-sdk/args/args.go index d29a4b95..9fae756c 100644 --- a/vendor/github.com/newrelic/infra-integrations-sdk/args/args.go +++ b/vendor/github.com/newrelic/infra-integrations-sdk/args/args.go @@ -14,11 +14,15 @@ import ( // DefaultArgumentList includes the minimal set of necessary arguments for an integration. // If all data flags (Inventory, Metrics and Events) are false, all of them are published. type DefaultArgumentList struct { - Verbose bool `default:"false" help:"Print more information to logs."` - Pretty bool `default:"false" help:"Print pretty formatted JSON."` - Metrics bool `default:"false" help:"Publish metrics data."` - Inventory bool `default:"false" help:"Publish inventory data."` - Events bool `default:"false" help:"Publish events data."` + Verbose bool `default:"false" help:"Print more information to logs."` + Pretty bool `default:"false" help:"Print pretty formatted JSON."` + Metrics bool `default:"false" help:"Publish metrics data."` + Inventory bool `default:"false" help:"Publish inventory data."` + Events bool `default:"false" help:"Publish events data."` + Metadata bool `default:"false" help:"Add customer defined key-value attributes to the samples."` + NriAddHostname bool `default:"false" help:"Add hostname attribute to the samples."` + NriCluster string `default:"" help:"Optional. Cluster name"` + NriService string `default:"" help:"Optional. Service name"` } // All returns if all data should be published diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/data/metric/metrics.go b/vendor/github.com/newrelic/infra-integrations-sdk/data/metric/metrics.go index a9ca3bc6..58f138d3 100644 --- a/vendor/github.com/newrelic/infra-integrations-sdk/data/metric/metrics.go +++ b/vendor/github.com/newrelic/infra-integrations-sdk/data/metric/metrics.go @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" ) -// Attribute represents an attribute metric in key-value pair format. +// Attribute represents a metric attribute key-value pair. type Attribute struct { Key string Value string @@ -57,6 +57,13 @@ func NewSet(eventType string, storer persist.Storer, attributes ...Attribute) (s return } +// AddCustomAttributes add customAttributes to MetricSet +func AddCustomAttributes(metricSet *Set, customAttributes []Attribute) { + for _, attr := range customAttributes { + metricSet.setSetAttribute(attr.Key, attr.Value) + } +} + // Attr creates an attribute aimed to namespace a metric-set. func Attr(key string, value string) Attribute { return Attribute{ diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity.go b/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity.go index a0fd1829..571531c5 100644 --- a/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity.go +++ b/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity.go @@ -12,54 +12,83 @@ import ( // Entity is the producer of the data. Entity could be a host, a container, a pod, or whatever unit of meaning. type Entity struct { - Metadata *EntityMetadata `json:"entity,omitempty"` - Metrics []*metric.Set `json:"metrics"` - Inventory *inventory.Inventory `json:"inventory"` - Events []*event.Event `json:"events"` - storer persist.Storer - lock sync.Locker + Metadata *EntityMetadata `json:"entity,omitempty"` + Metrics []*metric.Set `json:"metrics"` + Inventory *inventory.Inventory `json:"inventory"` + Events []*event.Event `json:"events"` + AddHostname bool `json:"add_hostname,omitempty"` // add hostname to metadata at agent level + storer persist.Storer + lock sync.Locker + // CustomAttributes []metric.Attribute `json:"custom_attributes,omitempty"` + customAttributes []metric.Attribute } // EntityMetadata stores entity Metadata type EntityMetadata struct { - Name string `json:"name"` - Namespace string `json:"type"` // For compatibility reasons we keep the type. + Name string `json:"name"` + Namespace string `json:"type"` // For compatibility reasons we keep the type. + IDAttrs IDAttributes `json:"id_attributes"` // For entity Key uniqueness +} + +// EqualsTo returns true when both metadata are equal. +func (m *EntityMetadata) EqualsTo(b *EntityMetadata) bool { + // prevent checking on Key() for performance + if m.Name != b.Name || m.Namespace != b.Namespace { + return false + } + + k1, err := m.Key() + if err != nil { + return false + } + + k2, err := b.Key() + if err != nil { + return false + } + + return k1.String() == k2.String() } // newLocalEntity creates unique default entity without identifier (name & type) -func newLocalEntity(storer persist.Storer) *Entity { +func newLocalEntity(storer persist.Storer, addHostnameToMetadata bool) *Entity { return &Entity{ // empty array or object preferred instead of null on marshaling. - Metrics: []*metric.Set{}, - Inventory: inventory.New(), - Events: []*event.Event{}, - storer: storer, - lock: &sync.Mutex{}, + Metrics: []*metric.Set{}, + Inventory: inventory.New(), + Events: []*event.Event{}, + AddHostname: addHostnameToMetadata, + storer: storer, + lock: &sync.Mutex{}, } } -// newEntity creates a new remote-entity. -func newEntity(name, namespace string, storer persist.Storer) (*Entity, error) { - // If one of the attributes is defined, both Name and Namespace are needed. - if name == "" && namespace != "" || name != "" && namespace == "" { +// newEntity creates a new remote-entity with entity attributes. +func newEntity( + name, + namespace string, + storer persist.Storer, + addHostnameToMetadata bool, + idAttrs ...IDAttribute, +) (*Entity, error) { + + if name == "" || namespace == "" { return nil, errors.New("entity name and type are required when defining one") } d := Entity{ // empty array or object preferred instead of null on marshaling. - Metrics: []*metric.Set{}, - Inventory: inventory.New(), - Events: []*event.Event{}, - storer: storer, - lock: &sync.Mutex{}, - } - - // Entity data is optional. When not specified, data from the integration is reported for the agent's own entity. - if name != "" && namespace != "" { - d.Metadata = &EntityMetadata{ + Metrics: []*metric.Set{}, + Inventory: inventory.New(), + Events: []*event.Event{}, + AddHostname: addHostnameToMetadata, + storer: storer, + lock: &sync.Mutex{}, + Metadata: &EntityMetadata{ Name: name, Namespace: namespace, - } + IDAttrs: idAttributes(idAttrs...), + }, } return &d, nil @@ -70,10 +99,24 @@ func (e *Entity) isLocalEntity() bool { return e.Metadata == nil || e.Metadata.Name == "" } +// SameAs return true when is same entity +func (e *Entity) SameAs(b *Entity) bool { + if e.Metadata == nil || b.Metadata == nil { + return false + } + + return e.Metadata.EqualsTo(b.Metadata) +} + // NewMetricSet returns a new instance of Set with its sample attached to the integration. func (e *Entity) NewMetricSet(eventType string, nameSpacingAttributes ...metric.Attribute) *metric.Set { + s := metric.NewSet(eventType, e.storer, nameSpacingAttributes...) + if len(e.customAttributes) > 0 { + metric.AddCustomAttributes(s, e.customAttributes) + } + e.lock.Lock() defer e.lock.Unlock() e.Metrics = append(e.Metrics, s) @@ -98,3 +141,20 @@ func (e *Entity) SetInventoryItem(key string, field string, value interface{}) e defer e.lock.Unlock() return e.Inventory.SetItem(key, field, value) } + +// AddAttributes adds attributes to every entity metric-set. +func (e *Entity) AddAttributes(attributes ...metric.Attribute) { + for _, a := range attributes { + e.setCustomAttribute(a.Key, a.Value) + } +} + +func (e *Entity) setCustomAttribute(key string, value string) { + attribute := metric.Attribute{key, value} + e.customAttributes = append(e.customAttributes, attribute) +} + +// Key unique entity identifier within a New Relic customer account. +func (e *Entity) Key() (EntityKey, error) { + return e.Metadata.Key() +} diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity_id.go b/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity_id.go new file mode 100644 index 00000000..38c423ad --- /dev/null +++ b/vendor/github.com/newrelic/infra-integrations-sdk/integration/entity_id.go @@ -0,0 +1,98 @@ +package integration + +import ( + "fmt" + "sort" + "strings" +) + +// EmptyKey empty entity key. +var EmptyKey = EntityKey("") + +// EntityKey unique identifier for an entity within a New Relic customer account. +type EntityKey string + +//IDAttributes list of identifier attributes used to provide uniqueness for an entity key. +type IDAttributes []IDAttribute + +// IDAttribute identifier attribute key-value pair. +type IDAttribute struct { + Key string + Value string +} + +// NewIDAttribute creates new identifier attribute. +func NewIDAttribute(key, value string) IDAttribute { + return IDAttribute{ + Key: key, + Value: value, + } +} + +// String stringer stuff +func (k EntityKey) String() string { + return string(k) +} + +// Key generates the entity key based on the entity metadata. +func (m *EntityMetadata) Key() (EntityKey, error) { + if len(m.Name) == 0 { + return EmptyKey, nil // Empty value means this agent's default entity identifier + } + if m.Namespace == "" { + //invalid entity: it has name, but not type. + return EmptyKey, fmt.Errorf("missing 'namespace' field for entity name '%v'", m.Name) + } + + attrsStr := "" + sort.Sort(m.IDAttrs) + m.IDAttrs.removeEmptyAndDuplicates() + for _, attr := range m.IDAttrs { + attrsStr = fmt.Sprintf("%v:%v=%v", attrsStr, attr.Key, attr.Value) + } + + return EntityKey(fmt.Sprintf("%v:%v%s", m.Namespace, m.Name, strings.ToLower(attrsStr))), nil +} + +func idAttributes(idAttrs ...IDAttribute) IDAttributes { + attrs := make(IDAttributes, len(idAttrs)) + if len(attrs) == 0 { + return attrs + } + for i, attr := range idAttrs { + attrs[i] = attr + } + + return attrs +} + +// Len is part of sort.Interface. +func (a IDAttributes) Len() int { + return len(a) +} + +// Swap is part of sort.Interface. +func (a IDAttributes) Swap(i, j int) { + a[i], a[j] = a[j], a[i] +} + +// Less is part of sort.Interface. +func (a IDAttributes) Less(i, j int) bool { + return a[i].Key < a[j].Key +} + +func (a *IDAttributes) removeEmptyAndDuplicates() { + + var uniques IDAttributes + var prev IDAttribute + for i, attr := range *a { + if prev.Key != attr.Key && attr.Key != "" { + uniques = append(uniques, attr) + } else if uniques.Len() >= 1 { + uniques[i-1].Value = attr.Value + } + prev = attr + } + + *a = uniques +} diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/integration/integration.go b/vendor/github.com/newrelic/infra-integrations-sdk/integration/integration.go index 4399e976..e2aafcab 100644 --- a/vendor/github.com/newrelic/infra-integrations-sdk/integration/integration.go +++ b/vendor/github.com/newrelic/infra-integrations-sdk/integration/integration.go @@ -8,6 +8,7 @@ import ( "io" "os" "reflect" + "strings" "sync" "github.com/newrelic/infra-integrations-sdk/args" @@ -15,7 +16,23 @@ import ( "github.com/newrelic/infra-integrations-sdk/persist" ) -const protocolVersion = "2" +// Custom attribute keys: +const ( + CustomAttrPrefix = "NRI_" + CustomAttrCluster = "cluster_name" + CustomAttrService = "service_name" +) + +// Standard attributes +const ( + AttrReportingEntity = "reportingEntityKey" + AttrReportingEndpoint = "reportingEndpoint" +) + +// NR infrastructure agent protocol version +const ( + protocolVersion = "3" +) // Integration defines the format of the output JSON that integrations will return for protocol 2. type Integration struct { @@ -23,6 +40,7 @@ type Integration struct { ProtocolVersion string `json:"protocol_version"` IntegrationVersion string `json:"integration_version"` Entities []*Entity `json:"data"` + addHostnameToMeta bool locker sync.Locker storer persist.Storer prettyOutput bool @@ -69,6 +87,7 @@ func New(name, version string, opts ...Option) (i *Integration, err error) { } defaultArgs := args.GetDefaultArgs(i.args) i.prettyOutput = defaultArgs.Pretty + i.addHostnameToMeta = defaultArgs.NriAddHostname // Setting default values, if not set yet if i.logger == nil { @@ -97,28 +116,68 @@ func (i *Integration) LocalEntity() *Entity { } } - e := newLocalEntity(i.storer) + e := newLocalEntity(i.storer, i.addHostnameToMeta) i.Entities = append(i.Entities, e) return e } +// EntityReportedBy entity being reported from another entity that is not producing the actual entity data. +func (i *Integration) EntityReportedBy(reportingEntity EntityKey, reportedEntityName, reportedEntityNamespace string, idAttributes ...IDAttribute) (e *Entity, err error) { + e, err = i.Entity(reportedEntityName, reportedEntityNamespace, idAttributes...) + if err != nil { + return + } + + e.setCustomAttribute(AttrReportingEntity, reportingEntity.String()) + return +} + +// EntityReportedVia entity being reported from a known endpoint. +func (i *Integration) EntityReportedVia(endpoint, reportedEntityName, reportedEntityNamespace string, idAttributes ...IDAttribute) (e *Entity, err error) { + e, err = i.Entity(reportedEntityName, reportedEntityNamespace, idAttributes...) + if err != nil { + return + } + + e.setCustomAttribute(AttrReportingEndpoint, endpoint) + return +} + // Entity method creates or retrieves an already created entity. -func (i *Integration) Entity(name, namespace string) (e *Entity, err error) { +func (i *Integration) Entity(name, namespace string, idAttributes ...IDAttribute) (e *Entity, err error) { i.locker.Lock() defer i.locker.Unlock() - // we should change this to map for performance - for _, e = range i.Entities { - if e.Metadata != nil && e.Metadata.Name == name && e.Metadata.Namespace == namespace { - return e, nil + e, err = newEntity(name, namespace, i.storer, i.addHostnameToMeta, idAttributes...) + if err != nil { + return nil, err + } + + for _, eIt := range i.Entities { + if e.SameAs(eIt) { + return eIt, nil } } - e, err = newEntity(name, namespace, i.storer) - if err != nil { - return nil, err + defaultArgs := args.GetDefaultArgs(i.args) + + if defaultArgs.Metadata { + for _, element := range os.Environ() { + variable := strings.Split(element, "=") + prefix := fmt.Sprintf("%s%s_", CustomAttrPrefix, strings.ToUpper(i.Name)) + if strings.HasPrefix(variable[0], prefix) { + e.setCustomAttribute(strings.TrimPrefix(variable[0], prefix), variable[1]) + } + } + } + + if defaultArgs.NriCluster != "" { + e.setCustomAttribute(CustomAttrCluster, defaultArgs.NriCluster) + } + if defaultArgs.NriService != "" { + e.setCustomAttribute(CustomAttrService, defaultArgs.NriService) } i.Entities = append(i.Entities, e) @@ -141,7 +200,7 @@ func (i *Integration) Publish() error { if err != nil { return err } - + output = append(output, []byte{'\n'}...) _, err = i.writer.Write(output) defer i.Clear() diff --git a/vendor/github.com/newrelic/infra-integrations-sdk/jmx/jmx.go b/vendor/github.com/newrelic/infra-integrations-sdk/jmx/jmx.go index ccd7b1f4..6c8b80ba 100644 --- a/vendor/github.com/newrelic/infra-integrations-sdk/jmx/jmx.go +++ b/vendor/github.com/newrelic/infra-integrations-sdk/jmx/jmx.go @@ -39,25 +39,82 @@ const ( jmxLineBuffer = 4 * 1024 * 1024 // Max 4MB per line. If single lines are outputting more JSON than that, we likely need smaller-scoped JMX queries ) -func getCommand(hostname, port, username, password string) []string { - var cliCommand []string +// connectionConfig is the configuration for the nrjmx command. +type connectionConfig struct { + hostname string + port string + username string + password string + keyStore string + keyStorePassword string + trustStore string + trustStorePassword string + remote bool +} + +func (cfg *connectionConfig) isSSL() bool { + return cfg.keyStore != "" && cfg.keyStorePassword != "" && cfg.trustStore != "" && cfg.trustStorePassword != "" +} +func (cfg *connectionConfig) command() []string { + c := make([]string, 0) if os.Getenv("NR_JMX_TOOL") != "" { - cliCommand = strings.Split(os.Getenv("NR_JMX_TOOL"), " ") + c = strings.Split(os.Getenv("NR_JMX_TOOL"), " ") } else { - cliCommand = []string{jmxCommand} + c = []string{jmxCommand} + } + + c = append(c, "--hostname", cfg.hostname, "--port", cfg.port) + if cfg.username != "" && cfg.password != "" { + c = append(c, "--username", cfg.username, "--password", cfg.password) + } + if cfg.remote { + c = append(c, "--remote") + } + if cfg.isSSL() { + c = append(c, "--keyStore", cfg.keyStore, "--keyStorePassword", cfg.keyStorePassword, "--trustStore", cfg.trustStore, "--trustStorePassword", cfg.trustStorePassword) + } + + return c +} + +// Open executes a nrjmx command using the given options. +func Open(hostname, port, username, password string, opts ...Option) error { + config := &connectionConfig{ + hostname: hostname, + port: port, + username: username, + password: password, } - cliCommand = append(cliCommand, "--hostname", hostname, "--port", port) - if username != "" && password != "" { - cliCommand = append(cliCommand, "--username", username, "--password", password) + for _, opt := range opts { + opt(config) } - return cliCommand + return openConnection(config) +} + +// Option sets an option on integration level. +type Option func(config *connectionConfig) + +// WithSSL for SSL connection configuration. +func WithSSL(keyStore, keyStorePassword, trustStore, trustStorePassword string) Option { + return func(config *connectionConfig) { + config.keyStore = keyStore + config.keyStorePassword = keyStorePassword + config.trustStore = trustStore + config.trustStorePassword = trustStorePassword + } +} + +// WithRemoteProtocol uses the remote JMX protocol URL. +func WithRemoteProtocol() Option { + return func(config *connectionConfig) { + config.remote = true + } } -// Open will start the nrjmx command with the provided connection parameters. -func Open(hostname, port, username, password string) error { +func openConnection(config *connectionConfig) error { lock.Lock() defer lock.Unlock() @@ -75,7 +132,7 @@ func Open(hostname, port, username, password string) error { var err error var ctx context.Context - cliCommand := getCommand(hostname, port, username, password) + cliCommand := config.command() ctx, cancel = context.WithCancel(context.Background()) cmd = exec.CommandContext(ctx, cliCommand[0], cliCommand[1:]...) diff --git a/vendor/vendor.json b/vendor/vendor.json index 70c5e5c0..6b3ea3b4 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -51,52 +51,52 @@ "revisionTime": "2018-05-06T08:24:08Z" }, { - "checksumSHA1": "MFH1316fMarruD6ogBAi55X8o8g=", + "checksumSHA1": "ZfId5ZYn7jmL0xTxooHcxrLlXoA=", "path": "github.com/newrelic/infra-integrations-sdk/args", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { "checksumSHA1": "KaZLSd/JUZz/mqfW3jFd1j2QdWA=", "path": "github.com/newrelic/infra-integrations-sdk/data/event", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { "checksumSHA1": "EOoxXGqFDPkCvj7wec2S5kCXY2I=", "path": "github.com/newrelic/infra-integrations-sdk/data/inventory", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { - "checksumSHA1": "HlQxfYHqVZTwZFlgMB+PML5Xfe0=", + "checksumSHA1": "B7mdbYyNHbBY4/x65J4zvUXn0Wk=", "path": "github.com/newrelic/infra-integrations-sdk/data/metric", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { - "checksumSHA1": "iYqZm+K1690zh7ihc+yDbKftd9g=", + "checksumSHA1": "DdUizp/J5c7mAFZg2hoz4rRPOYM=", "path": "github.com/newrelic/infra-integrations-sdk/integration", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { - "checksumSHA1": "miKNZdCc3v8MXndG2oZ+3LuCHPw=", + "checksumSHA1": "yqj7MwhatN5UNdMaCrfjemYhGMw=", "path": "github.com/newrelic/infra-integrations-sdk/jmx", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { "checksumSHA1": "6/DhD+/LhC/k3NaLgkpd6iE8b5E=", "path": "github.com/newrelic/infra-integrations-sdk/log", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { "checksumSHA1": "BsHp62Etporko0d+4GpVxrP2qtw=", "path": "github.com/newrelic/infra-integrations-sdk/persist", - "revision": "a9213176e7c44f5780696483f45c80a156433559", - "revisionTime": "2018-08-28T16:16:20Z" + "revision": "1d56d083ff58c63604755586fe1a65036a064de7", + "revisionTime": "2019-04-25T14:09:40Z" }, { "checksumSHA1": "WDgX011m3uQMKWf8cHb5Ndyzmj8=",