Propel [v0.5.1]

Propel is a wrapper over the Confluent Kafka Go library that lets you consume Kafka events in batches or run a throttled consumer which adapts to your processing speed without leaving the consumer group.

Features

  • Throttled consumer which adapts to your message processing speed, unbounded by max.poll.interval.ms
  • Batch consumer which allows you to process a batch of messages, bounded by max.poll.interval.ms
  • Small API footprint
  • Single dependency (testify, used only for testing)

Throttled Consumer API

The Throttled Consumer API lets you consume a batch of messages in a single go, but it throttles the Kafka consumer to adapt to your processing speed, which means your consumer will not get kicked out even if your processing exceeds max.poll.interval.ms.

The Throttled Consumer is a good fit if your message processing is blocking in nature and high-throughput processing is not your priority.

Configuring the ThrottledConsumer

	throtCon := throttled.Consumer{
		PollTimeoutMS: 2000,
		BatchSize:     200,
		KafkaConsumerConf: &conflgo.ConsumerConfig{
			BoostrapServers: "localhost:9092",
			GroupID:         "test_part_cons",
		},
		Logger: logger,
		StatsNotifier: func(notif stats.Notification) {
			// publish to a metric agent
		},
	}

	_ = throtCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
		// application logic
	}, "test-topic")

Batch Consumer API

The Batch Consumer API lets you consume a batch of messages in a single go; it runs each consumer in the group in its own goroutine. The batch consumer is a good fit if your processing is fast and you have high-throughput requirements.

Configuring the BatchConsumer

	batCon := batch.Consumer{
		StatsNotifier: func(notif stats.Notification) {
			// publish to statsd OR prometheus
		},
		BatchSize: 200,
		KafkaConsumerConf: &conflgo.ConsumerConfig{
			BoostrapServers: "localhost:9092",
			GroupID:         "foo.id",
		},
		Count:           3, // Controls the concurrency, for best performance set it to the number of partitions
		ShutdownTimeout: 10000 * time.Millisecond,
		Logger:          slog.New(slog.NewTextHandler(os.Stdout, nil)),
	}

	_ = batCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
		// application logic
	}, "test-topic")

Note

RunFunc panics if no topics are provided, for both the batch consumer and the throttled consumer.

Common API

propel.WorkerContext is used to get the ID of the worker which invoked the handler; this is useful for debugging purposes only. The handler is fully decoupled from the worker.
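The exact signature of propel.WorkerContext isn't shown in this README; the sketch below assumes, purely for illustration, that it takes the handler's context and returns the worker ID. Check the package godoc for the actual API.

	_ = throtCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
		// Hypothetical: WorkerContext extracts the worker ID from the handler's context.
		workerID := propel.WorkerContext(ctx)
		logger.Debug("handling batch", "worker_id", workerID, "record_count", len(records))
	}, "test-topic")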

FAQs

What is the ConsumerConfig.ConfigMapOverride ?

The ConsumerConfig provides a way to specify the consumer config in a type-safe way. ConfigMapOverride lets you directly specify a *kafka.ConfigMap to override or add extra configuration. For a full list of allowed configs, refer this.
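As a rough sketch, the override can be set alongside the typed fields; the property keys below are standard librdkafka consumer properties chosen only as examples, not recommendations:

	consumerConf := &conflgo.ConsumerConfig{
		BoostrapServers: "localhost:9092",
		GroupID:         "foo.id",
		// Example only: kafka.ConfigMap comes from the Confluent Go Kafka client,
		// and any librdkafka consumer property can be placed here.
		ConfigMapOverride: &kafka.ConfigMap{
			"auto.offset.reset":    "earliest",
			"max.poll.interval.ms": 300000,
		},
	}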

What is the difference between the Run and RunFunc methods ?

The Run method takes an implementation of the Handler interface, whereas RunFunc takes a plain Go function as the handler.
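The exact method set of the Handler interface isn't shown in this README; the sketch below assumes it mirrors the RunFunc callback signature, purely to illustrate the difference (the method name is hypothetical, check the package godoc):

	// Hypothetical Handler implementation; the actual interface method name may differ.
	type batchPrinter struct{}

	func (batchPrinter) Handle(ctx context.Context, records []record.Record) {
		// application logic
	}

You would then pass a batchPrinter value to Run, instead of supplying a function literal to RunFunc.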

What happens to the batch after the handler executes ?

The batch is valid only for one handler invocation; it is cleared before the next invocation and the messages get GC-ed as per the GC schedule.

How are offsets committed ?

We use a combination of auto-commits and manual offset store management: after every handler invocation we store the next set of offsets, which are committed in the next auto-commit cycle.

How do I select an optimum batch size ?

Selecting a good batch size is about evaluating tradeoffs

  • A smaller batch size is good if you want to limit the number of uncommitted messages and keep memory usage low, as the batch is held in memory till processing completes.
  • A larger batch size is good if you have high throughput and memory is not a concern.
  • The batch itself can be processed concurrently for faster processing (see the sketch after this list).
  • Prefer a smaller batch if your processing time is on the high end of the spectrum and could take longer than max.poll.interval.ms to complete.
  • Prefer a larger batch size if your processing time is very fast and is well within max.poll.interval.ms.
  • Last but not least, measure, measure and measure to fine-tune the batch size.
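
As a rough sketch, a handler can fan the batch out across goroutines; the chunk size and the chunking scheme below are illustrative and not part of propel:

	_ = batCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
		const chunkSize = 50 // illustrative value, tune for your workload
		var wg sync.WaitGroup
		for start := 0; start < len(records); start += chunkSize {
			end := start + chunkSize
			if end > len(records) {
				end = len(records)
			}
			wg.Add(1)
			go func(chunk []record.Record) {
				defer wg.Done()
				// process the chunk; respect ctx cancellation in real code
			}(records[start:end])
		}
		// the batch must be fully processed before the handler returns
		wg.Wait()
	}, "test-topic")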

What is the ShutdownTimeout in the BatchConsumer struct?

  • Sometimes processing can hang the consumer poll loop, preventing it from reacting to shutdown signals. This can cause your process to block indefinitely and not honor SIGTERM/SIGINT.
  • The ShutdownTimeout should be higher than PollTimeoutMS, because the poll loop by default blocks for PollTimeoutMS and cannot be cancelled; this is a known limitation of the Confluent Go Kafka client.
  • Relying on the ShutdownTimeout should be avoided at all costs; instead make good use of the context.Context package to propagate timeouts and cancellation signals to your handler logic (see the sketch below).
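
A minimal sketch of giving each batch a bounded processing budget via the handler context; the 5-second budget and the process function are hypothetical:

	_ = batCon.RunFunc(context.Background(), func(ctx context.Context, records []record.Record) {
		// Bound the batch so a hung downstream call can't stall the poll loop indefinitely.
		batchCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
		defer cancel()
		for _, rec := range records {
			// process is a hypothetical application function that honors its context.
			if err := process(batchCtx, rec); err != nil {
				return
			}
		}
	}, "test-topic")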

How do I shut down the consumer from the handler ?

  • Use the cancel function returned by context.WithCancel (or one of its With* variants); pass the cancel func to your handler or capture it in a closure, as shown below. Remember to use contexts and proper timeouts in your handlers to provide proper deadlines; if your handler blocks indefinitely, it is definitely a red flag in your application logic.
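
A minimal sketch of shutting the consumer down from inside the handler by capturing the cancel function in a closure; process is a hypothetical application function:

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	_ = throtCon.RunFunc(ctx, func(ctx context.Context, records []record.Record) {
		if err := process(ctx, records); err != nil {
			// On a fatal error, cancel the root context to stop the consumer.
			cancel()
		}
	}, "test-topic")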

Why are the stats info handled through a notifier function and not a proper stats collector lib ?

There are a lot of stats collectors / agents, and we cannot decide for the user which agent is better; instead we leave that responsibility to the stats libraries, which will do it better than us. We want to keep the lib's API footprint as small as possible.

Examples

  • For code usage, refer to the example directory
  • For full docs, run:
godoc -http :8080
open http://localhost:8080/pkg/github.com/shubhang93/propel/

Using the stats.Notifier to record stats

The stats.Notifier receives a stats.Notification payload with the appropriate stats information.

Metric Name                                 | Type   | Example Value   | Label Values
poll_records_count                          | int64  | 1               | consumer_id,consumer_1
confluent_go_kafka_stats (more info)        | string | {"key":"value"} | N/A
worker_alive (only for throttled consumer)  | int64  | 0/1             | worker_id,worker#1
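
A minimal sketch of a notifier that simply forwards every notification to a structured logger; the exact fields of stats.Notification aren't listed here, so the sketch only logs the whole value:

	notifier := func(notif stats.Notification) {
		slog.Info("propel stats", "notification", notif)
	}

	throtCon := throttled.Consumer{
		// ...other config...
		StatsNotifier: notifier,
	}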

Note

Batch Consumers Only

Depending on the Count value we spin up Count consumers, each in its own goroutine, which run their own poll loop and consume messages from Kafka. The intermediate batching layer batches the messages into the batch size specified by the BatchSize parameter in the config. The default value is 500. The BatchSize value can be tweaked according to your requirements. Currently, we only support auto-commits; offset management is handled internally by the library. You still have to process records within max.poll.interval.ms to avoid your consumer getting kicked out of the consumer group.

Note

Throttled Consumers Only

The Throttled consumer works differently from the BatchConsumer: a single consumer is spun up which consumes from Kafka in one poll loop, and messages are grouped by topic and partition before being fed to the handler. The Throttled consumer pauses partitions until all records for a topic-partition are fully processed; this ensures that poll is called as frequently as possible. You will not have to worry about your consumer getting kicked out due to slow consumption, but this comes at a cost: ingestion throughput will suffer due to the pausing and resuming of partitions. This consumer is better suited for blocking handler operations and is not meant for fast consumption. All the concurrency is handled at the goroutine level; records for each partition are handled in their own goroutine without spinning up a physical consumer.

Benchmarks

Device: MacBook Air M1 2020
  8 GB RAM
  256 GB SSD

Kafka Consumer Config:
  max.poll.interval.ms: 15s
  Poll Timeout: 100ms

Batch Consumer

Produce Count: 10_000
Message Key Max Size: 11 bytes
Message Value Max Size: 5 bytes

Note

Memory usage will vary for different keys and values, as will the allocations. Most of the allocations happen inside the Confluent Go Kafka library.

The benchmark below uses a constant 10ms as the time taken for processing a batch of records.

goos: darwin
goarch: arm64
pkg: github.com/shubhang93/propel/batch
BenchmarkConsumer_RunFunc
BenchmarkConsumer_RunFunc/batch_size_1
BenchmarkConsumer_RunFunc/batch_size_1-8         	       1	76022596041 ns/op	14988048 B/op	  386652 allocs/op
BenchmarkConsumer_RunFunc/batch_size_100
BenchmarkConsumer_RunFunc/batch_size_100-8       	       1	4030724333 ns/op	 9633216 B/op	  186594 allocs/op
BenchmarkConsumer_RunFunc/batch_size_500
BenchmarkConsumer_RunFunc/batch_size_500-8       	       1	3522976500 ns/op	10023040 B/op	  184559 allocs/op
BenchmarkConsumer_RunFunc/batch_size_1000
BenchmarkConsumer_RunFunc/batch_size_1000-8      	       1	3423151125 ns/op	 9560160 B/op	  184259 allocs/op
BenchmarkConsumer_RunFunc/batch_size_5000
BenchmarkConsumer_RunFunc/batch_size_5000-8      	       1	3448683125 ns/op	 9764704 B/op	  183978 allocs/op
PASS

To use batching or not ?

If your processing logic does not support batch operations you will most likely not gain a lot from batching. If it does, your processing time drastically decreases: comparing the first and second benchmarks, we see a 94% decrease in processing time while using batching, although we see diminishing returns with higher batch sizes. This benchmark can be used to select an optimum batch size. These numbers are for reference; whether your application sees the same decrease in processing time depends on your message key/value content and the complexity of your processing logic. However, these numbers serve as a good blueprint for optimising your workloads.

Throttled Consumer

The benchmark below doesn't use any sort of sleep in the handler.

goos: darwin
goarch: arm64
pkg: github.com/shubhang93/propel/throttled
BenchmarkConsumer_RunFunc
BenchmarkConsumer_RunFunc/batch_size_10
BenchmarkConsumer_RunFunc/batch_size_10-8         	       1	68003927042 ns/op	 9500232 B/op	  224834 allocs/op
BenchmarkConsumer_RunFunc/batch_size_100
BenchmarkConsumer_RunFunc/batch_size_100-8        	       1	13771014584 ns/op	 8003480 B/op	  186456 allocs/op
BenchmarkConsumer_RunFunc/batch_size_500
BenchmarkConsumer_RunFunc/batch_size_500-8        	       1	7242461500 ns/op	 7920240 B/op	  182793 allocs/op
BenchmarkConsumer_RunFunc/batch_size_1000
BenchmarkConsumer_RunFunc/batch_size_1000-8       	       1	6327211292 ns/op	 7939696 B/op	  182280 allocs/op
BenchmarkConsumer_RunFunc/batch_size_5000
BenchmarkConsumer_RunFunc/batch_size_5000-8       	       1	4245761291 ns/op	 8258512 B/op	  181528 allocs/op
PASS

We see that there is significant overhead from the pausing and resuming of partitions. At higher batch sizes the overhead is significantly reduced, but the throttled consumer still isn't a good fit for high-throughput workloads; it is a better fit for handlers which block.