Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autodetect consumer producer names #178

Merged
merged 12 commits into from
Jul 1, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions kafka-config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ integrations:
# In order to collect Java producer and consumer metrics the "producers" and "consumers" fields should be filled out.
# Both fields are JSON arrays with each entry being a separate JAVA producer or consumer, in it's respective field.
# Each entry should have the following fields:
# - name: This is the client.id of the producer/consumer as it appears in Kafka.
# - host: The IP or Hostname of the producer/consumser. If omitted, will use the value of the "default_jmx_host" field
# - name: This is the client.id of the producer/consumer as it appears in Kafka. If omitted, metrics from all clients in the JMX host:port will be reported.
# - host: The IP or Hostname of the producer/consumer. If omitted, will use the value of the "default_jmx_host" field
# - port: The port in which JMX is setup for on the producer/consumer. If omitted will, use the value of the "default_jmx_port" field
# - username: The username used to connect to JMX. If omitted, will use the value of the "default_jmx_user" field
# - password: The password used to connect to JMX. If omitted, will use the value of the "default_jmx_password" field
Expand All @@ -175,7 +175,10 @@ integrations:
CONSUMERS: '[{"name": "myConsumer", "host": "localhost", "port": 24, "username": "me", "password": "secret"}]'
# If several producers/consumers are on the same host an agent can be installed on that host and the
# "default_jmx_host" and "default_jmx_port" field can be set once and used for all producers/consumers that
# do not have the "host" or "port" field repsectively.
# do not have the "host" or "port" field respectively.
# When defaults are set it is also possible to use the string "default" to gather metrics from all producers /
# consumers in the "default_jmx_host:default_jmx_port". Example:
# PRODUCERS: default
# These fields can be removed if each producer/consumer has it's own "host" and/or "port" field filled out.
DEFAULT_JMX_HOST: "localhost"
DEFAULT_JMX_PORT: "9999"
Expand Down
91 changes: 91 additions & 0 deletions src/args/args_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
sdkArgs "github.com/newrelic/infra-integrations-sdk/args"
"github.com/newrelic/infra-integrations-sdk/integration"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestParseArgs(t *testing.T) {
Expand Down Expand Up @@ -217,3 +218,93 @@ func Test_unmarshalConsumerGroups_NoTopics(t *testing.T) {
t.Error("Expected error")
}
}

func TestUnMarshalJMXHosts(t *testing.T) {
arguments := &ArgumentList{
DefaultJMXUser: "user",
DefaultJMXPassword: "password",
DefaultJMXPort: 42,
DefaultJMXHost: "host",
}
cases := []struct {
Name string
Input string
Expected []*JMXHost
}{
{
Name: "Empty",
Input: `[]`,
Expected: []*JMXHost{},
},
{
Name: "Default with no alias",
Input: `[{}]`,
Expected: []*JMXHost{
{
User: arguments.DefaultJMXUser,
Password: arguments.DefaultJMXPassword,
Port: arguments.DefaultJMXPort,
Host: arguments.DefaultJMXHost,
},
},
},
{
Name: "Default with alias",
Input: `default`,
Expected: []*JMXHost{
{
User: arguments.DefaultJMXUser,
Password: arguments.DefaultJMXPassword,
Port: arguments.DefaultJMXPort,
Host: arguments.DefaultJMXHost,
},
},
},
{
Name: "Only name set",
Input: `[{"name": "client.id"}]`,
Expected: []*JMXHost{
{
Name: "client.id",
User: arguments.DefaultJMXUser,
Password: arguments.DefaultJMXPassword,
Port: arguments.DefaultJMXPort,
Host: arguments.DefaultJMXHost,
},
},
},
{
Name: "No name set",
Input: `[{"user": "my.user", "password": "my.pass", "port": 1088, "host": "localhost"}]`,
Expected: []*JMXHost{
{
User: "my.user",
Password: "my.pass",
Port: 1088,
Host: "localhost",
},
},
},
{
Name: "All values set",
Input: `[{"name": "my.name", "user": "my.user", "password": "my.pass", "port": 1088, "host": "localhost"}]`,
Expected: []*JMXHost{
{
Name: "my.name",
User: "my.user",
Password: "my.pass",
Port: 1088,
Host: "localhost",
},
},
},
}

for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
parsed, err := unmarshalJMXHosts([]byte(c.Input), arguments)
require.NoError(t, err)
require.Equal(t, c.Expected, parsed)
})
}
}
10 changes: 5 additions & 5 deletions src/args/parsed_args.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var GlobalArgs *ParsedArguments
const (
defaultZookeeperPort = 2181
defaultJMXPort = 9999

jmxHostDefaultAlias = "default"
)

// ParsedArguments is an special version of the config arguments that has advanced parsing
Expand Down Expand Up @@ -313,18 +315,16 @@ func ParseArgs(a ArgumentList) (*ParsedArguments, error) {
// unmarshalJMXHosts parses the user-provided JSON map for a producer
// or consumers into a jmxHost structs and sets default values
func unmarshalJMXHosts(data []byte, a *ArgumentList) ([]*JMXHost, error) {

// Parse the producer or consumer
var v []*JMXHost
if err := json.Unmarshal([]byte(data), &v); err != nil {
if string(data) == jmxHostDefaultAlias {
v = []*JMXHost{{}}
} else if err := json.Unmarshal(data, &v); err != nil {
return nil, err
}

// Set default values
for _, p := range v {
if p.Name == "" {
return nil, errors.New("must specify a name for each producer in the list")
}
if p.User == "" {
p.User = a.DefaultJMXUser
}
Expand Down
55 changes: 55 additions & 0 deletions src/client/id_detection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package client

import (
"strings"

"github.com/newrelic/nri-kafka/src/args"
"github.com/newrelic/nri-kafka/src/connection"
)

const (
consumerDetectionPattern = "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*"
producerDetectionPattern = "kafka.producer:type=producer-metrics,client-id=*"
sigilioso marked this conversation as resolved.
Show resolved Hide resolved
)

// idFromMBeanNameFn defines a function to extract the identifier from an MBean name.
type idFromMBeanNameFn func(string) string

func getClientIDS(jmxInfo *args.JMXHost, mBeanPattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) {
if jmxInfo.Name != "" {
return []string{jmxInfo.Name}, nil
}
return detectClientIDs(mBeanPattern, idExtractor, conn)
}

func detectClientIDs(pattern string, idExtractor idFromMBeanNameFn, conn connection.JMXConnection) ([]string, error) {
mBeanNames, err := conn.QueryMBeanNames(pattern)
if err != nil {
return nil, err
}
return idsFromMBeanNames(mBeanNames, idExtractor), nil
}

func idsFromMBeanNames(mBeanNames []string, idExtractor idFromMBeanNameFn) []string {
ids := []string{}
for _, mBeanName := range mBeanNames {
if id := idExtractor(mBeanName); id != "" {
ids = append(ids, id)
}
}
return ids
}

// idFromMBeanWithClientIDField Gets the identifier given a type=app-info MBean name. Example: "name:type=app-info,client-id=my-id"
func idFromMBeanWithClientIDField(mBeanName string) string {
_, info, valid := strings.Cut(mBeanName, ":")
if !valid {
return ""
}
for _, field := range strings.Split(info, ",") {
if _, id, isIDField := strings.Cut(field, "client-id="); isIDField {
return id
}
}
return ""
}
86 changes: 86 additions & 0 deletions src/client/id_detection_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package client

import (
"strings"
"testing"

"github.com/newrelic/nri-kafka/src/args"
"github.com/newrelic/nri-kafka/src/connection/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestIdFromAppInfoMBean(t *testing.T) {
cases := []struct {
MBeanName string
Expected string
}{
{
MBeanName: "kafka.consumer:type=consumer-fetch-manager-metrics,client-id=my_consumer",
Expected: "my_consumer",
},
{
MBeanName: "kafka.consumer:client-id=my_consumer,type=consumer-fetch-manager-metrics",
Expected: "my_consumer",
},
{
MBeanName: "kafka.producer:type=producer-metrics,client-id=my_producer",
Expected: "my_producer",
},
{
MBeanName: "kafka.producer:type=producer-metrics,no-id-here",
Expected: "",
},
{
MBeanName: "id=my_consumer,type=app-info,invalid-mbean=true",
Expected: "",
},
}

for _, c := range cases {
t.Run(c.MBeanName, func(t *testing.T) {
assert.Equal(t, c.Expected, idFromMBeanWithClientIDField(c.MBeanName))
})
}
}

func TestIdsFromMBeanNames(t *testing.T) {
mBeanNames := []string{"_id1", "_id2", "invalid_id", "_id3"}
idExtractor := func(name string) string {
if strings.HasPrefix(name, "_") {
return strings.TrimLeft(name, "_")
}
return ""
}
expected := []string{"id1", "id2", "id3"}
assert.Equal(t, expected, idsFromMBeanNames(mBeanNames, idExtractor))
}

func TestDetectClientIDsConnError(t *testing.T) {
pattern := "some-pattern"
conn := &mocks.MockJMXProvider{MBeanNamePattern: "other-pattern-causes-error"}
_, err := detectClientIDs(pattern, nil, conn)
assert.Error(t, err)
}

func TestDetectClientIDs(t *testing.T) {
pattern := "pattern"
conn := &mocks.MockJMXProvider{MBeanNamePattern: pattern, Names: []string{"a", "b", "c"}}
ids, err := detectClientIDs(pattern, strings.ToUpper, conn)
require.NoError(t, err)
assert.Equal(t, []string{"A", "B", "C"}, ids)
}

func TestGetClientIDs(t *testing.T) {
pattern := "pattern"
conn := &mocks.MockJMXProvider{MBeanNamePattern: pattern, Names: []string{"a", "b", "c"}}

jmxInfo := &args.JMXHost{Name: "D"}
ids, err := getClientIDS(jmxInfo, pattern, strings.ToUpper, conn)
require.NoError(t, err)
assert.Equal(t, []string{"D"}, ids, "Expected only the JMXHost.Name when it is defined")

jmxInfo = &args.JMXHost{}
ids, _ = getClientIDS(jmxInfo, pattern, strings.ToUpper, conn)
assert.Equal(t, []string{"A", "B", "C"}, ids, "Detect clients should be executed when JMXHost.Name is not defined")
}
Loading