diff --git a/pkg/events/handler.go b/pkg/events/handler.go index b8b697d..daf7891 100644 --- a/pkg/events/handler.go +++ b/pkg/events/handler.go @@ -39,12 +39,14 @@ type Handler interface { /* MatchTopicAndFormatsMessage matches the topic and formats the message. +**This is deprecated and should not be used.** + It takes the following parameters: - - ctx: the context.Context object for the function. - - decoder: the thunderEvents.EventDecoder object for decoding the message. - - referenceTopic: the reference topic string for matching. - - topic: the topic string to match against the reference topic. - - message: the message object to be formatted. + - ctx: the context.Context object for the function. + - decoder: the thunderEvents.EventDecoder object for decoding the message. + - referenceTopic: the reference topic string for matching. + - topic: the topic string to match against the reference topic. + - message: the message object to be formatted. It returns a pointer to the formatted message object and any error encountered. if the match fails, nil is returned. diff --git a/pkg/events/namedHandler.go b/pkg/events/namedHandler.go new file mode 100644 index 0000000..ab3719c --- /dev/null +++ b/pkg/events/namedHandler.go @@ -0,0 +1,35 @@ +package events + +import "go.uber.org/fx" + +// NamedHandler is a handler that has a queue name associated with it. +type NamedHandler interface { + QueuePosfix() string + Handler +} + +type namedHandler struct { + Handler + queuePosfix string +} + +func (n *namedHandler) QueuePosfix() string { + return n.queuePosfix +} + +// NewNamedHandlerFromHandler creates a new NamedHandler from a Handler. +func NewNamedHandlerFromHandler(handler Handler, queuePosfix string) NamedHandler { + return &namedHandler{ + Handler: handler, + queuePosfix: queuePosfix, + } +} + +// FxAnnotateNamedHandler provides a named handler with group tags. +// This is used to provide named handlers with the group tag `group:"named_handlers"`. +// It's not a type safe function, so it's up to the caller to provide the correct type. +func FxAnnotateNamedHandler(namedHandlerFunc interface{}) fx.Option { + return fx.Provide( + fx.Annotate(namedHandlerFunc, fx.As(new(NamedHandler)), fx.ResultTags(`group:"named_handlers"`)), + ) +} diff --git a/pkg/events/rabbitmq/consumer.go b/pkg/events/rabbitmq/consumer.go index 0b8abe6..95059ab 100644 --- a/pkg/events/rabbitmq/consumer.go +++ b/pkg/events/rabbitmq/consumer.go @@ -11,8 +11,28 @@ import ( "go.uber.org/fx" ) -func NewRabbitMQConsumer(logger *zerolog.Logger, opts ...rabbitmq.RabbitmqConfigOption) (events.EventConsumer, error) { - return consumer.NewConsumer(amqp091.Config{}, logger, opts...) +type namedHandlerParams struct { + fx.In + NamedHandlers []events.NamedHandler `group:"named_handlers"` +} + +func registerNamedConsumers(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, params namedHandlerParams) { + for _, namedHandler := range params.NamedHandlers { + registerNamedConsumer(lc, s, logger, namedHandler) + } +} + +func registerNamedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, namedHandler events.NamedHandler) { + consumer, err := NewRabbitMQConsumer(logger, WithQueueNamePosfix(namedHandler.QueuePosfix())) + if err != nil { + logger.Error().Err(err).Msg("failed to create consumer") + err = s.Shutdown() + if err != nil { + logger.Error().Err(err).Msg("failed to shutdown") + } + } + + registerProvidedConsumer(lc, s, logger, namedHandler, consumer) } func registerConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog.Logger, handler events.Handler) { @@ -90,6 +110,10 @@ func registerProvidedConsumer(lc fx.Lifecycle, s fx.Shutdowner, logger *zerolog. ) } +func NewRabbitMQConsumer(logger *zerolog.Logger, opts ...rabbitmq.RabbitmqConfigOption) (events.EventConsumer, error) { + return consumer.NewConsumer(amqp091.Config{}, logger, opts...) +} + // A module that provides a RabbitMQ consumer. // The consumer will be automatically started and stopped gracefully. // The consumer will subscribe to the provided topics. @@ -103,3 +127,11 @@ var InvokeConsumer = fx.Invoke( var InvokeProvidedConsumer = fx.Invoke( registerProvidedConsumer, ) + +var InvokeNamedConsumers = fx.Invoke( + registerNamedConsumers, +) + +var InvokeNamedConsumer = fx.Invoke( + registerNamedConsumer, +)