How to Create a Custom Broadway Producer Using GenStage



Processing large amounts of data can be difficult. Thankfully, the Elixir community has developed some great tools to help us handle the complexities of data ingestion, manipulation, and consumption in the form of GenStage and Broadway.

GenStage is an Elixir Hex package released in 2016 that provides a specification for a stream-based flow of data with backpressure support. It is used in systems where producers and consumers are decoupled: consumers tell producers how many events they are ready to receive, and producers emit no more than that demand.

Broadway is an Elixir Hex package released in 2019. It is a flexible toolkit for building data processing pipelines in Elixir. It is built on top of GenStage and is designed to work with high-throughput and low-latency data, providing support for backpressure, batching, and more.

Broadway is built to ingest data from one of many different external sources including RabbitMQ and Amazon SQS. But what if the source of our data is not an external message queue? What if it is internal to our Elixir application? Can we still use all of the features Broadway provides without sourcing our data from some external service?

The answer lies in Broadway’s support for custom producers. This involves defining a custom producer module that implements the GenStage behavior and then configuring the Broadway pipeline to use this custom producer. Let’s look at an example.

defmodule MyBroadway.CustomProducer do
  use GenStage

  @impl true
  def init(opts) do
    {:producer, opts}
  end

  @impl true
  def handle_demand(demand, state) do
    # Implement your data generation logic here
    messages = generate_messages(demand)

    {:noreply, messages, state}
  end

  defp generate_messages(demand) when demand <= 0, do: []

  defp generate_messages(demand) do
    # This is where you would typically query a database or an internal process
    # to get data. In this example, we're just generating some fake data
    Enum.map(1..demand, &broadway_transform/1)
  end

  defp broadway_transform(message) do
    %Broadway.Message{
      acknowledger: Broadway.NoopAcknowledger.init(),
      data: message
    }
  end
end

defmodule MyBroadway do
  use Broadway

  def start_link(_init_arg) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      processors: [default: []],
      producer: [module: {MyBroadway.CustomProducer, []}]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Implement your data processing logic here
    IO.inspect(message, label: "MESSAGE")

    # handle_message/3 must return the (possibly updated) message
    message
  end
end

The MyBroadway.CustomProducer process receives demand from the MyBroadway pipeline and generates exactly as many messages as were demanded. Note the broadway_transform/1 function: Broadway expects every event to be a %Broadway.Message{} struct, so the producer wraps each piece of data in one, together with an acknowledger, before emitting it.
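The wrapping step does not have to live inside the producer. Broadway's producer configuration also accepts a :transformer option, given as a {module, function, opts} tuple, which Broadway calls for each emitted event. The following is a minimal sketch under the assumption that the producer emits raw terms instead of %Broadway.Message{} structs; the MyBroadway.Transformer module name is hypothetical.

```elixir
defmodule MyBroadway.Transformer do
  # Hypothetical helper module; the name is an assumption for illustration.
  # Broadway invokes the configured function with (event, opts) for every
  # event the producer emits and expects a %Broadway.Message{} back.
  def transform(event, _opts) do
    %Broadway.Message{
      data: event,
      acknowledger: Broadway.NoopAcknowledger.init()
    }
  end
end

# Producer configuration passed to Broadway.start_link/2:
#
#   producer: [
#     module: {MyBroadway.CustomProducer, []},
#     transformer: {MyBroadway.Transformer, :transform, []}
#   ]
```

With this in place, handle_demand/2 could return plain values such as Enum.to_list(1..demand), which keeps the producer free of Broadway-specific structs.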

By using a custom producer we now have the flexibility to handle data from any source. Here is another example of a custom producer that can manage events as our application generates them. This producer keeps a count of the outstanding demand from Broadway while it waits for incoming messages via its insert/1 function. When there is no outstanding demand, insert/1 returns {:error, :pipeline_full} instead of accepting the message.

defmodule MyBroadway.CustomProducer do
  use GenStage

  def insert(message) do
    MyBroadway
    |> Broadway.producer_names()
    |> Enum.random()
    |> GenStage.call({:insert, message})
  end

  @impl true
  def init(_args) do
    initial_demand = 0

    {:producer, initial_demand}
  end

  @impl true
  def handle_demand(incoming_demand, existing_demand) do
    {:noreply, [], existing_demand + incoming_demand}
  end

  @impl true
  def handle_call({:insert, _message}, _from, demand) when demand <= 0 do
    {:reply, {:error, :pipeline_full}, [], demand}
  end

  def handle_call({:insert, message}, _from, demand) do
    {:reply, :ok, [broadway_transform(message)], demand - 1}
  end

  defp broadway_transform(message) do
    %Broadway.Message{
      acknowledger: Broadway.NoopAcknowledger.init(),
      data: message
    }
  end
end

Once the MyBroadway pipeline from earlier has been started with this producer, we can use it by calling MyBroadway.CustomProducer.insert({:hello, :world}). We should see a %Broadway.Message{} struct whose data field is {:hello, :world} printed to the terminal via the IO.inspect/2 call in the handle_message/3 callback.
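Because insert/1 replies with {:error, :pipeline_full} whenever the producer has no outstanding demand, callers need to decide what to do with a rejected message. One option is to retry after a short pause. The sketch below is a hypothetical helper, not part of Broadway; the module name, the attempt count, and the delay are all assumptions chosen for illustration. It takes the insert function as an argument so it can be exercised without a running pipeline.

```elixir
defmodule MyBroadway.InsertRetry do
  # Hypothetical helper, not part of Broadway. `insert_fun` is any 1-arity
  # function that returns :ok or {:error, :pipeline_full}, for example
  # &MyBroadway.CustomProducer.insert/1.
  def insert_with_retry(message, insert_fun, attempts \\ 5, delay_ms \\ 50)

  # No attempts left: give up and report the pipeline as full.
  def insert_with_retry(_message, _insert_fun, 0, _delay_ms) do
    {:error, :pipeline_full}
  end

  def insert_with_retry(message, insert_fun, attempts, delay_ms) when attempts > 0 do
    case insert_fun.(message) do
      :ok ->
        :ok

      {:error, :pipeline_full} ->
        # Pause so downstream stages have a chance to send new demand.
        Process.sleep(delay_ms)
        insert_with_retry(message, insert_fun, attempts - 1, delay_ms)
    end
  end
end
```

A call would look like MyBroadway.InsertRetry.insert_with_retry({:hello, :world}, &MyBroadway.CustomProducer.insert/1). Whether retrying, dropping, or queueing rejected messages is the right choice depends on the application.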

One thing to note is that our example custom producer does not handle acknowledgment of message success or failure, instead opting to use the Broadway.NoopAcknowledger. For more information on handling the success and failure of messages, see the Broadway documentation on acknowledgment, in particular the Broadway.Acknowledger behaviour.
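To give a sense of what replacing the no-op acknowledger involves, here is a minimal sketch of a module implementing the Broadway.Acknowledger behaviour. The module name, the :custom_producer reference, and the choice to only log the outcome are assumptions for illustration; a real acknowledger would typically notify whatever produced the data.

```elixir
defmodule MyBroadway.LoggingAcknowledger do
  # Hypothetical acknowledger; the module name and the logging are assumptions.
  @behaviour Broadway.Acknowledger
  require Logger

  # Broadway calls ack/3 with the messages that share an ack_ref, split into
  # those that succeeded and those that failed.
  @impl true
  def ack(ack_ref, successful, failed) do
    Logger.info(
      "#{inspect(ack_ref)}: #{length(successful)} succeeded, #{length(failed)} failed"
    )

    :ok
  end
end

# In the producer, each message would reference it with a
# {module, ack_ref, ack_data} tuple instead of the no-op acknowledger:
#
#   %Broadway.Message{
#     data: message,
#     acknowledger: {MyBroadway.LoggingAcknowledger, :custom_producer, nil}
#   }
```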

By pairing a custom GenStage producer with a Broadway pipeline, we get Broadway's backpressure, batching, and concurrency for data that originates inside our own application. This allows us to build real-time data processing in Elixir that can scale to meet the demands of our users.
