Skip to content

Commit

Permalink
Feat: Support custom routing key on specific event
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Apr 12, 2019
1 parent 15f96fd commit 9930462
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/google/uuid"
eh "github.com/looplab/eventhorizon"
"github.com/pkg/errors"

"github.com/giautm/eh-kafka/json"
)

Expand All @@ -22,10 +23,23 @@ var ErrHandlerNil = errors.New("handler can't be nil")
var ErrMatcherNil = errors.New("matcher can't be nil")
var ErrTimedOut = errors.New("cannot run handle: timeout")

type TopicProducer func(event eh.Event) string
type TopicProducer func(event eh.Event) (topic string, key string)

type TopicsConsumer func(event eh.EventHandler) []string

func DefaultTopicProducer(topic string) TopicProducer {
return func(event eh.Event) (string, string) {
return topic, event.AggregateID().String()
}
}

func DefaultTopicsConsumer(topic string) TopicsConsumer {
topics := []string{topic}
return func(event eh.EventHandler) []string {
return topics
}
}

// EventBus is an event bus that notifies registered EventHandlers of
// published events. It will use the SimpleEventHandlingStrategy by default.
type EventBus struct {
Expand Down Expand Up @@ -125,9 +139,8 @@ func (b *EventBus) PublishEvent(ctx context.Context, event eh.Event) error {
return err
}

topic := b.producerTopicFunc(event)
key := ([]byte)(event.AggregateID().String())
_, _, err = b.producer.SendMessage(ctx, topic, key, data)
topic, key := b.producerTopicFunc(event)
_, _, err = b.producer.SendMessage(ctx, topic, ([]byte)(key), data)
if err != nil {
return errors.Wrapf(err,
"could not publish event (%s) (%s)", event, ctx)
Expand Down

0 comments on commit 9930462

Please sign in to comment.