Skip to content
Tim Harper edited this page Sep 3, 2015 · 14 revisions

v1.0.0-RC3

Message publish / confirm response changed!

When you publish a message, RabbitControl responds with Message.ConfirmResponse, rather than true or false.

All messages are assigned a sequential id.

Akka Stream MessagePublisherSink

The sink no longer generates messages for you.

Before:

AckedFlow[T].
  to(
    MessagePublisherSink[Int](
      rabbitControl,
      Message.factory(Publisher.queue(queueName))))

After:

AckedFlow[T].
  map(Message.queue(_, queueName)).
  to(MessagePublisherSink(rabbitControl))

This change allows the stream to dynamically determine message destination and properties from the stream element.

Config changes

op-rabbit.topic-exchange-name defaults to amq.topic.

rabbitmq config block is no longer recognized.

Message.apply change

Message.apply(publisher, data) is now Message.apply(data, publisher), to be consistent with the rest of the Message factory methods.

v1.0.0-RC2

I completely blew my commitment to keep API changes to a minimum from RC1 onward. As I was working on tutorial, there were several warts that just were nagging me, and I decided it would be best to make radical API changes while the RC label is still in tact.

This represents one of the largest API changes, of all. I'm seriously excited about it.

Message publisher changes

  • TopicMessage, QueueMessage, ExchangeMessage objects have become Message.topic, Message.queue, Message.exchange, etc.
  • ConfirmedMessage has become simply Message, as it is the default.
  • UnconfirmedMessage gained a symmetrical Factory function API (UnconfirmedMessage.topic, UnconfirmedMessage.factory, etc.)
  • QueuePublisher, ExchangePublisher, and TopicPublisher have become Publisher.topic, Publisher.exchange, Publisher.topic, etc.; There is only one Publisher instance. The factory methods are just for convenience and offer defaults that make sense for each of the scenarios.

TypedHeader extraction

TypedHeader has been introduced to describe message-headers constrained to a specific type (RabbitMQ headers are, otherwise, untyped and limited to a subset of primitives, Maps and Seqs and some others).

property(Header("my-custom-header").as[Int]) has become property(TypedHeader[Int]("my-custom-header")).

Queue / Exchange definition simplified

  • QueueBinding has become Queue, ExchangeBinding has become Exchange.
    • TopicBinding, HeadersBinding, FanoutBinding receive a Queue definition and an Exchange definition, rather than having the parameters for the creation of each flatten.
    • PassiveQueueBinding has become Queue.passive. Queue.passive can receive a Queue definition which will be used to declare the queue in the event that the queue in question doesn't already exist.
    • Exchange.passive has been created, similarly, can receive a non-passive Exchange definition.
    • Exchange definitions are generically typed; if you pass an Exchange[Exchange.Topic.Value] to a HeadersBinding, the compiler will yell at you.
  • Modeled Queue / Exchange arguments introduced, providing compiler-level safety for both argument names, types; Where a duration is concerned, the Modeled argument receives a FiniteDuration, which it maps to an integer of milliseconds.

Here's a complex example using the new Queue definition syntax, showing how easy it is to passively declare a queue, and create it if it doesn't exist.

    import Queue.ModeledArgs._
    Queue.passive(
      Queue(
        s"op-rabbit.retry.${queueName}",
        durable = true,
        arguments = Seq(
          `x-expires`(30 seconds),
          `x-message-ttl`(5 minutes),
          `x-dead-letter-exchange`(""), // default exchange
          `x-dead-letter-routing-key`(queueName))))

Akka Stream integration

ConfirmedPublisherSink and Sources lost their name argument.

Others

  • nack behavior has been simplified. It's no longer communicated as an exception, and providing a reason for message-rejection has been axed. It's gained the parameter requeue = false, or requeue = true.
  • RecoveryStrategy.nack has been simplified, also. It defaults to requeue = false.
  • RecoveryStrategy.limitedRedeliver uses a RabbitMQ queue with dead-letter forwarding options and a TTL to handle message retry. The messages still go to the back of the line, but the consumer is no longer slowed down in failure mode.
  • The Directives queue binding DSL has been updated to incorporate the above changes.
  • Subscription.register became Subscription.run; The same change applies for the register method on a Subscription definition instance.

v1.0.0-RC1

Overview

  • com.spingo.op_rabbit.consumer._ was moved into com.spingo.op_rabbit._; update your imports, accordingly.
  • The method of instantiating Subscription has changed, substantially. Subscription is now completely stateless.
  • Casting Header values in the Handler DSL has changed, and supports optionalProperty.

Subscription changes

Before, subscriptions were declared and registered like this:

val subscription = new Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}

rabbitControl ! subscription

subscription.initialized.foreach { _ =>
  println("Initialized!")
  subscription.close()
}

Subscription had a close() method and a closed property. This is no more.

The above is rewritten as follows for v1.0.0-RC1:

val subscription = Subscription {
  // A qos of 3 will cause up to 3 concurrent messages to be processed at any given time.
  def config = channel(qos = 3) {
    consume(topic("such-message-queue", List("some-topic.#"))) {
      body(as[Person]) { person =>
        // do work; this body is executed in a separate thread, as provided by the implicit execution context
        ack()
      }
    }
  }
}

val subscriptionRef = subscription.register(rabbitControl)

subscriptionRef.initialized.foreach { _ =>
  println("Initialized!")
  subscriptionRef.close()
}

Casting header values

Before, HeaderValues were cast using as follows:

property(Header("x-retries")).as(typeOf[Int])

This did not work for optionalProperty; the old way no longer works and the new way is shorter:

property(Header("x-retries").as[Int])
optionalProperty(Header("x-retries").as[Int])
Clone this wiki locally