Skip to content

Commit

Permalink
[receiver/kafka] unexport the function WithTracesUnmarshalers, `Wit…
Browse files Browse the repository at this point in the history
…hMetricsUnmarshalers`, `WithLogsUnmarshalers` (#26644)

#26304

---------

Co-authored-by: Alex Boten <aboten@lightstep.com>
Co-authored-by: sakulali <sakulali@126.com>
  • Loading branch information
3 people authored Nov 24, 2023
1 parent 5882d1c commit f481d0c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 27 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafkareceiver-checkapi.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Do not export the function `WithTracesUnmarshalers`, `WithMetricsUnmarshalers`, `WithLogsUnmarshalers`

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26304]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
3 changes: 1 addition & 2 deletions cmd/checkapi/allowlist.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
connector/servicegraphconnector
extension/observer
processor/servicegraphprocessor
receiver/kafkareceiver
processor/servicegraphprocessor
14 changes: 7 additions & 7 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,26 @@ const (
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaReceiverFactory)

// WithTracesUnmarshalers adds Unmarshalers.
func WithTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption {
// withTracesUnmarshalers adds Unmarshalers.
func withTracesUnmarshalers(tracesUnmarshalers ...TracesUnmarshaler) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for _, unmarshaler := range tracesUnmarshalers {
factory.tracesUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// WithMetricsUnmarshalers adds MetricsUnmarshalers.
func WithMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
// withMetricsUnmarshalers adds MetricsUnmarshalers.
func withMetricsUnmarshalers(metricsUnmarshalers ...MetricsUnmarshaler) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for _, unmarshaler := range metricsUnmarshalers {
factory.metricsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
}
}
}

// WithLogsUnmarshalers adds LogsUnmarshalers.
func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
// withLogsUnmarshalers adds LogsUnmarshalers.
func withLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {
return func(factory *kafkaReceiverFactory) {
for _, unmarshaler := range logsUnmarshalers {
factory.logsUnmarshalers[unmarshaler.Encoding()] = unmarshaler
Expand All @@ -69,7 +69,7 @@ func WithLogsUnmarshalers(logsUnmarshalers ...LogsUnmarshaler) FactoryOption {

// NewFactory creates Kafka receiver factory.
func NewFactory(options ...FactoryOption) receiver.Factory {
_ = view.Register(MetricViews()...)
_ = view.Register(metricViews()...)

f := &kafkaReceiverFactory{
tracesUnmarshalers: defaultTracesUnmarshalers(),
Expand Down
6 changes: 3 additions & 3 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestCreateTracesReceiver_error(t *testing.T) {

func TestWithTracesUnmarshalers(t *testing.T) {
unmarshaler := &customTracesUnmarshaler{}
f := NewFactory(WithTracesUnmarshalers(unmarshaler))
f := NewFactory(withTracesUnmarshalers(unmarshaler))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestCreateMetricsReceiver_error(t *testing.T) {

func TestWithMetricsUnmarshalers(t *testing.T) {
unmarshaler := &customMetricsUnmarshaler{}
f := NewFactory(WithMetricsUnmarshalers(unmarshaler))
f := NewFactory(withMetricsUnmarshalers(unmarshaler))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestCreateLogsReceiver_error(t *testing.T) {

func TestWithLogsUnmarshalers(t *testing.T) {
unmarshaler := &customLogsUnmarshaler{}
f := NewFactory(WithLogsUnmarshalers(unmarshaler))
f := NewFactory(withLogsUnmarshalers(unmarshaler))
cfg := createDefaultConfig().(*Config)
// disable contacting broker
cfg.Metadata.Full = false
Expand Down
24 changes: 12 additions & 12 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ func TestTracesReceiver_error(t *testing.T) {
}

func TestTracesConsumerGroupHandler(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down Expand Up @@ -182,8 +182,8 @@ func TestTracesConsumerGroupHandler(t *testing.T) {
}

func TestTracesConsumerGroupHandler_session_done(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down Expand Up @@ -393,8 +393,8 @@ func TestMetricsReceiver_error(t *testing.T) {
}

func TestMetricsConsumerGroupHandler(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down Expand Up @@ -443,8 +443,8 @@ func TestMetricsConsumerGroupHandler(t *testing.T) {
}

func TestMetricsConsumerGroupHandler_session_done(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down Expand Up @@ -652,8 +652,8 @@ func TestLogsReceiver_error(t *testing.T) {
}

func TestLogsConsumerGroupHandler(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down Expand Up @@ -702,8 +702,8 @@ func TestLogsConsumerGroupHandler(t *testing.T) {
}

func TestLogsConsumerGroupHandler_session_done(t *testing.T) {
view.Unregister(MetricViews()...)
views := MetricViews()
view.Unregister(metricViews()...)
views := metricViews()
require.NoError(t, view.Register(views...))
defer view.Unregister(views...)

Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ var (
statPartitionClose = stats.Int64("kafka_receiver_partition_close", "Number of finished partitions", stats.UnitDimensionless)
)

// MetricViews return metric views for Kafka receiver.
func MetricViews() []*view.View {
// metricViews return metric views for Kafka receiver.
func metricViews() []*view.View {
tagKeys := []tag.Key{tagInstanceName}

countMessages := &view.View{
Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func TestMetrics(t *testing.T) {
metricViews := MetricViews()
metricViews := metricViews()
viewNames := []string{
"kafka_receiver_messages",
"kafka_receiver_current_offset",
Expand Down

0 comments on commit f481d0c

Please sign in to comment.