Processing large amounts of data can be difficult. Thankfully, the Elixir community has developed some great tools to help us handle the complexities of data ingestion, manipulation, and consumption in the form of GenStage and Broadway.
GenStage is an Elixir Hex package released in 2016 that specifies how events flow between stages with back-pressure. It is used in systems where producers and consumers are decoupled. Consumers request (demand) data dynamically instead of having producers push data at them.
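The following is a minimal sketch of a standalone GenStage producer that makes the demand model concrete. It is adapted from the counter pattern in the GenStage documentation.

It makes a few assumptions:
- The `gen_stage` package is installed with `Mix.install/1`, so the script needs network access the first time it runs.
- The module name `Counter` is illustrative.
- The `notify` pid is an illustrative addition that only exists to make each demand request visible.

The producer emits events only from `handle_demand/2`, so it never outruns its consumer. `GenStage.stream/1` acts as the consumer, and `max_demand: 5` caps how many events may be outstanding at once.

```elixir
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Counter do
  use GenStage

  # State: the next integer to emit, plus a pid notified of every demand
  # request (illustrative only, so back-pressure can be observed).
  @impl true
  def init({start, notify}), do: {:producer, {start, notify}}

  # Called only when a consumer asks for more; emits exactly `demand` events.
  @impl true
  def handle_demand(demand, {counter, notify}) when demand > 0 do
    send(notify, {:demand, demand})
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, {counter + demand, notify}}
  end
end

{:ok, producer} = GenStage.start_link(Counter, {0, self()})

# GenStage.stream/1 subscribes the calling process as a consumer;
# max_demand: 5 limits how many events may be requested at a time.
first_twelve =
  [{producer, max_demand: 5}]
  |> GenStage.stream()
  |> Enum.take(12)

IO.inspect(first_twelve, label: "events")
```

Every `{:demand, n}` notification the script receives has `n` no larger than 5. The producer does work only when the consumer has capacity.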
Broadway is an Elixir Hex package released in 2019. It is a flexible toolkit for building data processing pipelines in Elixir. Broadway is built on top of GenStage and targets high-throughput, low-latency workloads. It provides back-pressure, batching, and more.
Broadway is built to ingest data from external sources, and producers are available for message queues such as RabbitMQ and Amazon SQS. But what if the source of our data is not an external message queue? What if it is internal to our Elixir application? Can we still use all of the features Broadway provides without sourcing our data from some external service?
The answer lies in Broadway’s support for custom producers. This involves defining a custom producer module that implements the GenStage behavior and then configuring the Broadway pipeline to use this custom producer. Let’s look at an example.
```elixir
defmodule MyBroadway.CustomProducer do
  use GenStage

  @impl true
  def init(opts) do
    {:producer, opts}
  end

  @impl true
  def handle_demand(demand, state) do
    # Implement your data generation logic here
    messages = generate_messages(demand)
    {:noreply, messages, state}
  end

  defp generate_messages(demand) when demand <= 0, do: []

  defp generate_messages(demand) do
    # This is where you would typically query a database or an internal process
    # to get data. In this example, we're just generating some fake data
    Enum.map(1..demand, &broadway_transform/1)
  end

  # Broadway only accepts %Broadway.Message{} structs from its producers
  defp broadway_transform(message) do
    %Broadway.Message{
      acknowledger: Broadway.NoopAcknowledger.init(),
      data: message
    }
  end
end

defmodule MyBroadway do
  use Broadway

  def start_link(_init_arg) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      processors: [default: []],
      producer: [module: {MyBroadway.CustomProducer, []}]
    )
  end

  @impl true
  def handle_message(_processor, message, _context) do
    # Implement your data processing logic here.
    # IO.inspect/2 returns its argument, so the message is returned to Broadway.
    IO.inspect(message, label: "MESSAGE")
  end
end
```
The `MyBroadway.CustomProducer` process receives demand from the `MyBroadway` pipeline and generates exactly as many messages as were requested. Take note of the `broadway_transform/1` function. Broadway expects every event to be a `%Broadway.Message{}` struct with `data` and `acknowledger` fields, so this function wraps each raw value before it is emitted.
By using a custom producer we now have the flexibility to handle data from any source. Here is another example of a custom producer that accepts events as our application generates them. Instead of producing messages on request, this producer keeps a running count of the demand from Broadway. It emits one message each time a message arrives through its `insert/1` function.
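```elixir
defmodule MyBroadway.CustomProducer do
  use GenStage

  # Sends a message to one of the pipeline's producers. Returns :ok,
  # or {:error, :pipeline_full} when there is no outstanding demand.
  def insert(message) do
    MyBroadway
    |> Broadway.producer_names()
    |> Enum.random()
    |> GenStage.call({:insert, message})
  end

  @impl true
  def init(_args) do
    # State: demand requested by Broadway that has not been met yet
    initial_demand = 0
    {:producer, initial_demand}
  end

  @impl true
  def handle_demand(incoming_demand, existing_demand) do
    # Emit nothing now; remember how many more messages Broadway can accept
    {:noreply, [], existing_demand + incoming_demand}
  end

  @impl true
  def handle_call({:insert, _message}, _from, demand) when demand <= 0 do
    # No outstanding demand: reject the message instead of queueing it
    {:reply, {:error, :pipeline_full}, [], demand}
  end

  def handle_call({:insert, message}, _from, demand) do
    # Reply to the caller and emit the message as a single event
    {:reply, :ok, [broadway_transform(message)], demand - 1}
  end

  defp broadway_transform(message) do
    %Broadway.Message{
      acknowledger: Broadway.NoopAcknowledger.init(),
      data: message
    }
  end
end
```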
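Once the `MyBroadway` pipeline from earlier has been started with this producer, we can use it by calling `MyBroadway.CustomProducer.insert({:hello, :world})`. The call returns `:ok`. The `IO.inspect/2` call in the `handle_message/3` callback then prints a `%Broadway.Message{}` struct whose `data` field is `{:hello, :world}`. If the producer has no outstanding demand at that moment, `insert/1` returns `{:error, :pipeline_full}` instead.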
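Because `insert/1` can return `{:error, :pipeline_full}`, callers need a policy for that case. The following sketch shows one option: a hypothetical `MyBroadway.Inserter.insert_with_retry/4` helper, which is not part of Broadway, that retries after a fixed delay.

The helper takes the insert function as an argument, so it can be exercised without a running pipeline. In the application you would pass `&MyBroadway.CustomProducer.insert/1`.

```elixir
defmodule MyBroadway.Inserter do
  @moduledoc """
  Hypothetical helper: retries `insert_fun` while it reports
  `{:error, :pipeline_full}`, sleeping `delay_ms` between attempts.
  """

  @spec insert_with_retry((term -> :ok | {:error, term}), term, pos_integer, non_neg_integer) ::
          :ok | {:error, term}
  def insert_with_retry(insert_fun, message, attempts, delay_ms)
      when is_function(insert_fun, 1) and attempts > 0 do
    case insert_fun.(message) do
      :ok ->
        :ok

      {:error, :pipeline_full} when attempts > 1 ->
        # No demand right now; give Broadway time to request more
        Process.sleep(delay_ms)
        insert_with_retry(insert_fun, message, attempts - 1, delay_ms)

      # Out of attempts, or a different error: return it to the caller
      {:error, _reason} = error ->
        error
    end
  end
end
```

For example: `MyBroadway.Inserter.insert_with_retry(&MyBroadway.CustomProducer.insert/1, {:hello, :world}, 5, 100)`.

Blocking the caller this way pushes back-pressure further upstream. If callers must never block, dropping or buffering the message are alternative policies.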
Note that our example custom producer does not acknowledge message success or failure. Instead it uses `Broadway.NoopAcknowledger`, which ignores acknowledgements. For more information on handling the success and failure of messages, see the acknowledgement sections of the Broadway documentation, including the `Broadway.Acknowledger` behaviour.
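The code that inserted a message may need to learn whether processing succeeded. One option is a custom acknowledger in place of `Broadway.NoopAcknowledger`. Each message carries an `acknowledger: {module, ack_ref, data}` tuple, and Broadway calls `module.ack(ack_ref, successful, failed)` once processing is done.

The following is a minimal sketch of a hypothetical `MyBroadway.NotifyAcknowledger`. It assumes the `ack_ref` is a pid and sends that pid the `data` of each successful and failed message.

```elixir
defmodule MyBroadway.NotifyAcknowledger do
  # Hypothetical acknowledger: the ack_ref is the pid to notify
  @behaviour Broadway.Acknowledger

  @impl true
  def ack(notify_pid, successful, failed) do
    # Broadway passes lists of %Broadway.Message{} structs; forward only their data
    send(
      notify_pid,
      {:broadway_ack, Enum.map(successful, & &1.data), Enum.map(failed, & &1.data)}
    )

    :ok
  end
end
```

To use it with the second producer, you could change `broadway_transform/1` to build `acknowledger: {MyBroadway.NotifyAcknowledger, caller_pid, nil}`. Here `caller_pid` is taken from the `from` argument of `handle_call/3`. This is an assumed change, not part of the example above. The caller would then receive `{:broadway_ack, successful_data, failed_data}` after its message has been processed.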
By pairing a custom GenStage producer with a Broadway pipeline, we can process data that originates inside our application using the same machinery Broadway provides for external queues: demand-driven back-pressure, concurrent processors, and optional batching. This lets us build real-time applications in Elixir that scale to meet the demands of our users.