Elixir Machine Learning: Clustering, Bumblebee, and Structured Prompting

Sean Moriarity

Machine Learning Advisor



One of the things that I love about Elixir is that I am consistently discovering new things about the language. Do you know the feeling you get as a programmer when a technology suddenly clicks and opens up a whole new world of unexplored possibilities? That is the feeling I get working with Elixir. Even after several years of learning the language, there are still many parts I haven’t explored and many exciting new possibilities to discover.

Recently, two things in particular have opened my eyes to a world of new possibilities:

  1. Clustering, and in particular Nx.Serving's distributed-by-default design
  2. Structured prompting, and in particular the instructor_ex library

In this blog post, we’ll explore both, and demonstrate how we can use Elixir to build a creative text-to-montage video generator.

A Gentle Introduction to Structured Prompting

Large language models enable entirely new types of applications; however, they also introduce a lot of engineering challenges. Guiding large language models in production is difficult. Additionally, open-ended, non-deterministic generations are difficult to bridge with the schemas and assumptions built into legacy software. The most common production application of a large language model wraps the model behind a chat-based interface connected to some retrieval system backed by data in a business niche. Some of these applications are popular; however, in many cases a chat-based interface isn’t the optimal solution to a customer’s problems. Chat-based interfaces only scratch the surface of what these models are capable of.

Viewing large language models as just a means of interpreting data and communicating knowledge is a limiting belief about the potential of the technology. I held this limiting belief myself until I was introduced to Instructor, a Python library for performing structured extraction based on OpenAI’s function-calling API. The function-calling API was originally intended as a way to connect large language models to external tools: you provide the model with a list of available functions and their parameters, and it generates a JSON representation of the appropriate function call based on the input prompt.
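To make that concrete, here is roughly what the exchange looks like, expressed as Elixir maps rather than raw JSON. The get_weather function and its fields are made up for illustration, and the exact payload shape varies across OpenAI API versions:

# A function the model is allowed to "call", described with a JSON Schema.
function_spec = %{
  name: "get_weather",
  description: "Get the current weather for a city",
  parameters: %{
    type: "object",
    properties: %{city: %{type: "string"}},
    required: ["city"]
  }
}

# Given "What's the weather in Beverly Hills?", the model replies not with
# prose but with a structured call, roughly of the form:
%{
  "function_call" => %{
    "name" => "get_weather",
    "arguments" => ~s({"city": "Beverly Hills"})
  }
}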

The function calling API can be extended as a means of using large language models to generate structured outputs. Because it produces valid JSON according to the schema you specify, you can use function calling as a bridge between your application’s schemas and a large language model. In the Python ecosystem, the instructor library achieves this using Pydantic models. You declare your schema using Pydantic’s API, provide that as a response_model to OpenAI’s chat completion endpoint, and the library will coerce the chat completion response into the response model provided. In the Elixir ecosystem, instructor_ex does the same, but instead relies on Ecto Schemas.

Ecto is the standard data modeling framework in Elixir. If you have a production Elixir application, you likely already use Ecto. instructor_ex gives you a bridge between your application’s existing schemas and the capabilities of a large language model. For example, let’s say we have an SMS-based application that collects addresses from a user’s text messages. This is a task that LLMs are well-suited for. With instructor_ex we can declare a schema like this:

defmodule Address do
  use Ecto.Schema

  embedded_schema do
    field :street, :string
    field :state, :string
    field :city, :string
    field :zip, :string
  end
end

Then we can create a function that uses an LLM to extract addresses from an input:

extract_address = fn text ->
  Instructor.chat_completion(
    model: "gpt-3.5-turbo",
    response_model: Address,
    max_retries: 3,
    messages: [
      %{
        role: "user",
        content: """
        Your purpose is to extract addresses from a given text
        message.

        Extract an address from the following: #{text}
        """
      }
    ]
  )
end

And call it:

{:ok, %Address{} = address} = extract_address.("I live at 1 Main Street, Beverly Hills, CA 90210")

Because we’re using Ecto schemas, we can build additional validations into the extraction process as well. For example, we might have a validation that calls the Google Maps API to check that the extracted address actually exists. This gives us a straightforward way for LLMs to interoperate with the rest of our application.
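As a sketch of what that could look like: instructor_ex supports an optional validate_changeset/1 callback (via its Instructor.Validator behaviour) that runs against each generation, and with max_retries set, validation errors are fed back to the model so it can correct itself. The geocode_exists?/1 helper below is hypothetical, standing in for a real Google Maps lookup:

defmodule Address do
  use Ecto.Schema
  use Instructor.Validator

  embedded_schema do
    field :street, :string
    field :state, :string
    field :city, :string
    field :zip, :string
  end

  @impl true
  def validate_changeset(changeset) do
    # Reject generations whose street can't be geocoded. `geocode_exists?/1`
    # is a placeholder for a call to the Google Maps (or any geocoding) API.
    Ecto.Changeset.validate_change(changeset, :street, fn :street, street ->
      if geocode_exists?(street), do: [], else: [street: "could not be verified"]
    end)
  end

  defp geocode_exists?(_street), do: true
end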

Beyond Data Extraction

With structured prompting and instructor_ex we can easily integrate LLMs into our existing applications for complex extraction tasks; however, if you’ve interacted with LLMs in the past, you know they are capable of much more than just data extraction. Structured prompting with instructor_ex enables an entirely new class of software applications that combine the best of Software 1.0 and Software 2.0. For example, LLMs are very good at creative generation tasks. This proves useful for open-ended, unstructured tasks such as creative writing; however, we can extend this to the world of structured outputs. As long as I have a schema that models what I want, I can use an LLM to generate it. For example, I’ve always loved the photo montages that Apple compiles for you automatically on your iPhone based on your pictures. What if we could do the same thing with just a simple prompt? With instructor_ex and a little computer vision, we can!

First, consider that we can model a video montage as just a series of frames where each frame consists of an image and a duration, as well as some background music:

defmodule Montage.Video do
  use Ecto.Schema

  defmodule Frame do
    use Ecto.Schema

    @doc """
    ## Field Descriptions:
    - image: A description of the frame background image. For example,
      "a dog sitting"
    - duration: The duration in seconds of the frame.
    """
    @primary_key false
    embedded_schema do
      field :image, :string
      field :duration, :integer
    end
  end

  @doc """
  ## Field Descriptions:
  - music: A description of the type of music to overlay
  - frames: An array of frames
  """
  @primary_key false
  embedded_schema do
    field :music, :string

    embeds_many :frames, Montage.Video.Frame
  end
end

Notice that I can provide documentation for my schema. instructor_ex will use this documentation as context to guide the LLM’s generations. Now I can simply ask an LLM to generate a video for me:

make_video = fn description ->
  Instructor.chat_completion(
    model: "gpt-3.5-turbo",
    response_model: Montage.Video,
    max_retries: 3,
    messages: [
      %{
        role: "user",
        content: """
        You are a creative director. Your job is to take a short description
        of a video montage and generate a beautiful video. The video montage
        consists only of images and some background music. When specifying images
        and music, you should specify a detailed description of the frame, not
        specific file names. For example, "a picture of a car" instead of "car.jpg".

        Create a video from the following description: #{description}
        """
      }
    ]
  )
end

Now if I run this in iex I’ll get something like:

iex> make_video.("A funny video of my dog")
{:ok,
 %Montage.Video{
   music: "funny music",
   frames: [
     %Montage.Video.Frame{
       image: "a picture of my dog wearing sunglasses",
       duration: 5
     },
     %Montage.Video.Frame{
       image: "a picture of my dog chasing its tail",
       duration: 3
     },
     %Montage.Video.Frame{
       image: "a picture of my dog playing with a ball",
       duration: 4
     }
   ]
 }}

Woot! We have a representation of a video with just a simple prompt and some Ecto schemas! This is pretty cool, but we haven’t actually generated a video yet. We still need to convert this schema into something usable. Fortunately, we can use Bumblebee for that.

Turning our Video Struct into Real Video

Our video struct gives us a simple outline of what our video should look like, but we still need a way to convert it into actual video. For this task, we’ll use Bumblebee and CLIP to find the images that most closely match the description provided in each frame. This is a basic image search task. We can define two embedding servings, one for images and one for text. First, our image serving:

defmodule Montage.ImageEmbedding do
  @model_repo "openai/clip-vit-base-patch32"

  def serving() do
    {:ok, clip} = Bumblebee.load_model({:hf, @model_repo},
      module: Bumblebee.Vision.ClipVision,
      architecture: :for_embedding
    )
    {:ok, featurizer} = Bumblebee.load_featurizer({:hf, @model_repo})
    Bumblebee.Vision.image_embedding(clip, featurizer,
      defn_options: [compiler: EXLA],
      embedding_processor: :l2_norm,
      output_attribute: :embedding
    )
  end

  def predict(image) do
    Nx.Serving.batched_run(__MODULE__, image)
  end
end

Then our text serving:

defmodule Montage.TextEmbedding do
  @model_repo "openai/clip-vit-base-patch32"

  def serving() do
    {:ok, clip} = Bumblebee.load_model({:hf, @model_repo},
      module: Bumblebee.Text.ClipText,
      architecture: :for_embedding
    )
    {:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, @model_repo})

    Bumblebee.Text.text_embedding(clip, tokenizer,
      defn_options: [compiler: EXLA],
      embedding_processor: :l2_norm,
      output_attribute: :embedding
    )
  end

  def predict(query) do
    Nx.Serving.batched_run(__MODULE__, query)
  end
end

Then we can create an ImageSimilarity module, which has a single predict function that takes a list of images and returns the most similar one given a query:

defmodule Montage.ImageSimilarity do
  @doc """
  Predicts the most similar image to the given text.

  Images are provided as file paths.
  """
  def predict(images, text) do
    image_tensors = Enum.map(images, fn image ->
      image
      |> StbImage.read_file!()
      |> StbImage.to_nx()
    end)

    image_embeddings = Nx.Serving.batched_run(Montage.ImageEmbedding, image_tensors)
    image_embeddings = Nx.stack(Enum.map(image_embeddings, & &1.embedding))
    %{embedding: text_embedding} = Nx.Serving.batched_run(Montage.TextEmbedding, text)

    similarities = Nx.dot(image_embeddings, Nx.squeeze(text_embedding))

    similarities
    |> Nx.argmax()
    |> Nx.to_number()
    |> then(&Enum.at(images, &1))
  end
end

Note that we need to add our servings to our application supervision tree:

children = [
  # ...
  {Nx.Serving,
   name: Montage.TextEmbedding,
   batch_size: 8,
   batch_timeout: 50,
   serving: Montage.TextEmbedding.serving()},
  {Nx.Serving,
   name: Montage.ImageEmbedding,
   batch_size: 8,
   batch_timeout: 50,
   serving: Montage.ImageEmbedding.serving()}
]
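With those servings in place, calling the similarity module looks something like this (the file names here are made up for illustration):

images = [
  "priv/images/pets/dog_sunglasses.jpg",
  "priv/images/pets/dog_ball.jpg"
]

Montage.ImageSimilarity.predict(images, "a picture of my dog playing with a ball")
#=> "priv/images/pets/dog_ball.jpg"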

The image similarity pipeline takes a list of image file names and a query, computes an embedding matrix, and then uses cosine similarity to return the most similar image (because the embeddings are L2-normalized, the dot product is equivalent to cosine similarity). The pipeline itself is pretty naive, but it works well enough. We can use it to find the image that most closely matches the description in each frame. Now we can write functions that process each frame and extract the appropriate file name for each. First, let’s create a function that takes our prompt and generates the outline:

defmodule Montage.Maker do
  def generate_frames(user_prompt) do
    Instructor.chat_completion(
      model: "gpt-3.5-turbo",
      response_model: Montage.Video,
      max_retries: 3,
      messages: [
        %{
          role: "user",
          content: """
          You are a creative director. Your job is to take a short description
          of a video montage and generate a beautiful video. The video montage
          consists only of images and some background music. When specifying images
          and music, you should specify a detailed description of the frame, not
          specific file names. For example, "a picture of a car" instead of "car.jpg".

          Create a video from the following description: #{user_prompt}
          """
        }
      ]
    )
  end
end

Then we can convert our frames into actual file names and durations:

def convert_frames(%Montage.Video{} = video) do
  images = load_images()

  video.frames
  |> Enum.map(fn %Montage.Video.Frame{image: image_query, duration: duration} ->
    filename = Montage.ImageSimilarity.predict(images, image_query)
    %{file: filename, duration: duration}
  end)
end

And, of course, we need the function that loads our images:

defp load_images() do
  Path.wildcard(Path.join([:code.priv_dir(:montage), "images", "pets", "*.jpg"]))
end

Then we can use a tool like ffmpeg to stitch the images into a video:

def stitch_frames(frames) do
  # generate a short video per frame
  frames
  |> Enum.with_index(fn %{file: fname, duration: duration}, i ->
    output_fname = "output_#{i}.mp4"

    System.cmd("ffmpeg", [
      "-loop",
      "1",
      "-i",
      fname,
      "-c:v",
      "libx264",
      "-t",
      "#{duration}",
      "-pix_fmt",
      "yuv420p",
      "-vf",
      "scale=1920:1080",
      output_fname
    ])

    %{file: output_fname, duration: duration}
  end)
  |> Enum.map_join("\n", fn %{file: fname, duration: duration} ->
    "file '#{fname}'\nduration #{duration}"
  end)
  |> then(&File.write!("schema.txt", &1))

  # concatenate them into a single video
  System.cmd("ffmpeg", [
    "-f",
    "concat",
    "-safe",
    "0",
    "-i",
    "schema.txt",
    "-c",
    "copy",
    "output.mp4"
  ])
end
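Note that the music field goes unused in this sketch. If you had an audio track picked out to match its description (music.mp3 below is a hypothetical file name), overlaying it on the finished montage would be roughly one more ffmpeg call:

# Hypothetical: overlay a background track on the concatenated video.
# The video stream is copied as-is, the audio is encoded as AAC, and
# -shortest trims the output to the shorter of the two inputs.
System.cmd("ffmpeg", [
  "-i", "output.mp4",
  "-i", "music.mp3",
  "-map", "0:v:0",
  "-map", "1:a:0",
  "-c:v", "copy",
  "-c:a", "aac",
  "-shortest",
  "final.mp4"
])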

And now if we wrap all of this in a pipeline:

def create_montage(user_prompt) do
  {:ok, video} = generate_frames(user_prompt)

  video
  |> convert_frames()
  |> stitch_frames()
end

And now we can run it:

iex> Montage.Maker.create_montage("a silly video of pets")

And just like that, we’ve used Bumblebee and OpenAI to generate a video!

Clustering, just for fun

Of course, our end goal with this application would be to deploy it. For acceptable performance, we’d probably want to run our CLIP model on a GPU. Thanks to the magic of Elixir, you can deploy this application to a hosting provider like Fly.io while still using an on-prem GPU for inference.

First, you’ll want to create a new Fly project and then connect to a Fly private network using this tutorial from their documentation. By connecting to the Fly private network, you can run a local instance of your application and it will be able to discover your remote application via DNS.

Once you have your private network set up, you can verify that you can connect to it in an iex session using:

iex --erl "-proto_dist inet6_tcp" --sname local --cookie my-app-cookie

Make sure that my-app-cookie is the same as the RELEASE_COOKIE specified in your Fly deployment. If everything is hooked up properly, you can now check for other nodes on the network by running:

ips = :inet_res.lookup(~c"montage.internal", :in, :aaaa)

Then you can connect to all found nodes with:

ips
|> Enum.map(&:inet.ntoa/1)
|> Enum.each(fn aaaa ->
  Node.connect(String.to_atom("montage@#{aaaa}"))
end)

And running Node.list() should now return a list of connected nodes! One thing to keep in mind is that Fly apps on the free tier will scale to zero. That means your remote node will disconnect after periods of inactivity, so you’ll need a job that repeatedly attempts to reconnect to your remote node. This is a naive example of one way you might do this:

defmodule Montage.Cluster do
  use GenServer
  require Logger

  @poll 5_000

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{})
  end

  @impl true
  def init(_) do
    do_cluster()
    {:ok, %{}}
  end

  @impl true
  def handle_info(:cluster, state) do
    :inet_res.lookup(~c"montage.internal", :in, :aaaa)
    |> Enum.map(&to_string(:inet.ntoa(&1)))
    |> Enum.each(fn aaaa ->
      name = "montage@#{aaaa}"
      if Node.connect(String.to_atom(name)) do
        Logger.info("Connected to #{name}")
      end
    end)

    do_cluster()

    {:noreply, state}
  end

  defp do_cluster do
    Process.send_after(self(), :cluster, @poll)
  end
end

For the purposes of this example, you only want your servings and the clustering worker running on the local (GPU) node. You can achieve this with a simple conditional in your application based on an environment variable:

children =
  if System.get_env("NODE_TYPE") == "worker" do
    [
      Montage.Cluster,
      {Nx.Serving,
       name: Montage.TextEmbedding,
       batch_size: 8,
       batch_timeout: 50,
       serving: Montage.TextEmbedding.serving()},
      {Nx.Serving,
       name: Montage.ImageEmbedding,
       batch_size: 8,
       batch_timeout: 50,
       serving: Montage.ImageEmbedding.serving()}
    ]
  else
    [
      MontageWeb.Telemetry,
      # Montage.Repo,
      {DNSCluster, query: Application.get_env(:montage, :dns_cluster_query) || :ignore},
      {Phoenix.PubSub, name: Montage.PubSub},
      # Start the Finch HTTP client for sending emails
      {Finch, name: Montage.Finch},
      # Start a worker by calling: Montage.Worker.start_link(arg)
      # {Montage.Worker, arg},
      # Start to serve requests, typically the last entry
      MontageWeb.Endpoint
    ]
  end

And that’s really all we need to change in our application! Now, anytime we call Nx.Serving.batched_run inside our application, it will use our local node with the GPU. This is because Nx.Serving is distributed by default: it looks for nodes in the cluster running a serving under the given name and executes the computation on one of them. This makes it incredibly easy to manage a node dedicated to inference, either by clustering two different applications or by applying some simple logic as shown above. Additionally, as your workload increases, you can easily scale up the number of inference/worker nodes, and Nx.Serving will work exactly as it did before. No changes necessary!
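For example, here is what a hypothetical IEx session on the web node (which starts no servings of its own) might look like, assuming the worker node is connected. The image path is made up for illustration:

# No Nx.Serving children are running on this node. batched_run/2 looks up
# the serving registered under the same name elsewhere in the cluster and
# executes the computation on the worker node with the GPU.
iex> image = "dog_ball.jpg" |> StbImage.read_file!() |> StbImage.to_nx()
iex> Nx.Serving.batched_run(Montage.ImageEmbedding, image)
%{embedding: #Nx.Tensor<...>}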

Conclusion

In this post, we had some fun with structured prompting, clustering, and Bumblebee. I hope this brief introduction to all of these topics inspires you to think about the amazing possibilities of working with machine learning on the BEAM. There is so much potential out there yet to be explored! Until next time!
