Skip to content
Pablo Cantero edited this page Jul 1, 2017 · 12 revisions

Basic middleware

# Smallest possible middleware: prints a marker line on each side of the
# wrapped work.
class MyMiddleware
  # Runs the given block between a "before" and an "after" line on stdout.
  # None of the four arguments is used by this example.
  def call(worker_instance, queue, sqs_msg, body)
    $stdout.puts('Before work')
    yield
    $stdout.puts('After work')
  end
end

Registering a global middleware

# Adds MyMiddleware to the server middleware chain exposed by the server
# configuration block.
Shoryuken.configure_server do |server_config|
  server_config.server_middleware do |middleware_chain|
    middleware_chain.add(MyMiddleware)
    # Other operations available on the chain:
    # middleware_chain.remove(MyMiddleware)
    # middleware_chain.add(MyMiddleware, foo: 1, bar: 2)
    # middleware_chain.insert_before(MyMiddleware, MyMiddlewareNew)
    # middleware_chain.insert_after(MyMiddleware, MyMiddlewareNew)
  end
end

Registering per-worker middleware

# Example worker that declares its own middleware in addition to the global
# chain.
class MyWorker
  include Shoryuken::Worker

  # Handles one message; receives the SQS message object and its body.
  def perform(sqs_msg, body)
    # ...
  end

  server_middleware do |chain|
    # This will join all "global" middleware with `MyWorkerSpecificMiddleware`.
    # If you want to run only `MyWorkerSpecificMiddleware` for this worker,
    # you can call `chain.clear` first; to remove a specific middleware for
    # this worker only, use `chain.remove OtherMiddleware`.
    chain.add MyWorkerSpecificMiddleware
  end
end

Rejecting messages with a middleware

If a middleware doesn't `yield`, the message is rejected: the worker will not process it.

# Middleware that lets only valid messages through to the worker. Invalid
# messages are logged and deleted instead of being processed.
#
# NOTE: `valid?` is not defined in this example; supply your own predicate.
class RejectInvalidMessagesMiddleware
  # Yields when `valid?(sqs_msg)` is truthy; otherwise logs the rejection and
  # deletes the message without yielding.
  def call(worker_instance, queue, sqs_msg, body)
    if valid?(sqs_msg)
      # valid: yield so the message gets consumed
      yield
    else
      # invalid: no yield, so the message is never consumed; it is deleted
      # explicitly below. The message exposes its SQS id as `message_id`.
      Shoryuken.logger.info "sqs_msg '#{sqs_msg.message_id}' is invalid and was rejected"
      sqs_msg.delete
    end
  end
end

Be careful with batchable workers

When batchable workers are used, the `sqs_msg` and `body` arguments are arrays.

# Middleware that handles batchable and non-batchable workers in the same way
# by normalising its input into [message, body] pairs before yielding.
class DoSomethingMiddleware
  # Visits every message/body pair, then yields to the given block.
  def call(worker_instance, queue, sqs_msg, body)
    # if you want to skip batchable workers
    # if sqs_msg.is_a? Array
    #   yield
    #   return
    # end

    # if you want to process batchable and not batchable in the same way
    message_body_pairs(sqs_msg, body).each do |current_sqs_msg, current_body|
      # do_something(current_sqs_msg, current_body)
    end

    yield
  end

  private

  # Returns an array of [message, body] pairs.
  #
  # When `sqs_msg` is an Array, it is zipped with `body` element by element.
  # Otherwise the single pair is wrapped as-is; indexing a scalar body
  # (`body[0]`) would return the first character of a String or nil for a
  # Hash instead of the body itself.
  def message_body_pairs(sqs_msg, body)
    return sqs_msg.zip(body) if sqs_msg.is_a?(Array)

    [[sqs_msg, body]]
  end
end