Make RabbitMQ Retries Easier with Delayed Message Exchange and Elixir's Broadway

Introduction

If you have ever needed to distribute work across multiple nodes, you have probably used or considered RabbitMQ. RabbitMQ is easily one of the most versatile and reliable message brokers available. As many know, much of that reliability comes from being built with Erlang and OTP. While RabbitMQ has many features, a built-in way to retry failed messages with backoff is not one of them. Recently I developed a solution that combines core RabbitMQ features with a plugin to form a clear and concise retry pattern for failed or rejected messages. While this blog post uses Elixir’s Broadway for processing messages, the overall retry pattern can easily be applied to any other language or implementation. You can find the RetryBroadway code on GitHub.

Core Components

The first and most important component for handling failed or rejected messages is what RabbitMQ calls the Dead Letter Exchange (dlx). Please read the documentation for details. In short, a message can be republished to a dlx (an ordinary exchange) for several reasons: a consumer rejects or nacks it without requeueing, its TTL expires before it is consumed, or the queue exceeds its length limit.

The second component is a RabbitMQ plugin called Delayed Message Exchange Plugin (dm). The dm plugin adds additional functionality to a standard RabbitMQ exchange to delay the delivery to a queue. Essentially the plugin provides a mechanism to delay the retrying of a failed message. Delaying the reprocessing of a message is important because it allows for implementing backoff when retrying failed messages.

The third component is a retry pipeline (processor). The retry pipeline consumes failed messages from a queue, then republishes each message to the original exchange. Before republishing, the retry pipeline sets an x-delay header so the dm exchange knows how long to delay the message. Most importantly, the retry pipeline is in charge of deciding when to stop republishing the message. At some point retrying a failed message must stop, and this pipeline provides a programmatic mechanism for deciding when to give up. Overall, the pipeline provides a single location for the retry logic.
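The stop-or-retry decision described above can be sketched as a small pure function. This is only an illustration: the module name RetryPolicy, the limit of 3 retries, and the exponential backoff starting at one second are all assumptions, not values taken from the demo code.

```elixir
defmodule RetryPolicy do
  # Hypothetical values chosen for illustration only.
  @max_retries 3
  @base_delay_ms 1_000

  @doc """
  Decides what to do with a failed message that has already been
  retried `retry_count` times.
  """
  def decide(retry_count) when is_integer(retry_count) and retry_count >= 0 do
    if retry_count < @max_retries do
      # Exponential backoff: 1s, 2s, 4s, ...
      {:retry, @base_delay_ms * Integer.pow(2, retry_count)}
    else
      :give_up
    end
  end
end
```

A retry pipeline could call something like RetryPolicy.decide/1 with the count it reads from the message headers, republish with the returned delay on {:retry, delay_ms}, and drop the message on :give_up.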

Workflow

In an effort to clarify the components described above, here’s a diagram that illustrates a happy path and a retry path. This blog post describes a workflow for jobs and users. While the diagram covers users, the logic applies to jobs as well.

For the “happy path”, when a user is updated in the front end application, a message is published to the main exchange. The first message has zero delays, so it is immediately routed to the user’s queue. The user’s processor in the worker app processes this message and produces some result. To be clear, the user’s processor can do anything with the result (store it in the database, publish it to the results queue). The above diagram illustrates one example where the result is sent back to the main exchange, again with zero delays, and is routed to the results queue, then processed by the front end application.

For the “retry path”, the first message follows the same path until it reaches the user’s processor in the worker app. Here, some processing failure occurs. Because the user’s queue has a dead letter exchange configured, the rejected message is republished to the retry exchange then routed to the retry queue. The retry processor examines the retry count and decides to republish it, this time with some delay and with its retry count incremented. After the delay, the message is routed again to the user’s queue for a second try. This process continues until either a result is successfully published, or the retry processor decides not to republish the message because the retry count reached the max retries.

Lastly, please note that the demo code is not actually publishing to a results queue. For the sake of simplicity, the processors just print a message instead of publishing the result or storing it in a database.

Topology

This section covers the setup.ex mix task that builds out the topology for the demo.

# setup.ex
defmodule Mix.Tasks.Setup do
  @moduledoc false

  use Mix.Task

  import RetryBroadway.Topology

  @shortdoc "Setup queues and exchanges"
  def run(_) do
    IO.puts("Creating exchanges and queues!")

    # note this is a basic means of connecting to Rabbitmq.
    # for production, it's best to pool and manage connections / channels
    with {:ok, config} <- RetryBroadway.rabbitmq_config(),
         {:ok, conn} <- AMQP.Connection.open(config),
         {:ok, chan} <- AMQP.Channel.open(conn) do
      setup_rabbitmq(chan)

      # close connection and the channel
      AMQP.Connection.close(conn)

      IO.puts("Done!")
    end
  end

The start of the script is pretty much just plumbing. RetryBroadway.rabbitmq_config() is a helper to provide connection configuration for RabbitMQ. Then a connection and channel are opened. The channel is then passed to setup_rabbitmq where all the real setup work happens. At the end of it all the connection is closed which also closes the channel. Keep in mind this is a naive approach to connection management and is only intended for demonstration purposes.

# setup.ex continued

defp setup_rabbitmq(chan) do
   # messages are delayed after being retried / republished
   :ok =
     AMQP.Exchange.declare(chan, main_exchange(), :"x-delayed-message",
       arguments: [{"x-delayed-type", :longstr, "topic"}],
       durable: true
     )

   # useful to split out the retry exchange from the main exchange
   # allows for setting the dlx (dead letter exchange) for any rejected messages
   :ok = AMQP.Exchange.declare(chan, retry_exchange(), :topic, durable: true)

At the start of the function, the two exchanges are declared. The main exchange is a topic exchange that handles the routing of messages for work pipelines (processors). It is also a delayed message exchange (dm), which comes into play when the retry pipeline republishes a message with an x-delay header. The retry exchange is a simple topic exchange that serves as the dead letter exchange (dlx) and routes failed messages to the retry queue. Lastly, both exchanges are marked as durable so they will survive restarts or crashes.

# setup.ex continued
# setup_rabbitmq/1 continued

queues_topics = [
  {users_queue(), users_topic()},
  {jobs_queue(), jobs_topic()}
]

This data structure is just a convenient way to iterate through the setup of the queues and topics. The functions evaluate to this:

[
  {"users", "users.*"},
  {"jobs", "jobs.*"}
]

# setup.ex continued
# setup_rabbitmq/1 continued

# declare queues and explicitly set a dlx (dead letter exchange)
queues_topics
|> Enum.each(fn {queue, _topic} ->
  {:ok, _response} =
    AMQP.Queue.declare(chan, queue,
      arguments: [{"x-dead-letter-exchange", :longstr, retry_exchange()}],
      durable: true
    )
end)

This code creates the jobs and users queues and assigns each one a dlx by setting the queue argument x-dead-letter-exchange to the retry exchange, whose name is returned by retry_exchange(). This is important. Because the dlx is specified at the queue level, RabbitMQ itself routes any rejected message to the dlx, which is really nice because it’s hard to mess up. Of course, the queues are marked as durable.

# setup.ex continued
# setup_rabbitmq/1 continued

{:ok, _response} = AMQP.Queue.declare(chan, retry_queue(), durable: true)

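Simple enough here. This creates the retry queue and, like everything else, it’s marked as durable. Note that the retry queue has no dlx of its own, so a message rejected there is simply discarded.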

# setup.ex continued
# setup_rabbitmq/1 continued

    # bind the queues to the exchanges by a topic (routing key)
    queues_topics
    |> Enum.each(fn {queue, topic} ->
      :ok = AMQP.Queue.bind(chan, queue, main_exchange(), routing_key: topic)
    end)

    # match any routing key of any length and route a message on the retry_exchange (dlx) to the retry queue
    :ok = AMQP.Queue.bind(chan, retry_queue(), retry_exchange(), routing_key: "#")
  end
end

Lastly, the queues are bound to the exchanges. The work queues are bound to the main exchange with the jobs.* and users.* routing keys, which means a message published as jobs.created or jobs.whatever is routed to the jobs queue. The same applies to users. Keep in mind that * matches exactly one word, so jobs.created.today would not match jobs.*. The retry queue is bound to the retry exchange with #, which matches every routing key. Read more on routing keys.
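To make the wildcard rules concrete, here is a rough sketch of topic matching in plain Elixir. It is not how RabbitMQ implements routing; TopicMatch is a hypothetical module that only mirrors the documented semantics, where * matches exactly one word and # matches zero or more words.

```elixir
defmodule TopicMatch do
  # Returns true when `routing_key` matches the binding `pattern`.
  def matches?(pattern, routing_key) do
    do_match(String.split(pattern, "."), String.split(routing_key, "."))
  end

  # Both lists exhausted at the same time: full match.
  defp do_match([], []), do: true

  # "#" either consumes nothing, or consumes one word and stays active.
  defp do_match(["#" | rest], words) do
    do_match(rest, words) or (words != [] and do_match(["#" | rest], tl(words)))
  end

  # "*" consumes exactly one word.
  defp do_match(["*" | rest], [_word | words]), do: do_match(rest, words)

  # A literal word must be identical in the pattern and the routing key.
  defp do_match([word | rest], [word | words]), do: do_match(rest, words)

  defp do_match(_pattern, _words), do: false
end
```

With this sketch, TopicMatch.matches?("jobs.*", "jobs.created") is true, TopicMatch.matches?("jobs.*", "jobs.created.today") is false, and TopicMatch.matches?("#", "users.updated") is true.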

Handling Retries

Leading up to this section, we established that failed messages end up in the retry queue by way of the dlx. Once the messages are in the retry queue, there needs to be a processor (pipeline) to republish them. Most importantly, the RetryProcessor determines when to stop retrying and how long to back off between retries.

# retry_processor.ex
defmodule RetryBroadway.Processors.RetryProcessor do
  @moduledoc """
    Pipeline that checks the `retry` queue for messages to republish.
    The pipeline tries to republish until the `max_retries()` is reached.
    Also when republishing the message is sent to the main exchange (topic & delayed exchange) with an `x-delay` header.
    When the `x-delay` header (value is milliseconds) is present the `delayed exchange` will wait to publish the message.
  """
  use Broadway
  require Logger

  @type message_headers() :: list({name :: String.t(), type :: atom, value :: term})
  @type retry_header_data() :: %{retry_count: non_neg_integer | nil, index: integer}

  @delay_header_name "x-delay"
  @retry_header "x-retries"

  alias Broadway.Message
  alias RetryBroadway.Topology

The first part of the module is basic setup. The use Broadway line provides the tooling for processing messages from RabbitMQ. The module attribute @delay_header_name "x-delay" is the header that the dm exchange uses for delaying the routing of the retried message to the work queue. The other module attribute, @retry_header "x-retries", tracks the number of times the message has been retried. Only the RetryProcessor uses x-retries to determine when to stop retrying the failed message, which keeps the retry logic encapsulated in a single place.

# retry_processor.ex continued
  def start_link(_opts) do
    {:ok, config} = RetryBroadway.rabbitmq_config()
    queue = Topology.retry_queue()

    {:ok, _pid} =
      Broadway.start_link(__MODULE__,
        name: RetryProcessor,
        producer: [
          module:
            {BroadwayRabbitMQ.Producer,
             on_failure: :reject,
             metadata: [:routing_key, :headers],
             queue: queue,
             connection: config,
             qos: [
               prefetch_count: 50
             ]},
          concurrency: 2
        ],
        processors: [
          default: [
            concurrency: 1
          ]
        ]
      )
  end

The start_link/1 function handles the connection to RabbitMQ and configuring the specifics of the Broadway pipeline. Using the helper function {:ok, config} = RetryBroadway.rabbitmq_config(), it’s easy to pass connection information (host, port, etc) to Broadway to connect to RabbitMQ. Similarly, queue = Topology.retry_queue() is a convenient way to use the same names for queues, exchanges, etc across the application. For the most part Broadway.start_link/2 is a fairly standard boilerplate configuration. See the Broadway docs for more detail. Though there’s one part of the configuration that requires some depth: the on_failure: option.

Notice that the on_failure: setting is configured to reject a failed message. Effectively, Broadway adheres to the AMQP protocol and rejects the failed message without requeueing it. Rejecting is essential both for enabling the dlx and retry workflow (on work processors) and for aborting the workflow in the RetryProcessor. If the default setting :reject_and_requeue were used, the message could get stuck in an endless loop! Any failure with :reject_and_requeue configured results in messages being requeued immediately, with no backoff (delay) and no ability to track retries in order to abort! I can’t stress this point enough. While :reject_and_requeue is a sensible default for Broadway in general, in practice it could result in your application overloading and crashing in a very bad way. Not the good OTP way :) Make sure to consider the on_failure: configuration carefully.

# retry_processor.ex continued
  @impl true
  def handle_message(
        _,
        %Message{data: data, metadata: %{headers: headers, routing_key: routing_key}} = message,
        _
      ) do

The handle_message/3 function is required by Broadway to implement how to process messages. The function header serves to deconstruct the %Message{} into the various components. Soon you’ll see that the components are used to republish the message to the original exchange with the same routing keys and data.

# retry_processor.ex continued
    retry_data = %{retry_count: retry_count, index: _index} = get_retry_count(headers)

    if (retry_count || 0) < max_retries() do
      {:ok, exchange} = find_exchange(headers)

As we’ll cover, the RetryProcessor module has various functions that extract and update header data. For example, get_retry_count/1 is a simple function that looks in the headers for x-retries and returns the number of retries attempted and the position of the header in the list. See @type retry_header_data() at the top of the module. With the retry_count extracted, there’s a condition to determine if the message should be retried. Next, another helper function, find_exchange/1, finds the original exchange, which is necessary to republish the message.

Note that the result is matched against a tagged tuple: {:ok, exchange} = find_exchange(headers). If for some reason the exchange can’t be found, then the message can’t be republished. The resulting MatchError causes Broadway to mark the message as failed and reject it (on_failure: :reject), preventing a retry on a message that will never succeed. This is a good thing. No sense in retrying a failed message if it can’t be republished.
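The two lookups described above could look roughly like the sketch below. It is not the demo's implementation. It assumes headers arrive as a list of {name, type, value} tuples (as in the @type message_headers() shown earlier), that a missing x-retries header is reported with an index of -1, and that the original exchange is read from the x-first-death-exchange header that RabbitMQ adds when a message is first dead-lettered. The module name HeaderLookup is hypothetical.

```elixir
defmodule HeaderLookup do
  @retry_header "x-retries"
  # RabbitMQ records the exchange of the first dead-lettering event here.
  @first_death_exchange "x-first-death-exchange"

  # Returns the current retry count and the header's position in the list.
  # Assumption: a missing header is reported as `retry_count: nil, index: -1`.
  def get_retry_count(headers) do
    case Enum.find_index(headers, fn {name, _type, _value} -> name == @retry_header end) do
      nil ->
        %{retry_count: nil, index: -1}

      index ->
        {_name, _type, count} = Enum.at(headers, index)
        %{retry_count: count, index: index}
    end
  end

  # Returns `{:ok, exchange}` or `:error` so callers can match on the tagged tuple.
  def find_exchange(headers) do
    case List.keyfind(headers, @first_death_exchange, 0) do
      {_name, _type, exchange} -> {:ok, exchange}
      nil -> :error
    end
  end
end
```

Returning :error rather than raising leaves the decision to the caller: matching on {:ok, exchange} fails loudly when the header is absent, which is the behaviour the paragraph above relies on.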

# retry_processor.ex continued
      updated_headers =
        headers
        |> update_retry_count(retry_data)
        |> update_retry_delay()

Here the headers extracted from %Message{} have the x-retries and x-delay header values set or increased. The updated_headers are used when republishing.
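As a rough idea of what these two steps might do, here is a sketch that keeps the same function names and arities as the calls above. The bodies are assumptions: the linear backoff of one second per retry, the :long header type, and the module name HeaderUpdate are illustrative choices, not the demo's code.

```elixir
defmodule HeaderUpdate do
  @retry_header "x-retries"
  @delay_header "x-delay"
  # Hypothetical base delay, in milliseconds.
  @base_delay_ms 1_000

  # Sets x-retries to 1 on the first retry, otherwise increments it.
  # List.keystore/4 replaces an existing header or appends a new one.
  def update_retry_count(headers, %{retry_count: count}) do
    List.keystore(headers, @retry_header, 0, {@retry_header, :long, (count || 0) + 1})
  end

  # Sets x-delay from the retry count already stored in the headers.
  # Linear backoff (an assumption): 1s, 2s, 3s, ...
  def update_retry_delay(headers) do
    {_name, _type, retries} =
      List.keyfind(headers, @retry_header, 0, {@retry_header, :long, 1})

    List.keystore(headers, @delay_header, 0, {@delay_header, :long, @base_delay_ms * retries})
  end
end
```

Because both functions take and return the header list, they compose with the pipe operator exactly as in the snippet above, and the result can be passed as the headers: option when republishing.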

# retry_processor.ex continued
      data = fake_retry_behaviour(data, retry_count)

The only purpose of this function is to fake some sort of behavior for demonstration purposes. It has no purpose in a real world scenario.

# retry_processor.ex continued

      # note this is a basic means of connecting to Rabbitmq.
      # for production, it's best to pool and manage connections / channels
      with {:ok, config} <- RetryBroadway.rabbitmq_config(),
           {:ok, conn} <- AMQP.Connection.open(config),
           {:ok, chan} <- AMQP.Channel.open(conn) do
        # don't try to decode data. just republish as if the message was new
        AMQP.Basic.publish(chan, exchange, routing_key, data, headers: updated_headers)
        AMQP.Connection.close(conn)
      end

      message

This is the end of the path where the message is retried. Here the same config is used to connect to RabbitMQ via the AMQP hex library. Then the message is republished with the updated_headers so that the retry count is maintained and the dm exchange applies the delay. The callback requires a message to be returned.

# retry_processor.ex continued

    else
      Logger.error("Max retries reached! \n #{inspect(message)}")
      # message is marked as failed. must return failed message.
      Message.failed(message, "Message reached max retries of #{max_retries()}")
    end
  end

This is the end of the handle_message/3 function, where the if condition fails and retries are aborted. Using Message.failed/2 is just good practice. It doesn’t serve much purpose in our usage other than documentation. I won’t go into great depth on the other functions. They are pretty small and should be understandable with the information provided.

Demonstrating Retries

In this section, I will walk through the basic pipelines (processors). Keep in mind the code is modified to demonstrate failures for retries. In this case, I’ll cover the JobsProcessor since the UsersProcessor is basically the same.

# jobs_processor.ex
defmodule RetryBroadway.Processors.JobsProcessor do
  # code removed for brevity

  def start_link(_opts) do
    {:ok, config} = RetryBroadway.rabbitmq_config()
    queue = Topology.jobs_queue()

    {:ok, _pid} =
      Broadway.start_link(__MODULE__,
        name: JobsProcessor,
        producer: [
          module:
            {BroadwayRabbitMQ.Producer,
             on_failure: :reject,
             metadata: [:routing_key, :headers],
             queue: queue,
             connection: config,
             qos: [
               prefetch_count: 1
             ]},
          concurrency: 2
        ],
        processors: [
          default: [
            concurrency: 1
          ]
        ]
      )
  end

This is almost identical to RetryProcessor. The substantive differences are the queue, which is jobs, and a smaller prefetch_count. Again, on_failure: :reject enables usage of the dlx such that any failed message is routed to the retry_exchange and then processed by the RetryProcessor.

# jobs_processor.ex continued
  @impl true
  def handle_message(_, %Message{data: data} = message, _) do
    cond do
      data == "9" ->
        Logger.warning("Failing Jobs data with a value of 9")
        # fake a failure where the message is routed to the dlx (retry queue)
        # message is marked as failed. must return failed message.
        # this Broadway pipeline is configured to use AMQP.Basic.reject() for a failed message.
        # https://github.com/dashbitco/broadway_rabbitmq/blob/abeee81bbfdd7b562dbd5846cc1e63c9632c5180/lib/broadway_rabbitmq/producer.ex#L442
        # See `:on_failure` above
        Message.failed(message, "Faking a failure. Effectively AMQP.Basic.reject()")

      data == "100" ->
        IO.puts("Passing retry of Jobs data with a value of #{data} ")
        # retry pipeline process message but on retry changes the data to fake a passing retry
        message

      true ->
        # message processed successfully
        # this is where the message is processed more, or batched, or published to a
        # results / completed queue
        IO.puts("Passing Jobs data with a value of #{data} ")
        message
    end
  end
end

In this case, handle_message/3 is implemented to demonstrate failures and retries. In the first condition, Message.failed/2 marks messages with a data payload of "9" as failed, which sends them down the retry path. Note that, as UsersProcessor demonstrates, an exception results in the same retry workflow. See the demo code. The second condition fakes a scenario where the message passes on a second try. See fake_retry_behaviour/2. Again, this is a contrived example that allows for the demonstration of the retry workflow.

Wrapping Up

While this particular solution fits my use case, it may not be the proper solution for you. Blog post examples are great, but they aren’t always a “one size fits all” solution. There are a few caveats to this approach that might be a deal-breaker.

  1. You have to use the delayed exchange plugin with RabbitMQ. You may not be able to install that plugin in your production environment. Managed RabbitMQ offerings often restrict which plugins can be enabled, so check with your provider.
  2. The plugin itself has known performance limitations once the number of delayed messages reaches the hundreds of thousands.
  3. If RabbitMQ crashes, the message can survive, but the delay timers won’t.

Ok, that’s enough explaining. Now it’s time to give it a try yourself and see the retry workflow in action. Take a look at the README for directions on how to start RabbitMQ and run the demos.
