
Proposal: support static partitioning #163

Closed
haitjema opened this issue Dec 11, 2018 · 6 comments

Comments

@haitjema

I know the ability to automatically rebalance is a key selling point of goka, but if one had to run a goka processor with statically assigned partitions, what would be the easiest way to accomplish that? My first thought was to generate an assignment notification and provide it to the groupConsumer, but I don't think that will work, since the assignment has to actually come from the group coordinator for the sarama-cluster client to register it and begin receiving and committing offsets. The confluent consumer does seem to expose the Assign API, but I wasn't sure whether using the confluent consumer is fully supported at this point. I could scrap the group consumer altogether and use the Sarama client to create partition consumers directly, but that would seem to require doing a lot of violence to the framework, so I was wondering what your thoughts were. Thanks in advance!

@db7
Collaborator

db7 commented Dec 12, 2018

@haitjema I think the decision about the partitions is made inside sarama-cluster's balancer.

Possibly, if you give deterministic clientIDs to your processor instances, you'll get a deterministic partitioning.

The strategy used by the balancer can be further configured using the consumer builder:

p, err := goka.NewProcessor(brokers,
  goka.DefineGroup(...),
  goka.WithClientID("id-1"),
  goka.WithConsumerBuilder(
    kafka.ConsumerBuilderWithConfig(config)))

Here, config is a sarama-cluster config object in which you can configure the strategy.
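
For concreteness, here is a minimal sketch of such a builder; it assumes the bsm/sarama-cluster package and goka's kafka package as they existed at the time, and staticBalanceBuilder is a made-up name:

package example

import (
	cluster "github.com/bsm/sarama-cluster"

	"github.com/lovoo/goka/kafka"
)

// staticBalanceBuilder returns a consumer builder with an explicit
// balancing strategy; StrategyRange and StrategyRoundRobin are the two
// strategies sarama-cluster ships with.
func staticBalanceBuilder() kafka.ConsumerBuilder {
	config := cluster.NewConfig()
	config.Group.PartitionStrategy = cluster.StrategyRange // or cluster.StrategyRoundRobin
	return kafka.ConsumerBuilderWithConfig(config)
}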

Does this solve your issue?

@haitjema
Author

Thanks for the reply. Unfortunately, in our context we actually need to prevent a rebalance from occurring at all, as we want the same set of static partitions to always be used regardless of whether an instance drops off. Is there any way to force the existing group consumer to accept a static assignment, or would we need to use the confluent consumer (which supports the Assign API)?

@db7
Collaborator

db7 commented Dec 12, 2018

Sorry, I don't know how to do that with sarama-cluster. But if you find a way, let us know.

If you want to use the confluent library, there is this code as a starting point: https://github.com/lovoo/goka/tree/master/kafka/confluent

An alternative is to create a new consumer based only on sarama (no sarama-cluster). You could then configure your processors with goka.WithConsumerBuilder(myConsumerBuilder([]int32{0, 2, 4})).
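
A sketch of what that hypothetical builder could look like, assuming goka's kafka.ConsumerBuilder function signature; the builder name and the unimplemented body are placeholders:

package example

import (
	"errors"

	"github.com/lovoo/goka/kafka"
)

// myConsumerBuilder (hypothetical) closes over a fixed partition set and
// returns a builder producing a consumer restricted to those partitions.
func myConsumerBuilder(partitions []int32) kafka.ConsumerBuilder {
	return func(brokers []string, group, clientID string) (kafka.Consumer, error) {
		// a real implementation would construct a sarama-based consumer
		// for exactly these partitions (see the interface below)
		_ = partitions
		return nil, errors.New("static consumer not implemented yet")
	}
}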

@haitjema
Author

Thanks @db7, I think I will go the latter route.

@db7
Collaborator

db7 commented Dec 13, 2018

Yes, that sounds like the safest option. If you do that, consider contributing the code back to the project. I think this may be a non-standard use case, but it's still relevant. Thank you.

Before you get into the nasty implementation, take a look at the interface that has to be implemented here:

// Consumer abstracts a kafka consumer
type Consumer interface {
	Events() <-chan Event

	// group consume assumes co-partitioned topics
	// define input topics to consume
	Subscribe(topics map[string]int64) error
	// marks the consumer ready to start consuming the messages
	AddGroupPartition(partition int32)
	Commit(topic string, partition int32, offset int64) error

	// consume individual topic/partitions
	AddPartition(topic string, partition int32, initialOffset int64) error
	RemovePartition(topic string, partition int32) error

	// Close stops the consumer and closes the events channel
	Close() error
}

It works roughly as follows (steps 1 and 2 are sketched in code after the list):

  1. You create a processor.
  2. You call Run(ctx) to start the processor. A processor goroutine starts, and the caller goroutine blocks waiting for the processor goroutine to terminate.
  3. The processor goroutine calls Subscribe() and waits for events on Events().
  4. The consumer implementation guarantees that an assignment event will be available in the Events() channel and that no other event is put into it before that.
  5. The processor goroutine gets the assignment and creates a partition goroutine for each partition. Each partition goroutine first tries to recover the group table by calling AddPartition(groupTable, par).
  6. Each partition goroutine calls RemovePartition(groupTable, par) once the groupTable partition is recovered, and then calls AddGroupPartition(par) to indicate that it wants to start receiving messages from the streams.
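
For reference, steps 1 and 2 look like this from the caller's side; the group graph below is a made-up example:

package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// step 1: create a processor with a (made-up) group graph
	p, err := goka.NewProcessor([]string{"localhost:9092"},
		goka.DefineGroup("example-group",
			goka.Input("example-stream", new(codec.String), func(ctx goka.Context, msg interface{}) {
				ctx.SetValue(msg) // store the latest message in the group table
			}),
			goka.Persist(new(codec.String)),
		))
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}

	// step 2: Run blocks until the processor terminates or the context is cancelled
	if err := p.Run(context.Background()); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}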

The implementation of Subscribe(), AddGroupPartition() and Commit() is done in group_consumer.go and uses Sarama-cluster. The implementation of AddPartition() and RemovePartition() is done in simple_consumer.go and uses Sarama.

What you'll have to do is reimplement group_consumer.go. Some ideas (sketched in code after this list):

  • The ConsumerBuilder should give the fixed assignment to the consumer.
  • When Subscribe() is called, you'll need to push the fixed assignment into the Events() channel.
  • The consumer saves the list of topics passed to Subscribe().
  • When AddGroupPartition(par) is called, you call AddPartition(t, par) for every topic t in the saved list.
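
Putting those ideas together, here is a rough, hypothetical sketch of the group half of such a consumer. Event and Assignment are stand-ins defined locally to keep the example self-contained; a real implementation would use the corresponding types from goka's kafka package:

package example

import "fmt"

// Local stand-ins for goka's kafka package types, for illustration only.
type Event interface{}
type Assignment map[int32]int64 // partition -> initial offset

// staticConsumer implements the group-consumer half with a fixed
// partition set instead of a coordinator-driven assignment.
type staticConsumer struct {
	partitions []int32
	topics     []string   // input topics saved from Subscribe()
	events     chan Event // consumed by the processor via Events()

	// addPartition is the simple-consumer half (AddPartition) that
	// actually starts a sarama partition consumer.
	addPartition func(topic string, partition int32, offset int64) error
}

func (c *staticConsumer) Events() <-chan Event { return c.events }

// Subscribe saves the topic list and pushes the fixed assignment into
// the events channel, as the processor expects (step 4 above).
func (c *staticConsumer) Subscribe(topics map[string]int64) error {
	for topic := range topics {
		c.topics = append(c.topics, topic)
	}
	a := make(Assignment, len(c.partitions))
	for _, p := range c.partitions {
		a[p] = -1 // e.g. sarama.OffsetNewest; real code would look up committed offsets
	}
	c.events <- a
	return nil
}

// AddGroupPartition starts stream consumption for one assigned partition
// by calling the simple-consumer half for every saved topic.
func (c *staticConsumer) AddGroupPartition(partition int32) {
	for _, topic := range c.topics {
		if err := c.addPartition(topic, partition, -1); err != nil {
			c.events <- fmt.Errorf("adding partition %d of topic %s: %v", partition, topic, err)
		}
	}
}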

The challenge will be in Commit(). You'll need to check in Sarama/Sarama-cluster how offsets are committed and how to fetch the metadata back.
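
One possible direction for Commit() is to use sarama's offset manager directly; the surrounding type, the partition-manager cache, and the group name are assumptions of this sketch, not goka API:

package example

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// offsetCommitter commits offsets via sarama's offset manager, bypassing
// sarama-cluster entirely. Not concurrency-safe; a real implementation
// would guard the map with a mutex.
type offsetCommitter struct {
	om   sarama.OffsetManager
	poms map[string]sarama.PartitionOffsetManager // ManagePartition may only be called once per partition
}

func newOffsetCommitter(client sarama.Client, group string) (*offsetCommitter, error) {
	om, err := sarama.NewOffsetManagerFromClient(group, client)
	if err != nil {
		return nil, err
	}
	return &offsetCommitter{om: om, poms: make(map[string]sarama.PartitionOffsetManager)}, nil
}

func (c *offsetCommitter) Commit(topic string, partition int32, offset int64) error {
	key := fmt.Sprintf("%s/%d", topic, partition)
	pom, ok := c.poms[key]
	if !ok {
		var err error
		if pom, err = c.om.ManagePartition(topic, partition); err != nil {
			return err
		}
		c.poms[key] = pom
	}
	// MarkOffset expects the offset of the next message to be consumed.
	pom.MarkOffset(offset+1, "")
	return nil
}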

Hope that helps.

@db7 changed the title from "Static partitioning" to "Proposal: support static partitioning" on Dec 19, 2018
@frairon
Contributor

frairon commented Feb 3, 2020

Since the issue is pretty old, I guess it's not such an important issue anymore. However, after upgrading to sarama's ConsumerGroup, there is a config option, https://godoc.org/github.com/Shopify/sarama#BalanceStrategy, that looks like it enables static mapping when applied properly. Will close this for now. Feel free to reopen if it's still important.

@frairon closed this as completed on Feb 3, 2020