14 Mar 2023 · Software Engineering

    Creating Data Pipelines With Elixir

    10 min read

    A data pipeline is a series of processes that move data from one stage of processing to the next. These processes can include collecting, cleaning, transforming, and storing or analyzing data.

    The goal of a data pipeline is to efficiently and effectively move data through these stages to make it available for further analysis or use.

    As developers, being aware of how to use and implement data pipelines can be invaluable in today’s ecosystem; for example:

    • Data pipelines are an effective way to manage and process large amounts of data.
    • Data pipelines are essential for building data-driven applications, which often need features like personalization and analytics.
    • Data pipelines are a key component of distributed systems and are becoming increasingly common as the industry adopts this architecture.

    When implementing data pipelines, there are many options available to developers. This article doesn’t aim to be a full guide to all of them; rather, it focuses on how Elixir and its ecosystem can be used to implement data pipelines directly, without relying on complex systems like Kafka.

    Why Elixir?

    Elixir is a good language for implementing data pipelines because it is designed for concurrent and parallel processing. This means that it can handle multiple tasks simultaneously, making it well-suited for handling large amounts of data.

    Additionally, Elixir has built-in support for functional programming, which allows for clear and expressive code that is easy to reason about and test. The language also has a robust ecosystem of libraries and frameworks that can be leveraged for data processing and pipeline management tasks.

    Furthermore, Elixir runs on the Erlang VM, which is known for its reliability and fault tolerance, making it ideal for data pipelines that require high availability and performance. Its message-passing concurrency model also makes data pipelines easy to distribute and scale, enabling them to handle large amounts of data and traffic.

    In this tutorial, we will show you how to use the Flow library in Elixir to build a data pipeline. We will cover the following topics:

    • Setting up the environment and installing the necessary dependencies
    • Creating a new Elixir project and configuring it for data processing
    • Defining the stages of the pipeline using the Flow library
    • Building and running the pipeline
    • Testing and debugging the pipeline

    Setting up a data pipeline

    Before we begin, it is assumed that you have a basic understanding of Elixir and functional programming concepts. If you are new to Elixir, it is recommended that you familiarize yourself with the basics before proceeding.

    The complete code for this tutorial can be found in the GitHub repository.

    Setting up the environment

    To use the Flow library, you will need to have Elixir and Erlang installed on your machine. You can find installation instructions for both on the official Elixir website.

    Once you have Elixir and Erlang installed, you can create a new Elixir project by running the following command in your terminal:

    $ mix new catalog_pipeline

    This will create a new directory called “catalog_pipeline” that contains the basic structure of an Elixir project.

    Next, you will need to add the Flow library, along with a few extra libraries to handle web requests and JSON decoding, as dependencies of your project. You can do this by making sure the deps function of your mix.exs file looks like this:

    defp deps do
      [
        {:flow, "~> 1.0"},      # data processing pipelines
        {:httpoison, "~> 1.7"}, # HTTP client
        {:poison, "~> 4.0"}     # JSON decoding
      ]
    end

    Then, run the following command in your terminal to install the dependencies:

    $ mix deps.get

    Defining the stages of the pipeline

    Now that we have the Flow library installed, we can start defining the stages of our pipeline. The Flow library provides a set of pre-built stages for common data processing tasks such as filtering, mapping, and reducing. These stages can be used out of the box, or developers can create their own custom stages for more specific tasks.
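
    As a quick, self-contained illustration of these building blocks, here is a toy flow (separate from the pipeline we will build below) that squares a list of numbers, keeps the even results, and sums them when the flow is materialized:

    1..10
    |> Flow.from_enumerable()
    |> Flow.map(&(&1 * &1))             # transform: square each number
    |> Flow.filter(&(rem(&1, 2) == 0))  # keep only the even squares
    |> Enum.sum()                       # materializing the flow runs the computation
    #=> 220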

    For the purposes of this tutorial, we will use the Flow library to build a data pipeline that extracts product data from dummyjson.com, transforms it into a tuple with description, title, and price, validates certain fields, and finally prepares the data to be loaded into a database or a CSV file. The pipeline will consist of several stages, as detailed in the following sections.

    Data Extraction

    The first step in building a data pipeline using Elixir and the Flow library is to extract the data that we want to process. This can involve a variety of tasks, such as fetching data from a database, web service, or file system.

    One way to extract data using Flow is to use the Flow.from_enumerable/2 function, which allows you to create a flow from any Enumerable data structure, such as a list or map. For example, you could extract data from a database table by querying it and then passing the results to Flow.from_enumerable/2.
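
    For instance, here is a minimal sketch that wraps an in-memory list (standing in for rows returned by a database query) in a flow:

    rows = [%{id: 1, name: "widget"}, %{id: 2, name: "gadget"}]

    rows
    |> Flow.from_enumerable()
    |> Flow.map(& &1.name)
    |> Enum.to_list()
    #=> ["widget", "gadget"] (ordering is not guaranteed across partitions)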

    Let’s define the first stage and load product data from dummyjson.com by overwriting your lib/catalog_pipeline.ex file to look like:

    defmodule CatalogPipeline do
      def get_products do
        extract_product_data("https://dummyjson.com/products")
      end

      # Fetch the product catalog and wrap the decoded product list in a flow
      defp extract_product_data(url) do
        url
        |> HTTPoison.get!()
        |> Map.get(:body)
        |> Poison.decode!()
        |> Map.get("products")
        |> Flow.from_enumerable()
      end
    end

    The first step is to define a module that will contain the pipeline; we will call it CatalogPipeline. Next, we will define a function called get_products that will return the product data. This function will call the extract_product_data function, which will use the Flow library to extract data from the dummyjson.com API.

    The extract_product_data function will use the HTTPoison library to make a GET request to the dummyjson.com API and then use the Poison library to decode the JSON response. The decoded data will be passed to the Flow.from_enumerable/2 function, which will create a flow from the data.

    Note that at this stage get_products returns a lazy Flow struct rather than the products themselves; the flow only runs when it is materialized, for example with Enum.to_list/1. You can try this by running $ iex -S mix and materializing the flow in the interactive prompt. Remember to Ctrl-C and (a)bort to get back to the shell.
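
    For example (output illustrative and truncated; the exact fields come from the dummyjson.com response):

    $ iex -S mix
    iex(1)> CatalogPipeline.get_products() |> Enum.to_list()
    [%{"id" => 1, "title" => "iPhone 9", "price" => 549, ...}, ...]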

    Data Transformation

    The next step in building a data pipeline is to transform the data that we’ve extracted. This can involve a variety of tasks, such as cleaning, filtering, or aggregating the data.

    One way to transform data using Flow is to use the Flow.map/2 function, which allows you to apply a function to each item in a flow and return a new flow with the results. For example, you could clean up the data by removing any fields that are not needed.
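
    For example, here is a minimal sketch that keeps only a couple of fields from each product map (the field names mirror the dummyjson.com payload):

    [%{"title" => "iPhone 9", "price" => 549, "stock" => 94}]
    |> Flow.from_enumerable()
    |> Flow.map(&Map.take(&1, ["title", "price"]))  # drop the fields we do not need
    |> Enum.to_list()
    #=> [%{"price" => 549, "title" => "iPhone 9"}]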

    To continue our example, we will define a new function called transform_product_data that will transform the data into a tuple with a description, title, and price:

    defmodule CatalogPipeline do
      def get_products do
        extract_product_data("https://dummyjson.com/products")
        |> transform_product_data()
      end

      # Fetch the product catalog and wrap the decoded product list in a flow
      defp extract_product_data(url) do
        url
        |> HTTPoison.get!()
        |> Map.get(:body)
        |> Poison.decode!()
        |> Map.get("products")
        |> Flow.from_enumerable()
      end

      # Keep only the fields we care about, as a {description, title, price} tuple
      defp transform_product_data(extracted_data) do
        extracted_data
        |> Flow.map(fn product -> {product["description"], product["title"], product["price"]} end)
      end
    end

    This stage of the pipeline uses Flow.map to extract the title, description, and price of each product and return them as a tuple.

    Data Quality Assurance

    Finally, it is important to check the quality of the data before it is loaded into the target system. In Elixir, we can use Flow.filter to filter out invalid or incomplete data, for example by removing any rows that do not have a valid price and description.

    defmodule CatalogPipeline do
      def get_products do
        extract_product_data("https://dummyjson.com/products")
        |> transform_product_data()
        |> validate_product_data()
      end

      # Fetch the product catalog and wrap the decoded product list in a flow
      defp extract_product_data(url) do
        url
        |> HTTPoison.get!()
        |> Map.get(:body)
        |> Poison.decode!()
        |> Map.get("products")
        |> Flow.from_enumerable()
      end

      # Keep only the fields we care about, as a {description, title, price} tuple
      defp transform_product_data(extracted_data) do
        extracted_data
        |> Flow.map(fn product -> {product["description"], product["title"], product["price"]} end)
      end

      # Drop rows with a missing description or a non-numeric price
      defp validate_product_data(transformed_data) do
        transformed_data
        |> Flow.filter(fn {description, _title, price} ->
          is_binary(description) and description != "" and is_number(price)
        end)
      end
    end

    Adding this stage, which filters out rows without a valid price or description, ensures that the final data is clean and usable, reducing the risk of errors downstream.

    Data Loading

    The final step in our pipeline is to load the transformed data into a target system. This can involve a variety of tasks, such as inserting the data into a database, writing it to a file, or publishing it to a message queue.

    For this last part of the example, we will parse the list of products into a list that we can either insert into a database or write to a file.

    defmodule CatalogPipeline do
      def get_products do
        extract_product_data("https://dummyjson.com/products")
        |> transform_product_data()
        |> validate_product_data()
        |> load_product_data()
        |> IO.inspect()
      end

      # Fetch the product catalog and wrap the decoded product list in a flow
      defp extract_product_data(url) do
        url
        |> HTTPoison.get!()
        |> Map.get(:body)
        |> Poison.decode!()
        |> Map.get("products")
        |> Flow.from_enumerable()
      end

      # Keep only the fields we care about, as a {description, title, price} tuple
      defp transform_product_data(extracted_data) do
        extracted_data
        |> Flow.map(fn product -> {product["description"], product["title"], product["price"]} end)
      end

      # Drop rows with a missing description or a non-numeric price
      defp validate_product_data(transformed_data) do
        transformed_data
        |> Flow.filter(fn {description, _title, price} ->
          is_binary(description) and description != "" and is_number(price)
        end)
      end

      # Materialize the flow into a list of maps, ready for loading
      defp load_product_data(validated_data) do
        validated_data
        |> Flow.map(fn {description, title, price} ->
          %{title: title, price: price, description: description}
        end)
        |> Enum.to_list()
      end
    end

    With this last stage, we are loading the data into a list, which, depending on the application, can be used to insert the data into a database or write it to a file.

    If everything goes well, you should see a list of products printed on the console.

    [
      %{
        description: "An apple mobile which is nothing like apple",
        price: 549,
        title: "iPhone 9"
      },
      %{
        description: "SIM-Free, Model A19211 6.5-inch Super Retina HD display with OLED technology A12 Bionic chip with ...",
        price: 899,
        title: "iPhone X"
      },
      %{
        description: "Samsung's new variant which goes beyond Galaxy to the Universe",
        price: 1249,
        title: "Samsung Universe 9"
      },
      %{
        description: "OPPO F19 is officially announced on April 2021.",
        price: 280,
        title: "OPPOF19"
      },
      %{
        description: "Huawei’s re-badged P30 Pro New Edition was officially unveiled yesterday in Germany and now the device has made its way to the UK.",
        price: 499,
        title: "Huawei P30"
      },
      ...
    ]
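
    To go one step further, here is a minimal sketch of writing that list to a CSV file using only the standard library (the products.csv path and the naive quoting are assumptions of this example; a real pipeline would typically reach for a dedicated CSV library, or Ecto for database inserts):

    products = CatalogPipeline.get_products()

    header = "title,price,description"

    rows =
      Enum.map(products, fn %{title: title, price: price, description: description} ->
        # Quote free-text fields so embedded commas do not break the format
        ~s("#{title}",#{price},"#{String.replace(description, "\"", "\"\"")}")
      end)

    File.write!("products.csv", Enum.join([header | rows], "\n"))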

    Conclusion

    This article taught you how to build a data pipeline using Elixir and Flow. You have seen how to extract data from an API, transform it, validate it, and load it into a target system. We have also seen how to use Flow to build a pipeline that can be easily extended and reused.

    However, we have only scratched the surface of what is possible with Elixir and Flow; there are many more features and capabilities than can be covered in this article. If you want to learn more, the official Flow documentation on HexDocs is a good place to start.

    Finally, as you continue on your journey to use Elixir and Flow to build data pipelines, it is important to keep in mind that there are some key differences between data pipelines and ETL (Extract, Transform, Load) pipelines:

    • “Data pipeline” is an umbrella term, and ETL pipelines are just a subset of data pipelines.
    • An ETL pipeline will always involve transforming the data, while a data pipeline may not.
    • Data pipelines often run continuously, while ETL pipelines typically run once or in batches.


    Allan MacGregor
    Written by:
    Allan is a software engineer and entrepreneur based in Canada who is extremely passionate about functional programming, with 15 years of professional experience building projects and innovative solutions both in and outside of the e-commerce space.
    Reviewed by:
    I picked up most of my soft/hardware troubleshooting skills in the US Army. A decade of Java development drove me to operations, scaling infrastructure to cope with the thundering herd. Engineering coach and CTO of Teleclinic.