Skip to content
This repository has been archived by the owner on May 29, 2019. It is now read-only.

Exposing Service Contracts over Message Queue

Eugene Tulika edited this page Jan 8, 2018 · 1 revision

Every Magento service contract which is currently exposed as a Web API can be invoked with the asynchronous call via the Message Queue. This is part of Message Queue framework to support service contracts seamlessly, which allows to make invocations in background, separate time from invocation to processing, control the order of execution.

Catalog APIs exposed via Message Queue

One option to approach bulk API with the minimal effort is to expose Catalog APIs over the message queue. Every product entity which need to be persisted there should be sent as a separate message. It should be encoded in the same way as it is currently encoded for the Web API request. Message queue framework will decode the message and send it to consumer, which in this case will be Catalog Product service contract

Configuration

  • communication.xml should define topics and mapping to the service contracts. Schema indicates that the expected message format is the same as input arguments of the ProductRepository::save
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="catalog.product.create" schema="Magento\Catalog\Api\ProductRepositoryInterface::save">
    </topic>
</config>
  • queue_publisher.xml defines that the message will be published to RabbitMQ
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="inventory.counter.updated">
        <connection name="amqp" exchange="magento" />
    </publisher>
</config>
  • queue_topology.xml should define routing from exchange to the queue based on topic
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="magento" type="topic" connection="amqp">
        <binding id="defaultBinding" topic="catalog.product.create"" destinationType="queue" destination="catalog_product_create_queue"/>
    </exchange>
</config>
  • queue_consumer.xml defines the interface method which will receive the message, in our case save of ProductRepository
<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="createProduct" queue="catalog_product_create_queue" connection="amqp" consumerInstance="Magento\Framework\MessageQueue\Consumer" handler="Magento\Catalog\Api\ProductRepositoryInterface::save"/>
</config>

Solved issues

It will allow to address couple blocking issues:

  1. System which creates product in Magento can send messages to the queue with own order and tempo. Magento will guarantee that products are either created or rejected and stays in the queue
  2. There is no need for parallel processing of the queue, just one consumer might process all the messages. This will guarantee that parallel product saves are not locking the database tables and allow to avoid deadlocks
  3. this approach is available in bare Magento 2.1+

Open Issues

  1. there is no way to report the status of operations. Rejected messages will be sitting in the queue. It can be easily solved by routing them to "failed message queue"
  2. there is now way to optimize mass product save operations. Every product will be saved one by one, invoking all the events and extension points
  3. single consumer should process the queue in order to avoid Deadlocks