diff --git a/README.md b/README.md index 9d67c10..1d60aac 100644 --- a/README.md +++ b/README.md @@ -219,53 +219,55 @@ under [the specified folder](examples/with-sasl-plaintext) and then start the ap ## Configurations -| config | description | default | -|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------|-----------------------------| -| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | | -| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | | -| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | -| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | -| `concurrency` | Number of goroutines used at listeners | 1 | -| `retryEnabled` | Retry/Exception consumer is working or not | false | -| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | -| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | -| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | -| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | -| `messageGroupDuration` | Maximum time to wait for a batch | 1s | -| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout | -| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled | -| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s | -| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s | -| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s | -| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster | -| `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false | -| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() | -| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() | -| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | | -| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | | -| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | | -| `retryConfiguration.topic` | Retry/Exception topic names | | -| `retryConfiguration.brokers` | Retry topic brokers urls | | -| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | -| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | -| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | -| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | -| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | -| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | -| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | -| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | -| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | -| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | -| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | -| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | -| `sasl.authType` | `SCRAM` or `PLAIN` | | -| `sasl.username` | SCRAM OR PLAIN username | | -| `sasl.password` | SCRAM OR PLAIN password | | -| `logger` | If you want to custom logger | info | -| `apiEnabled` | Enabled metrics | false | -| `apiConfiguration.port` | Set API port | 8090 | -| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck | -| `metricConfiguration.path` | Set metric endpoint path | /metrics | +| config | description | default | +|--------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------------| +| `reader` | [Describes all segmentio kafka reader configurations](https://pkg.go.dev/github.com/segmentio/kafka-go#ReaderConfig) | | +| `consumeFn` | Kafka consumer function, if retry enabled it, is also used to consume retriable messages | | +| `skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | +| `logLevel` | Describes log level; valid options are `debug`, `info`, `warn`, and `error` | info | +| `concurrency` | Number of goroutines used at listeners | 1 | +| `retryEnabled` | Retry/Exception consumer is working or not | false | +| `transactionalRetry` | Set false if you want to use exception/retry strategy to only failed messages | true | +| `commitInterval` | indicates the interval at which offsets are committed to the broker. | 1s | +| `rack` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#RackAffinityGroupBalancer) | | +| `clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | | +| `messageGroupDuration` | Maximum time to wait for a batch | 1s | +| `metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is `kafka_konsumer`. Currently, there are two exposed prometheus metrics. `processed_messages_total` and `unprocessed_messages_total` So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and `kafka_konsumer_unprocessed_messages_total_current`. | kafka_konsumer | +| `dial.Timeout` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | no timeout | +| `dial.KeepAlive` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go#Dialer) | not enabled | +| `transport.DialTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 5s | +| `transport.IdleTimeout ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 30s | +| `transport.MetadataTTL ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | 6s | +| `transport.MetadataTopics ` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | all topics in cluster | +| `distributedTracingEnabled` | indicates open telemetry support on/off for consume and produce operations. | false | +| `distributedTracingConfiguration.TracerProvider` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTracerProvider() | +| `distributedTracingConfiguration.Propagator` | [see doc](https://opentelemetry.io/docs/specs/otel/trace/api/) | otel.GetTextMapPropagator() | +| `retryConfiguration.clientId` | [see doc](https://pkg.go.dev/github.com/segmentio/kafka-go@#Transport) | | +| `retryConfiguration.startTimeCron` | Cron expression when retry consumer ([kafka-cronsumer](https://github.com/Trendyol/kafka-cronsumer#configurations)) starts to work at | | +| `retryConfiguration.metricPrefix` | MetricPrefix is used for prometheus fq name prefix. If not provided, default metric prefix value is kafka_cronsumer. Currently, there are two exposed prometheus metrics. retried_messages_total_current and discarded_messages_total_current. So, if default metric prefix used, metrics names are kafka_cronsumer_retried_messages_total_current and kafka_cronsumer_discarded_messages_total_current | kafka_cronsumer | +| `retryConfiguration.workDuration` | Work duration exception consumer actively consuming messages | | +| `retryConfiguration.topic` | Retry/Exception topic names | | +| `retryConfiguration.brokers` | Retry topic brokers urls | | +| `retryConfiguration.maxRetry` | Maximum retry value for attempting to retry a message | 3 | +| `retryConfiguration.tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | +| `retryConfiguration.tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | +| `retryConfiguration.sasl.authType` | `SCRAM` or `PLAIN` | | +| `retryConfiguration.sasl.username` | SCRAM OR PLAIN username | | +| `retryConfiguration.sasl.password` | SCRAM OR PLAIN password | | +| `retryConfiguration.skipMessageByHeaderFn` | Function to filter messages based on headers, return true if you want to skip the message | nil | +| `batchConfiguration.messageGroupLimit` | Maximum number of messages in a batch | | +| `batchConfiguration.batchConsumeFn` | Kafka batch consumer function, if retry enabled it, is also used to consume retriable messages | | +| `batchConfiguration.preBatchFn` | This function enable for transforming messages before batch consuming starts | | +| `tls.rootCAPath` | [see doc](https://pkg.go.dev/crypto/tls#Config.RootCAs) | "" | +| `tls.intermediateCAPath` | Same with rootCA, if you want to specify two rootca you can use it with rootCAPath | "" | +| `sasl.authType` | `SCRAM` or `PLAIN` | | +| `sasl.username` | SCRAM OR PLAIN username | | +| `sasl.password` | SCRAM OR PLAIN password | | +| `logger` | If you want to custom logger | info | +| `apiEnabled` | Enabled metrics | false | +| `apiConfiguration.port` | Set API port | 8090 | +| `apiConfiguration.healtCheckPath` | Set Health check path | healthcheck | +| `metricConfiguration.path` | Set metric endpoint path | /metrics | ## Monitoring diff --git a/batch_consumer.go b/batch_consumer.go index 598f129..60de663 100644 --- a/batch_consumer.go +++ b/batch_consumer.go @@ -3,6 +3,8 @@ package kafka import ( "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/segmentio/kafka-go" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -51,8 +53,8 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } -func (b *batchConsumer) GetMetric() *ConsumerMetric { - return b.metric +func (b *batchConsumer) GetMetricCollectors() []prometheus.Collector { + return b.base.GetMetricCollectors() } func (b *batchConsumer) Consume() { diff --git a/collector.go b/collector.go index 3f702d2..29b7f38 100644 --- a/collector.go +++ b/collector.go @@ -8,20 +8,43 @@ import ( const Name = "kafka_konsumer" -type metricCollector struct { +type MetricCollector struct { consumerMetric *ConsumerMetric totalUnprocessedMessagesCounter *prometheus.Desc totalProcessedMessagesCounter *prometheus.Desc } -func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) { +func NewMetricCollector(metricPrefix string, consumerMetric *ConsumerMetric) *MetricCollector { + if metricPrefix == "" { + metricPrefix = Name + } + + return &MetricCollector{ + consumerMetric: consumerMetric, + + totalProcessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(metricPrefix, "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ), + totalUnprocessedMessagesCounter: prometheus.NewDesc( + prometheus.BuildFQName(metricPrefix, "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ), + } +} + +func (s *MetricCollector) Describe(ch chan<- *prometheus.Desc) { prometheus.DescribeByCollect(s, ch) } var emptyStringList []string -func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { +func (s *MetricCollector) Collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( s.totalProcessedMessagesCounter, prometheus.CounterValue, @@ -37,31 +60,12 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) { ) } -func newMetricCollector(consumerMetric *ConsumerMetric) *metricCollector { - return &metricCollector{ - consumerMetric: consumerMetric, - - totalProcessedMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "processed_messages_total", "current"), - "Total number of processed messages.", - emptyStringList, - nil, - ), - totalUnprocessedMessagesCounter: prometheus.NewDesc( - prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), - "Total number of unprocessed messages.", - emptyStringList, - nil, - ), - } -} - func NewMetricMiddleware(cfg *ConsumerConfig, app *fiber.App, consumerMetric *ConsumerMetric, metricCollectors ...prometheus.Collector, ) (func(ctx *fiber.Ctx) error, error) { - prometheus.DefaultRegisterer.MustRegister(newMetricCollector(consumerMetric)) + prometheus.DefaultRegisterer.MustRegister(NewMetricCollector(cfg.MetricPrefix, consumerMetric)) prometheus.DefaultRegisterer.MustRegister(metricCollectors...) fiberPrometheus := fiberprometheus.New(cfg.Reader.GroupID) diff --git a/collector_test.go b/collector_test.go new file mode 100644 index 0000000..4c53529 --- /dev/null +++ b/collector_test.go @@ -0,0 +1,59 @@ +package kafka + +import ( + "reflect" + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +func Test_NewCollector(t *testing.T) { + t.Run("When_Default_Prefix_Value_Used", func(t *testing.T) { + cronsumerMetric := &ConsumerMetric{} + expectedTotalProcessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName(Name, "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ) + expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName(Name, "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ) + + collector := NewMetricCollector("", cronsumerMetric) + + if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) + } + if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) + } + }) + t.Run("When_Custom_Prefix_Value_Used", func(t *testing.T) { + cronsumerMetric := &ConsumerMetric{} + expectedTotalProcessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName("custom_prefix", "processed_messages_total", "current"), + "Total number of processed messages.", + emptyStringList, + nil, + ) + expectedTotalUnprocessedMessagesCounter := prometheus.NewDesc( + prometheus.BuildFQName("custom_prefix", "unprocessed_messages_total", "current"), + "Total number of unprocessed messages.", + emptyStringList, + nil, + ) + + collector := NewMetricCollector("custom_prefix", cronsumerMetric) + + if !reflect.DeepEqual(collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalProcessedMessagesCounter, expectedTotalProcessedMessagesCounter) + } + if !reflect.DeepEqual(collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) { + t.Errorf("Expected: %+v, Actual: %+v", collector.totalUnprocessedMessagesCounter, expectedTotalUnprocessedMessagesCounter) + } + }) +} diff --git a/consumer.go b/consumer.go index 4d66f2f..66f5e6b 100644 --- a/consumer.go +++ b/consumer.go @@ -3,6 +3,8 @@ package kafka import ( "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/segmentio/kafka-go" kcronsumer "github.com/Trendyol/kafka-cronsumer/pkg/kafka" @@ -46,6 +48,10 @@ func newSingleConsumer(cfg *ConsumerConfig) (Consumer, error) { return &c, nil } +func (c *consumer) GetMetricCollectors() []prometheus.Collector { + return c.base.GetMetricCollectors() +} + func (c *consumer) Consume() { go c.subprocesses.Start() diff --git a/consumer_base.go b/consumer_base.go index 69b50d1..d429ec5 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -26,6 +26,11 @@ type Consumer interface { // Resume function resumes consumer, it is start to working Resume() + // GetMetricCollectors for the purpose of making metric collectors available. + // You can register these collectors on your own http server. + // Please look at the examples/with-metric-collector directory. + GetMetricCollectors() []prometheus.Collector + // WithLogger for injecting custom log implementation WithLogger(logger LoggerInterface) @@ -72,6 +77,7 @@ type base struct { transactionalRetry bool distributedTracingEnabled bool consumerState state + metricPrefix string } func NewConsumer(cfg *ConsumerConfig) (Consumer, error) { @@ -109,6 +115,7 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { batchConsumingStream: make(chan []*Message, cfg.Concurrency), consumerState: stateRunning, skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, + metricPrefix: cfg.MetricPrefix, } if cfg.DistributedTracingEnabled { @@ -127,6 +134,18 @@ func (c *base) setupCronsumer(cfg *ConsumerConfig, retryFn func(kcronsumer.Messa c.subprocesses.Add(c.cronsumer) } +func (c *base) GetMetricCollectors() []prometheus.Collector { + var metricCollectors []prometheus.Collector + + if c.retryEnabled { + metricCollectors = c.cronsumer.GetMetricCollectors() + } + + metricCollectors = append(metricCollectors, NewMetricCollector(c.metricPrefix, c.metric)) + + return metricCollectors +} + func (c *base) setupAPI(cfg *ConsumerConfig, consumerMetric *ConsumerMetric) { c.logger.Debug("Initializing API") diff --git a/consumer_config.go b/consumer_config.go index a09436f..f0cdc16 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -51,12 +51,20 @@ type ConsumerConfig struct { DistributedTracingEnabled bool RetryEnabled bool APIEnabled bool + + // MetricPrefix is used for prometheus fq name prefix. + // If not provided, default metric prefix value is `kafka_konsumer`. + // Currently, there are two exposed prometheus metrics. `processed_messages_total_current` and `unprocessed_messages_total_current`. + // So, if default metric prefix used, metrics names are `kafka_konsumer_processed_messages_total_current` and + // `kafka_konsumer_unprocessed_messages_total_current`. + MetricPrefix string } func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { cronsumerCfg := kcronsumer.Config{ - ClientID: cfg.RetryConfiguration.ClientID, - Brokers: cfg.RetryConfiguration.Brokers, + MetricPrefix: cfg.RetryConfiguration.MetricPrefix, + ClientID: cfg.RetryConfiguration.ClientID, + Brokers: cfg.RetryConfiguration.Brokers, Consumer: kcronsumer.ConsumerConfig{ ClientID: cfg.ClientID, GroupID: cfg.Reader.GroupID, @@ -131,6 +139,13 @@ func toHeaders(cronsumerHeaders []kcronsumer.Header) []Header { } type RetryConfiguration struct { + // MetricPrefix is used for prometheus fq name prefix. + // If not provided, default metric prefix value is `kafka_cronsumer`. + // Currently, there are two exposed prometheus metrics. `retried_messages_total_current` and `discarded_messages_total_current`. + // So, if default metric prefix used, metrics names are `kafka_cronsumer_retried_messages_total_current` and + // `kafka_cronsumer_discarded_messages_total_current`. + MetricPrefix string + SASL *SASLConfig TLS *TLSConfig ClientID string diff --git a/examples/docker-compose.yml b/examples/docker-compose.yml index de84b17..8fdf209 100644 --- a/examples/docker-compose.yml +++ b/examples/docker-compose.yml @@ -49,6 +49,7 @@ services: command: "bash -c 'echo Waiting for Kafka to be ready... && \ cub kafka-ready -b kafka:9092 1 20 && \ kafka-topics --create --topic standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ + kafka-topics --create --topic another-standart-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic retry-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ kafka-topics --create --topic error-topic --if-not-exists --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 && \ sleep infinity'" diff --git a/examples/with-metric-collector/README.md b/examples/with-metric-collector/README.md new file mode 100644 index 0000000..d16d674 --- /dev/null +++ b/examples/with-metric-collector/README.md @@ -0,0 +1,30 @@ +If you run this example and go to http://localhost:8000/metrics, + +you can see first and second consumers metrics as shown below + +``` +# HELP first_discarded_messages_total_current Total number of discarded messages. +# TYPE first_discarded_messages_total_current counter +first_discarded_messages_total_current 0 +# HELP first_processed_messages_total_current Total number of processed messages. +# TYPE first_processed_messages_total_current counter +first_processed_messages_total_current 0 +# HELP first_retried_messages_total_current Total number of retried messages. +# TYPE first_retried_messages_total_current counter +first_retried_messages_total_current 0 +# HELP first_unprocessed_messages_total_current Total number of unprocessed messages. +# TYPE first_unprocessed_messages_total_current counter +first_unprocessed_messages_total_current 0 +# HELP second_discarded_messages_total_current Total number of discarded messages. +# TYPE second_discarded_messages_total_current counter +second_discarded_messages_total_current 0 +# HELP second_processed_messages_total_current Total number of processed messages. +# TYPE second_processed_messages_total_current counter +second_processed_messages_total_current 0 +# HELP second_retried_messages_total_current Total number of retried messages. +# TYPE second_retried_messages_total_current counter +second_retried_messages_total_current 0 +# HELP second_unprocessed_messages_total_current Total number of unprocessed messages. +# TYPE second_unprocessed_messages_total_current counter +second_unprocessed_messages_total_current 0 +``` \ No newline at end of file diff --git a/examples/with-metric-collector/api.go b/examples/with-metric-collector/api.go new file mode 100644 index 0000000..14855c0 --- /dev/null +++ b/examples/with-metric-collector/api.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "github.com/gofiber/fiber/v2" + "github.com/prometheus/client_golang/prometheus" +) + +const port = 8000 + +func StartAPI(metricCollectors ...prometheus.Collector) { + f := fiber.New( + fiber.Config{ + DisableStartupMessage: true, + DisableDefaultDate: true, + DisableHeaderNormalizing: true, + }, + ) + + metricMiddleware, err := NewMetricMiddleware(f, metricCollectors...) + + if err == nil { + f.Use(metricMiddleware) + } else { + fmt.Printf("metric middleware cannot be initialized: %v", err) + } + + fmt.Printf("server starting on port %d", port) + + go listen(f) +} + +func listen(f *fiber.App) { + if err := f.Listen(fmt.Sprintf(":%d", port)); err != nil { + fmt.Printf("server cannot start on port %d, err: %v", port, err) + } +} diff --git a/examples/with-metric-collector/main.go b/examples/with-metric-collector/main.go new file mode 100644 index 0000000..a98bdf5 --- /dev/null +++ b/examples/with-metric-collector/main.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "github.com/Trendyol/kafka-konsumer/v2" + "time" +) + +func main() { + firstConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{ + MetricPrefix: "first", + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "standart-topic", + GroupID: "standart-cg", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + MetricPrefix: "first", + Brokers: []string{"localhost:29092"}, + Topic: "error-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + ConsumeFn: consumeFn, + }) + defer firstConsumer.Stop() + firstConsumer.Consume() + + secondConsumer, _ := kafka.NewConsumer(&kafka.ConsumerConfig{ + MetricPrefix: "second", + Reader: kafka.ReaderConfig{ + Brokers: []string{"localhost:29092"}, + Topic: "another-standart-topic", + GroupID: "another-standart-cg", + }, + RetryEnabled: true, + RetryConfiguration: kafka.RetryConfiguration{ + MetricPrefix: "second", + Brokers: []string{"localhost:29092"}, + Topic: "retry-topic", + StartTimeCron: "*/1 * * * *", + WorkDuration: 50 * time.Second, + MaxRetry: 3, + }, + ConsumeFn: consumeFn, + }) + defer secondConsumer.Stop() + + secondConsumer.Consume() + + allCollectors := append(firstConsumer.GetMetricCollectors(), secondConsumer.GetMetricCollectors()...) + StartAPI(allCollectors...) + + select {} +} + +func consumeFn(message *kafka.Message) error { + fmt.Printf("Message From %s with value %s\n", message.Topic, string(message.Value)) + return nil +} diff --git a/examples/with-metric-collector/metric.go b/examples/with-metric-collector/metric.go new file mode 100644 index 0000000..e7f68a7 --- /dev/null +++ b/examples/with-metric-collector/metric.go @@ -0,0 +1,16 @@ +package main + +import ( + "github.com/ansrivas/fiberprometheus/v2" + "github.com/gofiber/fiber/v2" + "github.com/prometheus/client_golang/prometheus" +) + +func NewMetricMiddleware(app *fiber.App, metricCollectors ...prometheus.Collector) (func(ctx *fiber.Ctx) error, error) { + prometheus.DefaultRegisterer.MustRegister(metricCollectors...) + + fiberPrometheus := fiberprometheus.New("konsumer-metrics") + fiberPrometheus.RegisterAt(app, "/metrics") + + return fiberPrometheus.Middleware, nil +} diff --git a/go.mod b/go.mod index 0da3f85..5ea7be0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Trendyol/kafka-konsumer/v2 go 1.19 require ( - github.com/Trendyol/kafka-cronsumer v1.4.7 + github.com/Trendyol/kafka-cronsumer v1.5.0 github.com/Trendyol/otel-kafka-konsumer v0.0.7 github.com/ansrivas/fiberprometheus/v2 v2.6.1 github.com/gofiber/fiber/v2 v2.50.0 diff --git a/go.sum b/go.sum index c7cba5b..3b8fade 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea h1:YRA1X8yxH4PAsp++vufsTjuNpZYYAhnd1NVCqMJ7IlA= +github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0 h1:MI0/ncHrlCvOV0Ro4h9avm2izsNprBw4QfabiSnzm0U= +github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= diff --git a/test/integration/go.mod b/test/integration/go.mod index 89d79c1..1e0444d 100644 --- a/test/integration/go.mod +++ b/test/integration/go.mod @@ -10,7 +10,7 @@ require ( ) require ( - github.com/Trendyol/kafka-cronsumer v1.4.7 // indirect + github.com/Trendyol/kafka-cronsumer v1.5.0 // indirect github.com/Trendyol/otel-kafka-konsumer v0.0.7 // indirect github.com/andybalholm/brotli v1.0.5 // indirect github.com/ansrivas/fiberprometheus/v2 v2.6.1 // indirect diff --git a/test/integration/go.sum b/test/integration/go.sum index 5eb6b0d..5e87da8 100644 --- a/test/integration/go.sum +++ b/test/integration/go.sum @@ -1,5 +1,7 @@ github.com/Trendyol/kafka-cronsumer v1.4.7 h1:xmjxSBJzRRkuaO8k0S4baePyVVLJf3apl7nRiMXFnUY= github.com/Trendyol/kafka-cronsumer v1.4.7/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.4.8-0.20240218154451-2072724685ea/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= +github.com/Trendyol/kafka-cronsumer v1.5.0/go.mod h1:VpweJmKY+6dppFhzWOZDbZfxBNuJkSxB12CcuZWBNFU= github.com/Trendyol/otel-kafka-konsumer v0.0.7 h1:sT1TE2rgfsdrJWrXKz5j6dPkKJsvP+Tv0Dea4ORqJ+4= github.com/Trendyol/otel-kafka-konsumer v0.0.7/go.mod h1:zdCaFclzRCO9fzcjxkHrWOB3I2+uTPrmkq4zczkD1F0= github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=