From 17f4b922f5b86fc72fdd1de42c00acc6c1305f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Thu, 16 Jun 2022 17:36:40 +0200 Subject: [PATCH 01/12] extend JMXConnection to allow QueryMBeanNames --- src/connection/jmx.go | 6 ++++++ src/connection/mocks/jmx.go | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/src/connection/jmx.go b/src/connection/jmx.go index 2fae7e18..8b2f048d 100644 --- a/src/connection/jmx.go +++ b/src/connection/jmx.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/newrelic/nri-kafka/src/args" "github.com/newrelic/nrjmx/gojmx" "golang.org/x/sync/semaphore" @@ -63,6 +64,7 @@ func (p *JMXProviderWithConnectionsLimit) NewConnection(config *gojmx.JMXConfig) // JMXConnection interface for JMX connection. type JMXConnection interface { QueryMBeanAttributes(mBeanNamePattern string) ([]*gojmx.AttributeResponse, error) + QueryMBeanNames(mBeanPattern string) ([]string, error) Close() error } @@ -77,6 +79,10 @@ func (j *jmxConnection) QueryMBeanAttributes(mBeanNamePattern string) ([]*gojmx. return j.Client.QueryMBeanAttributes(mBeanNamePattern) } +func (j *jmxConnection) QueryMBeanNames(mBeanPattern string) ([]string, error) { + return j.Client.QueryMBeanNames(mBeanPattern) +} + func (j *jmxConnection) Close() error { // In case of error on closing the connection, gojmx will kill the subprocess, then we do the release anyway. defer j.sem.Release(1) diff --git a/src/connection/mocks/jmx.go b/src/connection/mocks/jmx.go index accad292..bb48f4b7 100644 --- a/src/connection/mocks/jmx.go +++ b/src/connection/mocks/jmx.go @@ -19,6 +19,7 @@ type MockJMXResponse struct { type MockJMXProvider struct { Response *MockJMXResponse + Names []string MBeanNamePattern string } @@ -37,6 +38,13 @@ func (m *MockJMXProvider) QueryMBeanAttributes(mBeanNamePattern string) ([]*gojm return m.Response.Result, m.Response.Err } +func (m *MockJMXProvider) QueryMBeanNames(mBeanNamePattern string) ([]string, error) { + if m.MBeanNamePattern != "" && m.MBeanNamePattern != mBeanNamePattern { + return nil, fmt.Errorf("%w: expected bean pattern '%s' got '%s'", ErrQuery, m.MBeanNamePattern, mBeanNamePattern) + } + return m.Names, nil +} + func (m *MockJMXProvider) Close() error { return m.Response.Err } From 0eaa7d2ab660b4115f8eec9c6246c582aac0185b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Fri, 17 Jun 2022 09:49:49 +0200 Subject: [PATCH 02/12] include helpers to detect consumers and producers --- src/client/id_detection.go | 55 +++++++++++++++++++++ src/client/id_detection_test.go | 86 +++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 src/client/id_detection.go create mode 100644 src/client/id_detection_test.go diff --git a/src/client/id_detection.go b/src/client/id_detection.go new file mode 100644 index 00000000..c700ae12 --- /dev/null +++ b/src/client/id_detection.go @@ -0,0 +1,55 @@ +package client + +import ( + "strings" + + "github.com/newrelic/nri-kafka/src/args" + "github.com/newrelic/nri-kafka/src/connection" +) + +const ( + consumerDetectionPattern = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*" + producerDetectionPattern = "kafka.producer:type=producer-metrics,client-id=*" +) + +// idFromMBeanNameFn defines a function to extract the identifier from an MBean name. +type idFromMBeanNameFn func(string) string + +func getClientIDS(jmxInfo *args.JMXHost, mBeanPattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) { + if jmxInfo.Name != "" { + return []string{jmxInfo.Name}, nil + } + return detectClientIDs(mBeanPattern, idExtractor, conn) +} + +func detectClientIDs(pattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) { + mBeanNames, err := conn.QueryMBeanNames(pattern) + if err != nil { + return nil, err + } + return idsFromMBeanNames(mBeanNames, idExtractor), nil +} + +func idsFromMBeanNames(mBeanNames []string, idExtractor idFromMBeanNameFn) []string { + ids := []string{} + for _, mBeanName := range mBeanNames { + if id := idExtractor(mBeanName); id != "" { + ids = append(ids, id) + } + } + return ids +} + +// idFromMBeanWithClientIdField Gets the identifier given a type=app-info MBean name. Example: "name:type=app-info,client-id=my-id" +func idFromMBeanWithClientIdField(mBeanName string) string { + _, info, valid := strings.Cut(mBeanName, ":") + if !valid { + return "" + } + for _, field := range strings.Split(info, ",") { + if _, id, isIDField := strings.Cut(field, "client-id="); isIDField { + return id + } + } + return "" +} diff --git a/src/client/id_detection_test.go b/src/client/id_detection_test.go new file mode 100644 index 00000000..631c2f33 --- /dev/null +++ b/src/client/id_detection_test.go @@ -0,0 +1,86 @@ +package client + +import ( + "strings" + "testing" + + "github.com/newrelic/nri-kafka/src/args" + "github.com/newrelic/nri-kafka/src/connection/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIdFromAppInfoMBean(t *testing.T) { + cases := []struct { + MBeanName string + Expected string + }{ + { + MBeanName: "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my_consumer", + Expected: "my_consumer", + }, + { + MBeanName: "kafka.consumer:client-id=my_consumer,type=consumer-fetch-manager-metrics", + Expected: "my_consumer", + }, + { + MBeanName: "kafka.producer:type=producer-metrics,client-id=my_producer", + Expected: "my_producer", + }, + { + MBeanName: "kafka.producer:type=producer-metrics,no-id-here", + Expected: "", + }, + { + MBeanName: "id=my_consumer,type=app-info,invalid-mbean=true", + Expected: "", + }, + } + + for _, c := range cases { + t.Run(c.MBeanName, func(t *testing.T) { + assert.Equal(t, c.Expected, idFromMBeanWithClientIdField(c.MBeanName)) + }) + } +} + +func TestIdsFromMBeanNames(t *testing.T) { + mBeanNames := []string{"_id1", "_id2", "invalid_id", "_id3"} + idExtractor := func(name string) string { + if strings.HasPrefix(name, "_") { + return strings.TrimLeft(name, "_") + } + return "" + } + expected := []string{"id1", "id2", "id3"} + assert.Equal(t, expected, idsFromMBeanNames(mBeanNames, idExtractor)) +} + +func TestDetectClientIDsConnError(t *testing.T) { + pattern := "some-pattern" + conn := &mocks.MockJMXProvider{MBeanNamePattern: "other-pattern-causes-error"} + _, err := detectClientIDs(pattern, nil, conn) + assert.Error(t, err) +} + +func TestDetectClientIDs(t *testing.T) { + pattern := "pattern" + conn := &mocks.MockJMXProvider{MBeanNamePattern: pattern, Names: []string{"a", "b", "c"}} + ids, err := detectClientIDs(pattern, strings.ToUpper, conn) + require.NoError(t, err) + assert.Equal(t, []string{"A", "B", "C"}, ids) +} + +func TestGetClientIDs(t *testing.T) { + pattern := "pattern" + conn := &mocks.MockJMXProvider{MBeanNamePattern: pattern, Names: []string{"a", "b", "c"}} + + jmxInfo := &args.JMXHost{Name: "D"} + ids, err := getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) + require.NoError(t, err) + assert.Equal(t, []string{"D"}, ids, "Expected only the JMXHost.Name when it is defined") + + jmxInfo = &args.JMXHost{} + ids, err = getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) + assert.Equal(t, []string{"A", "B", "C"}, ids, "Detect clients should be executed when JMXHost.Name is not defined") +} From 327a947870ed380c5fbf4394892407c00fcd20c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Fri, 17 Jun 2022 14:56:07 +0200 Subject: [PATCH 03/12] add helper to make jmx connection easier --- src/connection/jmx.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/connection/jmx.go b/src/connection/jmx.go index 8b2f048d..e0af4c0a 100644 --- a/src/connection/jmx.go +++ b/src/connection/jmx.go @@ -139,6 +139,13 @@ func (cb *ConfigBuilder) WithPassword(password string) *ConfigBuilder { return cb } +// WithJMXHostSettings is a helper to set all attributes from the provided JMXHost. +func (cb *ConfigBuilder) WithJMXHostSettings(jmxInfo *args.JMXHost) *ConfigBuilder { + return cb. + WithHostname(jmxInfo.Host).WithPort(jmxInfo.Port). + WithUsername(jmxInfo.User).WithPassword(jmxInfo.Password) +} + // Build returns the jmx config. func (cb *ConfigBuilder) Build() *gojmx.JMXConfig { return cb.config From c71db9e9bad8a48826277e0164e9348d712228f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Fri, 17 Jun 2022 14:56:52 +0200 Subject: [PATCH 04/12] include support to autodetect consumer/producers names in jmx metrics collection --- src/client/producer_consumer_collection.go | 176 ++++++++++----------- src/kafka.go | 4 +- 2 files changed, 84 insertions(+), 96 deletions(-) diff --git a/src/client/producer_consumer_collection.go b/src/client/producer_consumer_collection.go index 25f5ff84..966cf5cb 100644 --- a/src/client/producer_consumer_collection.go +++ b/src/client/producer_consumer_collection.go @@ -13,13 +13,17 @@ import ( "github.com/newrelic/nri-kafka/src/metrics" ) +type CollectionWorker func(<-chan *args.JMXHost, *sync.WaitGroup, *integration.Integration, connection.JMXProvider) + +type CollectionFn func(*integration.Integration, *args.JMXHost, connection.JMXProvider) + // StartWorkerPool starts a pool of workers to handle collecting data for wither Consumer or producer entities. // The channel returned is to be closed by the user (or by feedWorkerPool) func StartWorkerPool( poolSize int, wg *sync.WaitGroup, integration *integration.Integration, - worker func(<-chan *args.JMXHost, *sync.WaitGroup, *integration.Integration, connection.JMXProvider), + worker CollectionWorker, jmxConnProvider connection.JMXProvider, ) chan *args.JMXHost { @@ -43,118 +47,102 @@ func FeedWorkerPool(jmxHostChan chan<- *args.JMXHost, jmxHosts []*args.JMXHost) } } -// ConsumerWorker collects information for consumers sent down the consumerChan -func ConsumerWorker(consumerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *integration.Integration, jmxConnProvider connection.JMXProvider) { - defer wg.Done() - - for { - jmxInfo, ok := <-consumerChan - if !ok { - return // Stop worker if consumerChan is closed +// Worker collects information as set in `collectionFn` for consumers/producers sent down to the chan. +func Worker(collectionFn CollectionFn) CollectionWorker { + return func(c <-chan *args.JMXHost, wg *sync.WaitGroup, i *integration.Integration, jmxConnProvider connection.JMXProvider) { + defer wg.Done() + for { + jmxInfo, ok := <-c + if !ok { + return // Stop worker if chan is closed + } + collectionFn(i, jmxInfo, jmxConnProvider) } + } +} +func CollectConsumerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, jmxConnProvider connection.JMXProvider) { + jmxConfig := connection.NewConfigBuilder().FromArgs().WithJMXHostSettings(jmxInfo).Build() + conn, err := jmxConnProvider.NewConnection(jmxConfig) + if err != nil { + log.Error("Unable to make JMX connection for '%s:%s': %v", jmxInfo.Host, jmxInfo.Port, err) + return + } + defer conn.Close() + // Get client identifiers for all the consumers + clientIDs, err := getClientIDS(jmxInfo, consumerDetectionPattern, idFromMBeanWithClientIdField, conn) + if err != nil { + log.Error("Unable to detect consumer/producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) + return + } + for _, clientID := range clientIDs { // Create an entity for the consumer clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) hostIDAttr := integration.NewIDAttribute("host", jmxInfo.Host) - consumerEntity, err := i.Entity(jmxInfo.Name, "ka-consumer", clusterIDAttr, hostIDAttr) + consumerEntity, err := i.Entity(clientID, "ka-consumer", clusterIDAttr, hostIDAttr) if err != nil { - log.Error("Unable to create entity for Consumer %s: %s", jmxInfo.Name, err.Error()) + log.Error("Unable to create entity for Consumer %s: %s", clientID, err.Error()) continue } - // Gather Metrics for consumer - if args.GlobalArgs.All() || args.GlobalArgs.Metrics { - log.Debug("Collecting metrics for consumer %s", consumerEntity.Metadata.Name) - - jmxConfig := connection.NewConfigBuilder(). - FromArgs(). - WithHostname(jmxInfo.Host).WithPort(jmxInfo.Port). - WithUsername(jmxInfo.User).WithPassword(jmxInfo.Password). - Build() - - conn, err := jmxConnProvider.NewConnection(jmxConfig) - if err != nil { - log.Error("Unable to make JMX connection for Consumer '%s': %v", consumerEntity.Metadata.Name, err) - continue - } - - // Create a sample for consumer metrics - sample := consumerEntity.NewMetricSet("KafkaConsumerSample", - attribute.Attribute{Key: "clusterName", Value: args.GlobalArgs.ClusterName}, - attribute.Attribute{Key: "displayName", Value: jmxInfo.Name}, - attribute.Attribute{Key: "entityName", Value: "consumer:" + jmxInfo.Name}, - attribute.Attribute{Key: "host", Value: jmxInfo.Host}, - ) - - // Collect the consumer metrics and populate the sample with them - log.Debug("Collecting metrics for Consumer '%s'", consumerEntity.Metadata.Name) - metrics.GetConsumerMetrics(consumerEntity.Metadata.Name, sample, conn) - - // Collect metrics that are topic-specific per Consumer - metrics.CollectTopicSubMetrics(consumerEntity, metrics.ConsumerTopicMetricDefs, metrics.ApplyConsumerTopicName, conn) - - log.Debug("Collecting metrics for consumer %s", consumerEntity.Metadata.Name) - - // Close connection and release lock so another process can make JMX Connections - conn.Close() + if !(args.GlobalArgs.All() || args.GlobalArgs.Metrics) { + continue } + // Gather Metrics for consumer + log.Debug("Collecting metrics for consumer %s", consumerEntity.Metadata.Name) + // Create a sample for consumer metrics + sample := consumerEntity.NewMetricSet("KafkaConsumerSample", + attribute.Attribute{Key: "clusterName", Value: args.GlobalArgs.ClusterName}, + attribute.Attribute{Key: "displayName", Value: clientID}, + attribute.Attribute{Key: "entityName", Value: "consumer:" + clientID}, + attribute.Attribute{Key: "host", Value: jmxInfo.Host}, + ) + // Collect the consumer metrics and populate the sample with them + log.Debug("Collecting metrics for Consumer '%s'", consumerEntity.Metadata.Name) + metrics.GetConsumerMetrics(consumerEntity.Metadata.Name, sample, conn) + // Collect metrics that are topic-specific per Consumer + metrics.CollectTopicSubMetrics(consumerEntity, metrics.ConsumerTopicMetricDefs, metrics.ApplyConsumerTopicName, conn) + log.Debug("Done collecting metrics for consumer %s", consumerEntity.Metadata.Name) } } -// ProducerWorker collect information for Producers sent down the producerChan -func ProducerWorker(producerChan <-chan *args.JMXHost, wg *sync.WaitGroup, i *integration.Integration, jmxConnProvider connection.JMXProvider) { - defer wg.Done() - for { - jmxInfo, ok := <-producerChan - if !ok { - return // Stop the worker if the channel is closed - } - +func CollectProducerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, jmxConnProvider connection.JMXProvider) { + jmxConfig := connection.NewConfigBuilder().FromArgs().WithJMXHostSettings(jmxInfo).Build() + conn, err := jmxConnProvider.NewConnection(jmxConfig) + if err != nil { + log.Error("Unable to make JMX connection for '%s:%s': %v", jmxInfo.Host, jmxInfo.Port, err) + return + } + defer conn.Close() + // Get client identifiers for all the producers + clientIDs, err := getClientIDS(jmxInfo, producerDetectionPattern, idFromMBeanWithClientIdField, conn) + if err != nil { + log.Error("Unable to detect producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) + return + } + for _, clientID := range clientIDs { // Create the producer entity clusterIDAttr := integration.NewIDAttribute("clusterName", args.GlobalArgs.ClusterName) hostIDAttr := integration.NewIDAttribute("host", jmxInfo.Host) - producerEntity, err := i.Entity(jmxInfo.Name, "ka-producer", clusterIDAttr, hostIDAttr) + producerEntity, err := i.Entity(clientID, "ka-producer", clusterIDAttr, hostIDAttr) if err != nil { - log.Error("Unable to create entity for Producer %s: %s", jmxInfo.Name, err.Error()) + log.Error("Unable to create entity for Producer %s: %s", clientID, err.Error()) continue } - - // Gather Metrics for producer - if args.GlobalArgs.All() || args.GlobalArgs.Metrics { - log.Debug("Collecting metrics for producer %s", producerEntity.Metadata.Name) - - // Open a JMX connection to the producer - jmxConfig := connection.NewConfigBuilder(). - FromArgs(). - WithHostname(jmxInfo.Host).WithPort(jmxInfo.Port). - WithUsername(jmxInfo.User).WithPassword(jmxInfo.Password). - Build() - - conn, err := jmxConnProvider.NewConnection(jmxConfig) - if err != nil { - log.Error("Unable to make JMX connection for Producer '%s': %v", producerEntity.Metadata.Name, err) - continue - } - - // Create a metric set for the producer - sample := producerEntity.NewMetricSet("KafkaProducerSample", - attribute.Attribute{Key: "clusterName", Value: args.GlobalArgs.ClusterName}, - attribute.Attribute{Key: "displayName", Value: jmxInfo.Name}, - attribute.Attribute{Key: "entityName", Value: "producer:" + jmxInfo.Name}, - attribute.Attribute{Key: "host", Value: jmxInfo.Host}, - ) - - // Collect producer metrics and populate the metric set with them - log.Debug("Collecting metrics for Producer '%s'", producerEntity.Metadata.Name) - metrics.GetProducerMetrics(producerEntity.Metadata.Name, sample, conn) - - // Collect metrics that are topic specific per Producer - metrics.CollectTopicSubMetrics(producerEntity, metrics.ProducerTopicMetricDefs, metrics.ApplyProducerTopicName, conn) - - log.Debug("Done Collecting metrics for producer %s", producerEntity.Metadata.Name) - - // Close connection and release lock so another process can make JMX Connections - conn.Close() + if !(args.GlobalArgs.All() || args.GlobalArgs.Metrics) { + continue } + sample := producerEntity.NewMetricSet("KafkaProducerSample", + attribute.Attribute{Key: "clusterName", Value: args.GlobalArgs.ClusterName}, + attribute.Attribute{Key: "displayName", Value: clientID}, + attribute.Attribute{Key: "entityName", Value: "producer:" + clientID}, + attribute.Attribute{Key: "host", Value: jmxInfo.Host}, + ) + // Collect producer metrics and populate the metric set with them + log.Debug("Collecting metrics for Producer '%s'", producerEntity.Metadata.Name) + metrics.GetProducerMetrics(producerEntity.Metadata.Name, sample, conn) + // Collect metrics that are topic specific per Producer + metrics.CollectTopicSubMetrics(producerEntity, metrics.ProducerTopicMetricDefs, metrics.ApplyProducerTopicName, conn) + log.Debug("Done Collecting metrics for producer %s", producerEntity.Metadata.Name) } } diff --git a/src/kafka.go b/src/kafka.go index 859f0d85..8ecd70e4 100644 --- a/src/kafka.go +++ b/src/kafka.go @@ -233,8 +233,8 @@ func coreCollection(kafkaIntegration *integration.Integration, jmxConnProvider c go broker.FeedBrokerPool(brokers, brokerChan) } - consumerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.ConsumerWorker, jmxConnProvider) - producerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.ProducerWorker, jmxConnProvider) + consumerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.Worker(client.CollectConsumerMetrics), jmxConnProvider) + producerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.Worker(client.CollectProducerMetrics), jmxConnProvider) go client.FeedWorkerPool(consumerChan, args.GlobalArgs.Consumers) go client.FeedWorkerPool(producerChan, args.GlobalArgs.Producers) From 4e467ca12b25fab2b3a04725508b9e1e3c0327f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Mon, 20 Jun 2022 08:40:16 +0200 Subject: [PATCH 05/12] extend producer_consumer_collection unittests --- .../producer_consumer_collection_test.go | 86 +++++++++++++++++-- 1 file changed, 81 insertions(+), 5 deletions(-) diff --git a/src/client/producer_consumer_collection_test.go b/src/client/producer_consumer_collection_test.go index f405d0db..4bd992f2 100644 --- a/src/client/producer_consumer_collection_test.go +++ b/src/client/producer_consumer_collection_test.go @@ -8,6 +8,8 @@ import ( "github.com/newrelic/nri-kafka/src/connection/mocks" "github.com/newrelic/nrjmx/gojmx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/newrelic/infra-integrations-sdk/integration" "github.com/newrelic/nri-kafka/src/args" @@ -25,7 +27,7 @@ func TestStartWorkerPool(t *testing.T) { t.Error(err) } - consumerHosts := StartWorkerPool(3, &wg, i, ConsumerWorker, nil) + consumerHosts := StartWorkerPool(3, &wg, i, Worker(CollectConsumerMetrics), nil) c := make(chan int) go func() { @@ -83,7 +85,8 @@ func TestConsumerWorker(t *testing.T) { } wg.Add(1) - go ConsumerWorker(consumerChan, &wg, i, mockJMXProvider) + consumerWorker := Worker(CollectConsumerMetrics) + go consumerWorker(consumerChan, &wg, i, mockJMXProvider) newJmx := &args.JMXHost{ Name: "test", @@ -114,7 +117,8 @@ func TestConsumerWorker_JmxOpenFuncErr(t *testing.T) { testutils.SetupTestArgs() wg.Add(1) - go ConsumerWorker(consumerChan, &wg, i, mockJMXProvider) + consumerWorker := Worker(CollectConsumerMetrics) + go consumerWorker(consumerChan, &wg, i, mockJMXProvider) newJmx := &args.JMXHost{ Name: "test", @@ -136,7 +140,8 @@ func TestProducerWorker(t *testing.T) { testutils.SetupTestArgs() wg.Add(1) - go ProducerWorker(producerChan, &wg, i, mocks.NewEmptyMockJMXProvider()) + producerWorker := Worker(CollectProducerMetrics) + go producerWorker(producerChan, &wg, i, mocks.NewEmptyMockJMXProvider()) newJmx := &args.JMXHost{ Name: "test", @@ -167,7 +172,8 @@ func TestProducerWorker_JmxOpenFuncErr(t *testing.T) { testutils.SetupTestArgs() wg.Add(1) - go ProducerWorker(producerChan, &wg, i, mockJMXProvider) + producerWorker := Worker(CollectProducerMetrics) + go producerWorker(producerChan, &wg, i, mockJMXProvider) newJmx := &args.JMXHost{ Name: "test", @@ -177,3 +183,73 @@ func TestProducerWorker_JmxOpenFuncErr(t *testing.T) { wg.Wait() } + +func TestCollectConsumerMetricsNameProvided(t *testing.T) { + i, err := integration.New(t.Name(), "1.0.0") + require.NoError(t, err) + connProvider := mocks.NewEmptyMockJMXProvider() + connProvider.Names = []string{ + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", + } + clientID := "consumer-1" + jmxInfo := &args.JMXHost{Name: clientID} + testutils.SetupTestArgs() + CollectConsumerMetrics(i, jmxInfo, connProvider) + require.Len(t, i.Entities, 1, "only metrics from the provided consumer should be fetched") + assert.Equal(t, clientID, i.Entities[0].Metadata.Name) +} + +func TestCollectProducerMetricsCreateEntity(t *testing.T) { + i, err := integration.New(t.Name(), "1.0.0") + require.NoError(t, err) + connProvider := mocks.NewEmptyMockJMXProvider() + connProvider.Names = []string{ + "kafka.producer:type=producer-metrics,client-id=producer-1", + "kafka.producer:type=producer-metrics,client-id=producer-2", + } + clientID := "producer-1" + jmxInfo := &args.JMXHost{Name: clientID} + testutils.SetupTestArgs() + CollectProducerMetrics(i, jmxInfo, connProvider) + require.Len(t, i.Entities, 1, "only metrics from the provided producer should be fetched") + assert.Equal(t, clientID, i.Entities[0].Metadata.Name) +} + +func TestCollectConsumerMetricsNoNameProvided(t *testing.T) { + i, err := integration.New(t.Name(), "1.0.0") + require.NoError(t, err) + connProvider := mocks.NewEmptyMockJMXProvider() + connProvider.Names = []string{ + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", + } + jmxInfo := &args.JMXHost{} + testutils.SetupTestArgs() + CollectConsumerMetrics(i, jmxInfo, connProvider) + require.Len(t, i.Entities, 2) + var entityNames []string + for _, entity := range i.Entities { + entityNames = append(entityNames, entity.Metadata.Name) + } + assert.ElementsMatch(t, []string{"consumer-1", "consumer-2"}, entityNames) +} + +func TestCollectProducerMetricsNoNameProvided(t *testing.T) { + i, err := integration.New(t.Name(), "1.0.0") + require.NoError(t, err) + connProvider := mocks.NewEmptyMockJMXProvider() + connProvider.Names = []string{ + "kafka.producer:type=producer-metrics,client-id=producer-1", + "kafka.producer:type=producer-metrics,client-id=producer-2", + } + jmxInfo := &args.JMXHost{} + testutils.SetupTestArgs() + CollectProducerMetrics(i, jmxInfo, connProvider) + require.Len(t, i.Entities, 2) + var entityNames []string + for _, entity := range i.Entities { + entityNames = append(entityNames, entity.Metadata.Name) + } + assert.ElementsMatch(t, []string{"producer-1", "producer-2"}, entityNames) +} From 93c5b4fd39fc27e7274e2c8204317204d7736eec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Mon, 20 Jun 2022 08:46:37 +0200 Subject: [PATCH 06/12] use constants to set number of workers --- src/kafka.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/kafka.go b/src/kafka.go index 8ecd70e4..fa962670 100644 --- a/src/kafka.go +++ b/src/kafka.go @@ -30,6 +30,9 @@ const ( discoverBootstrap = "bootstrap" discoverZookeeper = "zookeeper" topicSourceZookeeper = "zookeeper" + + numberOfConsumerWorkers = 3 + numberOfProducerWorkers = 3 ) var ( @@ -233,8 +236,8 @@ func coreCollection(kafkaIntegration *integration.Integration, jmxConnProvider c go broker.FeedBrokerPool(brokers, brokerChan) } - consumerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.Worker(client.CollectConsumerMetrics), jmxConnProvider) - producerChan := client.StartWorkerPool(3, &wg, kafkaIntegration, client.Worker(client.CollectProducerMetrics), jmxConnProvider) + consumerChan := client.StartWorkerPool(numberOfConsumerWorkers, &wg, kafkaIntegration, client.Worker(client.CollectConsumerMetrics), jmxConnProvider) + producerChan := client.StartWorkerPool(numberOfProducerWorkers, &wg, kafkaIntegration, client.Worker(client.CollectProducerMetrics), jmxConnProvider) go client.FeedWorkerPool(consumerChan, args.GlobalArgs.Consumers) go client.FeedWorkerPool(producerChan, args.GlobalArgs.Producers) From f52840d5cdaf46aedbb316d174d80454ea39b183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Mon, 20 Jun 2022 09:39:31 +0200 Subject: [PATCH 07/12] update config to make consumers/producers name optional --- src/args/parsed_args.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/args/parsed_args.go b/src/args/parsed_args.go index 2b490132..942582db 100644 --- a/src/args/parsed_args.go +++ b/src/args/parsed_args.go @@ -322,9 +322,6 @@ func unmarshalJMXHosts(data []byte, a *ArgumentList) ([]*JMXHost, error) { // Set default values for _, p := range v { - if p.Name == "" { - return nil, errors.New("must specify a name for each producer in the list") - } if p.User == "" { p.User = a.DefaultJMXUser } From 1e89e11716a6feb5726831a1696e3da5147735d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Mon, 20 Jun 2022 09:40:47 +0200 Subject: [PATCH 08/12] update integration test to check metrics are gathered when name is omited chore: lint --- src/client/id_detection.go | 4 +- src/client/id_detection_test.go | 4 +- src/client/producer_consumer_collection.go | 4 +- .../producer_consumer_collection_test.go | 132 +++++++++--------- tests/integration/kafka_test.go | 2 +- 5 files changed, 72 insertions(+), 74 deletions(-) diff --git a/src/client/id_detection.go b/src/client/id_detection.go index c700ae12..1cbf6d26 100644 --- a/src/client/id_detection.go +++ b/src/client/id_detection.go @@ -40,8 +40,8 @@ func idsFromMBeanNames(mBeanNames []string, idExtractor idFromMBeanNameFn) []str return ids } -// idFromMBeanWithClientIdField Gets the identifier given a type=app-info MBean name. Example: "name:type=app-info,client-id=my-id" -func idFromMBeanWithClientIdField(mBeanName string) string { +// idFromMBeanWithClientIDField Gets the identifier given a type=app-info MBean name. Example: "name:type=app-info,client-id=my-id" +func idFromMBeanWithClientIDField(mBeanName string) string { _, info, valid := strings.Cut(mBeanName, ":") if !valid { return "" diff --git a/src/client/id_detection_test.go b/src/client/id_detection_test.go index 631c2f33..d82e8e51 100644 --- a/src/client/id_detection_test.go +++ b/src/client/id_detection_test.go @@ -39,7 +39,7 @@ func TestIdFromAppInfoMBean(t *testing.T) { for _, c := range cases { t.Run(c.MBeanName, func(t *testing.T) { - assert.Equal(t, c.Expected, idFromMBeanWithClientIdField(c.MBeanName)) + assert.Equal(t, c.Expected, idFromMBeanWithClientIDField(c.MBeanName)) }) } } @@ -81,6 +81,6 @@ func TestGetClientIDs(t *testing.T) { assert.Equal(t, []string{"D"}, ids, "Expected only the JMXHost.Name when it is defined") jmxInfo = &args.JMXHost{} - ids, err = getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) + ids, _ = getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) assert.Equal(t, []string{"A", "B", "C"}, ids, "Detect clients should be executed when JMXHost.Name is not defined") } diff --git a/src/client/producer_consumer_collection.go b/src/client/producer_consumer_collection.go index 966cf5cb..466ce704 100644 --- a/src/client/producer_consumer_collection.go +++ b/src/client/producer_consumer_collection.go @@ -70,7 +70,7 @@ func CollectConsumerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, j } defer conn.Close() // Get client identifiers for all the consumers - clientIDs, err := getClientIDS(jmxInfo, consumerDetectionPattern, idFromMBeanWithClientIdField, conn) + clientIDs, err := getClientIDS(jmxInfo, consumerDetectionPattern, idFromMBeanWithClientIDField, conn) if err != nil { log.Error("Unable to detect consumer/producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) return @@ -115,7 +115,7 @@ func CollectProducerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, j } defer conn.Close() // Get client identifiers for all the producers - clientIDs, err := getClientIDS(jmxInfo, producerDetectionPattern, idFromMBeanWithClientIdField, conn) + clientIDs, err := getClientIDS(jmxInfo, producerDetectionPattern, idFromMBeanWithClientIDField, conn) if err != nil { log.Error("Unable to detect producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) return diff --git a/src/client/producer_consumer_collection_test.go b/src/client/producer_consumer_collection_test.go index 4bd992f2..b9d69b93 100644 --- a/src/client/producer_consumer_collection_test.go +++ b/src/client/producer_consumer_collection_test.go @@ -184,72 +184,70 @@ func TestProducerWorker_JmxOpenFuncErr(t *testing.T) { wg.Wait() } -func TestCollectConsumerMetricsNameProvided(t *testing.T) { - i, err := integration.New(t.Name(), "1.0.0") - require.NoError(t, err) - connProvider := mocks.NewEmptyMockJMXProvider() - connProvider.Names = []string{ - "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", - "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", - } - clientID := "consumer-1" - jmxInfo := &args.JMXHost{Name: clientID} - testutils.SetupTestArgs() - CollectConsumerMetrics(i, jmxInfo, connProvider) - require.Len(t, i.Entities, 1, "only metrics from the provided consumer should be fetched") - assert.Equal(t, clientID, i.Entities[0].Metadata.Name) -} - -func TestCollectProducerMetricsCreateEntity(t *testing.T) { - i, err := integration.New(t.Name(), "1.0.0") - require.NoError(t, err) - connProvider := mocks.NewEmptyMockJMXProvider() - connProvider.Names = []string{ - "kafka.producer:type=producer-metrics,client-id=producer-1", - "kafka.producer:type=producer-metrics,client-id=producer-2", - } - clientID := "producer-1" - jmxInfo := &args.JMXHost{Name: clientID} - testutils.SetupTestArgs() - CollectProducerMetrics(i, jmxInfo, connProvider) - require.Len(t, i.Entities, 1, "only metrics from the provided producer should be fetched") - assert.Equal(t, clientID, i.Entities[0].Metadata.Name) -} - -func TestCollectConsumerMetricsNoNameProvided(t *testing.T) { - i, err := integration.New(t.Name(), "1.0.0") - require.NoError(t, err) - connProvider := mocks.NewEmptyMockJMXProvider() - connProvider.Names = []string{ - "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", - "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", - } - jmxInfo := &args.JMXHost{} - testutils.SetupTestArgs() - CollectConsumerMetrics(i, jmxInfo, connProvider) - require.Len(t, i.Entities, 2) - var entityNames []string - for _, entity := range i.Entities { - entityNames = append(entityNames, entity.Metadata.Name) - } - assert.ElementsMatch(t, []string{"consumer-1", "consumer-2"}, entityNames) -} - -func TestCollectProducerMetricsNoNameProvided(t *testing.T) { - i, err := integration.New(t.Name(), "1.0.0") - require.NoError(t, err) - connProvider := mocks.NewEmptyMockJMXProvider() - connProvider.Names = []string{ - "kafka.producer:type=producer-metrics,client-id=producer-1", - "kafka.producer:type=producer-metrics,client-id=producer-2", - } - jmxInfo := &args.JMXHost{} - testutils.SetupTestArgs() - CollectProducerMetrics(i, jmxInfo, connProvider) - require.Len(t, i.Entities, 2) - var entityNames []string - for _, entity := range i.Entities { - entityNames = append(entityNames, entity.Metadata.Name) +func TestProducerConsumerEntitiesCreation(t *testing.T) { + cases := []struct { + Name string + JMXNames []string + JMXInfo *args.JMXHost + CollectionFn CollectionFn + ExpectedEntityNames []string + }{ + { + Name: "Collect consumer metrics with name provided", + JMXNames: []string{ + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", + }, + JMXInfo: &args.JMXHost{Name: "consumer-1"}, + CollectionFn: CollectConsumerMetrics, + ExpectedEntityNames: []string{"consumer-1"}, + }, + { + Name: "Collect consumer metrics with no name provided", + JMXNames: []string{ + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-1", + "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-2", + }, + JMXInfo: &args.JMXHost{}, + CollectionFn: CollectConsumerMetrics, + ExpectedEntityNames: []string{"consumer-1", "consumer-2"}, + }, + { + Name: "Collect producer metrics with name provided", + JMXNames: []string{ + "kafka.producer:type=producer-metrics,client-id=producer-1", + "kafka.producer:type=producer-metrics,client-id=producer-2", + }, + JMXInfo: &args.JMXHost{Name: "producer-1"}, + CollectionFn: CollectProducerMetrics, + ExpectedEntityNames: []string{"producer-1"}, + }, + { + Name: "Collect producer metrics with no name provided", + JMXNames: []string{ + "kafka.producer:type=producer-metrics,client-id=producer-1", + "kafka.producer:type=producer-metrics,client-id=producer-2", + }, + JMXInfo: &args.JMXHost{}, + CollectionFn: CollectProducerMetrics, + ExpectedEntityNames: []string{"producer-1", "producer-2"}, + }, + } + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + // setup integration + i, err := integration.New(c.Name, "1.0.0") + require.NoError(t, err) + testutils.SetupTestArgs() + connProvider := mocks.NewEmptyMockJMXProvider() + connProvider.Names = c.JMXNames + // run collection + c.CollectionFn(i, c.JMXInfo, connProvider) + var entityNames []string + for _, entity := range i.Entities { + entityNames = append(entityNames, entity.Metadata.Name) + } + assert.ElementsMatch(t, c.ExpectedEntityNames, entityNames) + }) } - assert.ElementsMatch(t, []string{"producer-1", "producer-2"}, entityNames) } diff --git a/tests/integration/kafka_test.go b/tests/integration/kafka_test.go index 31f48d6b..ebee1dfe 100644 --- a/tests/integration/kafka_test.go +++ b/tests/integration/kafka_test.go @@ -367,7 +367,7 @@ func TestKafkaIntegration_consumer_test(t *testing.T) { consumerConfig := func(command []string) []string { return append( command, - "--consumers", "[{\"name\": \"kafka_dummy_consumer\", \"host\": \"kafka_dummy_consumer\", \"port\": 1087},{\"name\": \"kafka_dummy_consumer2\", \"host\": \"kafka_dummy_consumer2\", \"port\": 1088}]", + "--consumers", "[{\"host\": \"kafka_dummy_consumer\", \"port\": 1087},{\"name\": \"kafka_dummy_consumer2\", \"host\": \"kafka_dummy_consumer2\", \"port\": 1088}]", ) } From bd0b261236367749347507d31b8ab4cfc89751c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Mon, 27 Jun 2022 09:48:19 +0200 Subject: [PATCH 09/12] feat: add support to default alias in producers/consumers configuration --- kafka-config.yml.sample | 9 ++-- src/args/args_test.go | 91 +++++++++++++++++++++++++++++++++++++++++ src/args/parsed_args.go | 7 +++- 3 files changed, 102 insertions(+), 5 deletions(-) diff --git a/kafka-config.yml.sample b/kafka-config.yml.sample index b0fa36cd..42904313 100644 --- a/kafka-config.yml.sample +++ b/kafka-config.yml.sample @@ -165,8 +165,8 @@ integrations: # In order to collect Java producer and consumer metrics the "producers" and "consumers" fields should be filled out. # Both fields are JSON arrays with each entry being a separate JAVA producer or consumer, in it's respective field. # Each entry should have the following fields: - # - name: This is the client.id of the producer/consumer as it appears in Kafka. - # - host: The IP or Hostname of the producer/consumser. If omitted, will use the value of the "default_jmx_host" field + # - name: This is the client.id of the producer/consumer as it appears in Kafka. If omitted, metrics from all clients in the JMX host:port will be reported. + # - host: The IP or Hostname of the producer/consumer. If omitted, will use the value of the "default_jmx_host" field # - port: The port in which JMX is setup for on the producer/consumer. If omitted will, use the value of the "default_jmx_port" field # - username: The username used to connect to JMX. If omitted, will use the value of the "default_jmx_user" field # - password: The password used to connect to JMX. If omitted, will use the value of the "default_jmx_password" field @@ -175,7 +175,10 @@ integrations: CONSUMERS: '[{"name": "myConsumer", "host": "localhost", "port": 24, "username": "me", "password": "secret"}]' # If several producers/consumers are on the same host an agent can be installed on that host and the # "default_jmx_host" and "default_jmx_port" field can be set once and used for all producers/consumers that - # do not have the "host" or "port" field repsectively. + # do not have the "host" or "port" field respectively. + # When defaults are set it is also possible to use the string "default" to gather metrics from all producers / + # consumers in the "default_jmx_host:default_jmx_port". Example: + # PRODUCERS: default # These fields can be removed if each producer/consumer has it's own "host" and/or "port" field filled out. DEFAULT_JMX_HOST: "localhost" DEFAULT_JMX_PORT: "9999" diff --git a/src/args/args_test.go b/src/args/args_test.go index 0f43a3ee..1a3c0cb3 100644 --- a/src/args/args_test.go +++ b/src/args/args_test.go @@ -9,6 +9,7 @@ import ( sdkArgs "github.com/newrelic/infra-integrations-sdk/args" "github.com/newrelic/infra-integrations-sdk/integration" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestParseArgs(t *testing.T) { @@ -217,3 +218,93 @@ func Test_unmarshalConsumerGroups_NoTopics(t *testing.T) { t.Error("Expected error") } } + +func TestUnMarshalJMXHosts(t *testing.T) { + arguments := &ArgumentList{ + DefaultJMXUser: "user", + DefaultJMXPassword: "password", + DefaultJMXPort: 42, + DefaultJMXHost: "host", + } + cases := []struct { + Name string + Input string + Expected []*JMXHost + }{ + { + Name: "Empty", + Input: `[]`, + Expected: []*JMXHost{}, + }, + { + Name: "Default with no alias", + Input: `[{}]`, + Expected: []*JMXHost{ + { + User: arguments.DefaultJMXUser, + Password: arguments.DefaultJMXPassword, + Port: arguments.DefaultJMXPort, + Host: arguments.DefaultJMXHost, + }, + }, + }, + { + Name: "Default with alias", + Input: `default`, + Expected: []*JMXHost{ + { + User: arguments.DefaultJMXUser, + Password: arguments.DefaultJMXPassword, + Port: arguments.DefaultJMXPort, + Host: arguments.DefaultJMXHost, + }, + }, + }, + { + Name: "Only name set", + Input: `[{"name": "client.id"}]`, + Expected: []*JMXHost{ + { + Name: "client.id", + User: arguments.DefaultJMXUser, + Password: arguments.DefaultJMXPassword, + Port: arguments.DefaultJMXPort, + Host: arguments.DefaultJMXHost, + }, + }, + }, + { + Name: "No name set", + Input: `[{"user": "my.user", "password": "my.pass", "port": 1088, "host": "localhost"}]`, + Expected: []*JMXHost{ + { + User: "my.user", + Password: "my.pass", + Port: 1088, + Host: "localhost", + }, + }, + }, + { + Name: "All values set", + Input: `[{"name": "my.name", "user": "my.user", "password": "my.pass", "port": 1088, "host": "localhost"}]`, + Expected: []*JMXHost{ + { + Name: "my.name", + User: "my.user", + Password: "my.pass", + Port: 1088, + Host: "localhost", + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + parsed, err := unmarshalJMXHosts([]byte(c.Input), arguments) + require.NoError(t, err) + require.Equal(t, c.Expected, parsed) + }) + } +} diff --git a/src/args/parsed_args.go b/src/args/parsed_args.go index 942582db..f087dbf3 100644 --- a/src/args/parsed_args.go +++ b/src/args/parsed_args.go @@ -19,6 +19,8 @@ var GlobalArgs *ParsedArguments const ( defaultZookeeperPort = 2181 defaultJMXPort = 9999 + + jmxHostDefaultAlias = "default" ) // ParsedArguments is an special version of the config arguments that has advanced parsing @@ -313,10 +315,11 @@ func ParseArgs(a ArgumentList) (*ParsedArguments, error) { // unmarshalJMXHosts parses the user-provided JSON map for a producer // or consumers into a jmxHost structs and sets default values func unmarshalJMXHosts(data []byte, a *ArgumentList) ([]*JMXHost, error) { - // Parse the producer or consumer var v []*JMXHost - if err := json.Unmarshal([]byte(data), &v); err != nil { + if string(data) == jmxHostDefaultAlias { + v = []*JMXHost{{}} + } else if err := json.Unmarshal(data, &v); err != nil { return nil, err } From a4f33bf144032481298c9d7d607d6156b824af17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Tue, 28 Jun 2022 17:01:38 +0200 Subject: [PATCH 10/12] feat: use both app-info and metrics patterns as falback to detect client ids --- src/client/id_detection.go | 65 +++++++++++++++++++-- src/client/id_detection_test.go | 68 +++++++++++++++++++++- src/client/producer_consumer_collection.go | 4 +- 3 files changed, 127 insertions(+), 10 deletions(-) diff --git a/src/client/id_detection.go b/src/client/id_detection.go index 1cbf6d26..3bd2be5f 100644 --- a/src/client/id_detection.go +++ b/src/client/id_detection.go @@ -8,18 +8,62 @@ import ( ) const ( - consumerDetectionPattern = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*" - producerDetectionPattern = "kafka.producer:type=producer-metrics,client-id=*" + consumerAppInfoPattern = "kafka.consumer:type=app-info,id=*" + consumerMetricsPattern = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*" + + producerAppInfoPattern = "kafka.producer:type=app-info,id=*" + producerMetricsPattern = "kafka.producer:type=producer-metrics,client-id=*" ) // idFromMBeanNameFn defines a function to extract the identifier from an MBean name. type idFromMBeanNameFn func(string) string -func getClientIDS(jmxInfo *args.JMXHost, mBeanPattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) { +type clientIDExtractInfo struct { + pattern string + extractor idFromMBeanNameFn +} + +func detectConsumerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) { + // consumerAppInfoPatter is defined to detect consumer clientIDs, in case it is not found it will use + // consumerMetricsPattern which is also being used to fetch consumer metrics. + return getClientIDS( + []clientIDExtractInfo{ + {pattern: consumerAppInfoPattern, extractor: idFromAppInfo}, + {pattern: consumerMetricsPattern, extractor: idFromMBeanWithClientIDField}, + }, + jmxInfo, + conn, + ) +} + +func detectProducerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) { + // producerAppInfoPatter is defined to detect consumer clientIDs, in case it is not found it will use + // ProducerMetricsPattern which is also being used to fetch consumer metrics. + return getClientIDS( + []clientIDExtractInfo{ + {pattern: producerAppInfoPattern, extractor: idFromAppInfo}, + {pattern: producerMetricsPattern, extractor: idFromMBeanWithClientIDField}, + }, + jmxInfo, + conn, + ) +} + +// getClientIDs tries to obtain clientIDs from JMX connection using each extractInfo entry until it success or items are over. +// this allows implementing a primary extractor and one or many fallbacks. +func getClientIDS(extractInfo []clientIDExtractInfo, jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) { if jmxInfo.Name != "" { return []string{jmxInfo.Name}, nil } - return detectClientIDs(mBeanPattern, idExtractor, conn) + var err error + var names []string + for _, info := range extractInfo { + names, err = detectClientIDs(info.pattern, info.extractor, conn) + if err == nil && len(names) > 0 { + return names, nil + } + } + return names, err } func detectClientIDs(pattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) { @@ -40,14 +84,23 @@ func idsFromMBeanNames(mBeanNames []string, idExtractor idFromMBeanNameFn) []str return ids } -// idFromMBeanWithClientIDField Gets the identifier given a type=app-info MBean name. Example: "name:type=app-info,client-id=my-id" +// idFromAppInfo Gets the identifier given a type=app-info MBean name. Example "name:type=app-info,id=my-id" +func idFromAppInfo(mBeanName string) string { + return idFromMBean(mBeanName, "id") +} + +// idFromMBeanWithClientIDField Gets the identifier given a MBean name including the client-id field. Example: "name:type=producer-metrics,client-id=my-id" func idFromMBeanWithClientIDField(mBeanName string) string { + return idFromMBean(mBeanName, "client-id") +} + +func idFromMBean(mBeanName string, idField string) string { _, info, valid := strings.Cut(mBeanName, ":") if !valid { return "" } for _, field := range strings.Split(info, ",") { - if _, id, isIDField := strings.Cut(field, "client-id="); isIDField { + if _, id, isIDField := strings.Cut(field, idField+"="); isIDField { return id } } diff --git a/src/client/id_detection_test.go b/src/client/id_detection_test.go index d82e8e51..9335a3f0 100644 --- a/src/client/id_detection_test.go +++ b/src/client/id_detection_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/newrelic/nri-kafka/src/args" + "github.com/newrelic/nri-kafka/src/connection" "github.com/newrelic/nri-kafka/src/connection/mocks" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -76,11 +77,74 @@ func TestGetClientIDs(t *testing.T) { conn := &mocks.MockJMXProvider{MBeanNamePattern: pattern, Names: []string{"a", "b", "c"}} jmxInfo := &args.JMXHost{Name: "D"} - ids, err := getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) + ids, err := getClientIDS(nil, jmxInfo, conn) require.NoError(t, err) assert.Equal(t, []string{"D"}, ids, "Expected only the JMXHost.Name when it is defined") jmxInfo = &args.JMXHost{} - ids, _ = getClientIDS(jmxInfo, pattern, strings.ToUpper, conn) + ids, _ = getClientIDS([]clientIDExtractInfo{{pattern: pattern, extractor: strings.ToUpper}}, jmxInfo, conn) assert.Equal(t, []string{"A", "B", "C"}, ids, "Detect clients should be executed when JMXHost.Name is not defined") + + ids, _ = getClientIDS( + []clientIDExtractInfo{ + {pattern: pattern, extractor: func(string) string { return "" }}, + {pattern: pattern, extractor: strings.ToUpper}, + }, + jmxInfo, conn, + ) + assert.Equal(t, []string{"A", "B", "C"}, ids, "Fallback should be called when first extract info does not work") +} + +func TestDetectConsumerAndProducerIDs(t *testing.T) { + cases := []struct { + Name string + Conn *mocks.MockJMXProvider + Detector func(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) + Expected []string + }{ + { + Name: "consumer app-info pattern", + Conn: &mocks.MockJMXProvider{ + MBeanNamePattern: consumerAppInfoPattern, + Names: []string{"kafka.consumer:type=app-info,id=consumer-id"}, + }, + Detector: detectConsumerIDs, + Expected: []string{"consumer-id"}, + }, + { + Name: "consumer metrics pattern", // When app-info is tried and fails, the metrics fallback should be used + Conn: &mocks.MockJMXProvider{ + MBeanNamePattern: consumerMetricsPattern, + Names: []string{"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-id"}, + }, + Detector: detectConsumerIDs, + Expected: []string{"consumer-id"}, + }, + { + Name: "producer app-info pattern", + Conn: &mocks.MockJMXProvider{ + MBeanNamePattern: producerAppInfoPattern, + Names: []string{"kafka.producer:type=app-info,id=my-id"}, + }, + Detector: detectProducerIDs, + Expected: []string{"my-id"}, + }, + { + Name: "producer metrics pattern", // When app-info is tried and fails, the metrics fallback should be used + Conn: &mocks.MockJMXProvider{ + MBeanNamePattern: producerMetricsPattern, + Names: []string{"kafka.producer:type=producer-metrics,client-id=producer-id"}, + }, + Detector: detectProducerIDs, + Expected: []string{"producer-id"}, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + ids, err := c.Detector(&args.JMXHost{}, c.Conn) + require.NoError(t, err) + assert.Equal(t, c.Expected, ids) + }) + } } diff --git a/src/client/producer_consumer_collection.go b/src/client/producer_consumer_collection.go index 466ce704..c5fa0db2 100644 --- a/src/client/producer_consumer_collection.go +++ b/src/client/producer_consumer_collection.go @@ -70,7 +70,7 @@ func CollectConsumerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, j } defer conn.Close() // Get client identifiers for all the consumers - clientIDs, err := getClientIDS(jmxInfo, consumerDetectionPattern, idFromMBeanWithClientIDField, conn) + clientIDs, err := detectConsumerIDs(jmxInfo, conn) if err != nil { log.Error("Unable to detect consumer/producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) return @@ -115,7 +115,7 @@ func CollectProducerMetrics(i *integration.Integration, jmxInfo *args.JMXHost, j } defer conn.Close() // Get client identifiers for all the producers - clientIDs, err := getClientIDS(jmxInfo, producerDetectionPattern, idFromMBeanWithClientIDField, conn) + clientIDs, err := detectProducerIDs(jmxInfo, conn) if err != nil { log.Error("Unable to detect producers for '%s:%s': %s", jmxInfo.Host, jmxInfo.Port, err) return From 4e483d0a49f102921ef8caf0d4064065db4b6786 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Thu, 30 Jun 2022 16:34:26 +0200 Subject: [PATCH 11/12] feat: use metrics name first for name detection --- src/client/id_detection.go | 4 ++-- src/client/id_detection_test.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/client/id_detection.go b/src/client/id_detection.go index 3bd2be5f..a3e111f9 100644 --- a/src/client/id_detection.go +++ b/src/client/id_detection.go @@ -28,8 +28,8 @@ func detectConsumerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([] // consumerMetricsPattern which is also being used to fetch consumer metrics. return getClientIDS( []clientIDExtractInfo{ - {pattern: consumerAppInfoPattern, extractor: idFromAppInfo}, {pattern: consumerMetricsPattern, extractor: idFromMBeanWithClientIDField}, + {pattern: consumerAppInfoPattern, extractor: idFromAppInfo}, }, jmxInfo, conn, @@ -41,8 +41,8 @@ func detectProducerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([] // ProducerMetricsPattern which is also being used to fetch consumer metrics. return getClientIDS( []clientIDExtractInfo{ - {pattern: producerAppInfoPattern, extractor: idFromAppInfo}, {pattern: producerMetricsPattern, extractor: idFromMBeanWithClientIDField}, + {pattern: producerAppInfoPattern, extractor: idFromAppInfo}, }, jmxInfo, conn, diff --git a/src/client/id_detection_test.go b/src/client/id_detection_test.go index 9335a3f0..8f75b07c 100644 --- a/src/client/id_detection_test.go +++ b/src/client/id_detection_test.go @@ -103,7 +103,7 @@ func TestDetectConsumerAndProducerIDs(t *testing.T) { Expected []string }{ { - Name: "consumer app-info pattern", + Name: "consumer app-info pattern", // app-info fallback Conn: &mocks.MockJMXProvider{ MBeanNamePattern: consumerAppInfoPattern, Names: []string{"kafka.consumer:type=app-info,id=consumer-id"}, @@ -112,7 +112,7 @@ func TestDetectConsumerAndProducerIDs(t *testing.T) { Expected: []string{"consumer-id"}, }, { - Name: "consumer metrics pattern", // When app-info is tried and fails, the metrics fallback should be used + Name: "consumer metrics pattern", Conn: &mocks.MockJMXProvider{ MBeanNamePattern: consumerMetricsPattern, Names: []string{"kafka.consumer:type=consumer-fetch-manager-metrics,client-id=consumer-id"}, @@ -121,7 +121,7 @@ func TestDetectConsumerAndProducerIDs(t *testing.T) { Expected: []string{"consumer-id"}, }, { - Name: "producer app-info pattern", + Name: "producer app-info pattern", // app-info fallback Conn: &mocks.MockJMXProvider{ MBeanNamePattern: producerAppInfoPattern, Names: []string{"kafka.producer:type=app-info,id=my-id"}, @@ -130,7 +130,7 @@ func TestDetectConsumerAndProducerIDs(t *testing.T) { Expected: []string{"my-id"}, }, { - Name: "producer metrics pattern", // When app-info is tried and fails, the metrics fallback should be used + Name: "producer metrics pattern", Conn: &mocks.MockJMXProvider{ MBeanNamePattern: producerMetricsPattern, Names: []string{"kafka.producer:type=producer-metrics,client-id=producer-id"}, From 6986813a96b1cd2fa6d7df40dc34d2afb2fc3ccc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Felipe=20=C3=81lvarez?= Date: Thu, 30 Jun 2022 16:37:41 +0200 Subject: [PATCH 12/12] chore: remove redundant comments --- src/client/id_detection.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/client/id_detection.go b/src/client/id_detection.go index a3e111f9..440842e2 100644 --- a/src/client/id_detection.go +++ b/src/client/id_detection.go @@ -24,8 +24,6 @@ type clientIDExtractInfo struct { } func detectConsumerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) { - // consumerAppInfoPatter is defined to detect consumer clientIDs, in case it is not found it will use - // consumerMetricsPattern which is also being used to fetch consumer metrics. return getClientIDS( []clientIDExtractInfo{ {pattern: consumerMetricsPattern, extractor: idFromMBeanWithClientIDField}, @@ -37,8 +35,6 @@ func detectConsumerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([] } func detectProducerIDs(jmxInfo *args.JMXHost, conn connection.JMXConnection) ([]string, error) { - // producerAppInfoPatter is defined to detect consumer clientIDs, in case it is not found it will use - // ProducerMetricsPattern which is also being used to fetch consumer metrics. return getClientIDS( []clientIDExtractInfo{ {pattern: producerMetricsPattern, extractor: idFromMBeanWithClientIDField},