Interceptors

Interceptors are modules implemented as behaviours that allow plugin authors to intercept and modify AMQP methods before they are handled by the channel process. They were originally created during the development of the Sharding Plugin to map the queue names specified by users to the actual names used by sharded queues. Another plugin using interceptors is the Message Timestamp Plugin, which injects timestamps into message properties during basic.publish.

An interceptor must implement the rabbit_channel_interceptor behaviour. The most important callback is intercept/3, which receives the original AMQP method record that the channel should process, the AMQP method content, if any, and the interceptor state (see init/1). The callback takes the method and its content and returns them, modified as needed. For example, when the Sharding Plugin receives a basic.consume method for a sharded queue called my_queue, it maps that name to the appropriate shard for the client that issued the basic.consume command, for example: sharding: my_queue - 1. This means that if the channel received the following record:

#'basic.consume'{queue = <<"my_queue">>}

Then the interceptor will pass back the following transformed method:

#'basic.consume'{queue = <<"sharding: my_queue - 1">>}

This process is transparent to the user and to the channel code. RabbitMQ core developers do not need to add extra functionality to rabbit_channel in order to support sharding, since the interceptors take care of that. Likewise, if we need to inject a timestamp into each message that crosses the broker, instead of modifying the rabbit_channel code we can create a new interceptor for the basic.publish method and inject the desired timestamp there.
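The basic.consume rewrite described above can be sketched as a small interceptor module. This is a minimal illustration, not the Sharding Plugin's actual code: the module name my_consume_interceptor and the helper shard_name/1 are hypothetical, and the helper always picks shard 1, whereas a real plugin would choose a shard per client. Besides intercept/3 and init/1, the sketch assumes the behaviour's description/0 and applies_to/0 callbacks and that the method records come from rabbit_framing.hrl in rabbit_common.

```erlang
%% Hypothetical module name; a sketch under the assumptions stated above.
-module(my_consume_interceptor).

%% Defines the #'basic.consume'{} record.
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-behaviour(rabbit_channel_interceptor).

-export([description/0, init/1, intercept/3, applies_to/0]).

description() ->
    [{description, <<"Example basic.consume interceptor">>}].

%% Whatever init/1 returns is passed back as the third argument of
%% intercept/3. This sketch needs no state.
init(_Ch) ->
    undefined.

%% Return the same kind of method with the queue name rewritten.
%% The content is passed through untouched.
intercept(#'basic.consume'{queue = QName} = Method, Content, _State) ->
    {Method#'basic.consume'{queue = shard_name(QName)}, Content}.

%% Only basic.consume is handed to this interceptor.
applies_to() ->
    ['basic.consume'].

%% Hypothetical helper: always maps to shard 1.
shard_name(QName) ->
    <<"sharding: ", QName/binary, " - 1">>.
```

The interceptor returns a record of the same type it received, with only the queue field changed, so the channel processes the result as an ordinary basic.consume.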

Interceptors can do more than modify AMQP methods: they can also reject them. A good example is again the Sharding Plugin. If we have a sharded queue called my_queue, it makes little sense to allow users to declare queues with that name, so the sharding interceptor also intercepts the queue.declare method. In this case, if the queue name provided matches that of a sharded queue, a channel error is produced.
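A rejecting interceptor might look like the sketch below. The module name my_declare_interceptor and the helper is_reserved/1 are hypothetical, and the set of reserved names is hard-coded for illustration. The sketch assumes rabbit_misc:protocol_error/3 from rabbit_common is used to raise the channel error; a real plugin would look up the sharded queues instead of matching a literal name.

```erlang
%% Hypothetical module name; a sketch, not the Sharding Plugin's code.
-module(my_declare_interceptor).

%% Defines the #'queue.declare'{} record.
-include_lib("rabbit_common/include/rabbit_framing.hrl").

-behaviour(rabbit_channel_interceptor).

-export([description/0, init/1, intercept/3, applies_to/0]).

description() ->
    [{description, <<"Example queue.declare guard">>}].

init(_Ch) ->
    undefined.

intercept(#'queue.declare'{queue = QName} = Method, Content, _State) ->
    case is_reserved(QName) of
        true ->
            %% Exits with an AMQP error, so the declare is never
            %% processed by the channel.
            rabbit_misc:protocol_error(
              precondition_failed,
              "queue name '~s' is reserved for a sharded queue",
              [QName]);
        false ->
            %% Anything else passes through unchanged.
            {Method, Content}
    end.

applies_to() ->
    ['queue.declare'].

%% Hypothetical helper: a single hard-coded reserved name.
is_reserved(<<"my_queue">>) -> true;
is_reserved(_Other)         -> false.
```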

Keep in mind that while we can enable several interceptors, only one interceptor can intercept a particular AMQP method. Otherwise we would need to define interceptor priorities, plus a way to merge the results of their invocations.
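To make the constraint concrete, the following standalone sketch finds the AMQP methods claimed by more than one interceptor. It is only an illustration of the rule, not the broker's own check: the module interceptor_overlap_sketch and the function overlapping_methods/1 are hypothetical, and the input is assumed to be a list of {Module, MethodNames} pairs.

```erlang
%% Hypothetical helper module illustrating the one-interceptor-per-method rule.
-module(interceptor_overlap_sketch).

-export([overlapping_methods/1]).

%% Takes [{Module, [MethodName]}] and returns the sorted list of method
%% names that appear under more than one module.
overlapping_methods(Interceptors) ->
    %% Deduplicate within each module so a module repeating a method
    %% name does not count as a conflict with itself.
    Claimed = [M || {_Mod, Methods} <- Interceptors,
                    M <- lists:usort(Methods)],
    %% Removing one occurrence of every distinct name leaves only the
    %% names that were claimed at least twice.
    lists:usort(Claimed -- lists:usort(Claimed)).
```

For example, if two modules both list 'basic.consume', the function returns ['basic.consume'], which is the situation the rule above forbids.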

Enabling Interceptors

To enable interceptors, register them in the rabbit_registry via a boot step:

-rabbit_boot_step({?MODULE,
                   [{description, "sharding interceptor"},
                    {mfa, {rabbit_registry, register,
                           [channel_interceptor,
                            <<"sharding interceptor">>, ?MODULE]}},
                    {cleanup, {rabbit_registry, unregister,
                               [channel_interceptor,
                                <<"sharding interceptor">>]}},
                    {requires, rabbit_registry},
                    {enables, recovery}]}).

Once the interceptor is registered, only new channels will use it to intercept AMQP methods. Channels that were already running won't load the interceptor. Likewise, if a plugin that provides interceptors is disabled, channels that were already running keep using those interceptors; only channels opened afterwards run without them.