17 Feb 2020 · Software Engineering

    Test-Driving a Stream-powered Elixir Library


    Introduction

    Elixir is a recent language built on top of the battle-tested BEAM virtual machine, which is steadily gaining traction among companies and developers. It brings a modern and fresh perspective to the Erlang world, and with it we can easily build reliable and fault-tolerant applications. Beyond that, one of Elixir’s main strengths is productivity.

    Goals of This Tutorial

    We will bootstrap csv, a generic and reusable library to parse and import CSV files. You will learn about:

    • Using Agents to fulfill testing needs,
    • Testing file IO,
    • How to leverage Streams and how they work internally, and
    • Parallelizing work on a pipeline.

    Let’s get started!

    Prerequisites

    For this article, you will need:

    • Basic knowledge about Elixir and ExUnit and
    • Elixir 1.5 installed on your computer.

    To get acquainted with ExUnit and the mechanics of Test-Driven Development, I recommend checking out our Introduction to Testing Elixir Applications with ExUnit.

    Creating the CSV Application

    We will start by creating our project. Let’s call it csv:

    mix new csv

    Mix helpfully gives some hints on what to do next:

    * creating README.md
    * creating .gitignore
    * creating mix.exs
    * creating config
    * creating config/config.exs
    * creating lib
    * creating lib/csv.ex
    * creating test
    * creating test/test_helper.exs
    * creating test/csv_test.exs
    
    Your Mix project was created successfully.
    You can use "mix" to compile it, test it, and more:
    
        cd csv
        mix test
    
    Run "mix help" for more commands.

    As a sanity measure, let’s run the tests generated by Mix and make sure they pass:

    $ mix test
    ..
    
    Finished in 0.03 seconds
    2 tests, 0 failures
    
    Randomized with seed 464454

    Before moving on, delete the generated test file, since we won’t need it:

    rm -f test/csv_test.exs

    Test-driving the Import Module

    As top-down TDD suggests, let’s start with a high-level test describing the outcome we expect. An outline may be helpful:

    1. Receive a path to the CSV file,
    2. Parse rows and insert them into a repository, and
    3. Retrieve rows back from the repository and assert whether they match the CSV’s.

    Since this code is meant to work with any CSV definition, it will take three inputs: a file path, a schema (a struct module), and a list of expected headers, with the last two passed as options. The struct will be used as a schema for each CSV row, and the headers will be used to make sure the file conforms to an expected structure.

    Using any schema is possible for this test, so let’s stick with something simple: a Site struct with two fields: name and url. As for the repository, its role will be to insert rows into a persistence layer, but let’s not worry about that for now.

    Given our plan, TDD leans toward picturing client code before it even exists:

    options = [schema: Csv.Schemas.Site, headers: [:name, :url]]
    Csv.Import.call("test/fixtures/sites.csv", options)

    Great, now we’re off to a good start with an integration test:

    # test/csv/import_test.exs
    
    defmodule Csv.ImportTest do
      use ExUnit.Case
      alias Csv.Schemas.Site
      alias Csv.Repo
    
      test "imports records of a csv file" do
        options = [schema: Site, headers: [:name, :url]]
    
        "test/fixtures/sites.csv" |> Csv.Import.call(options)
    
        assert [
          %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
          %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
        ] = Repo.all
      end
    end

    After running it with mix test, a compile error will say Site module cannot expand to a struct. To fix it, create a struct with name and url fields:

    # test/support.exs
    
    defmodule Csv.Schemas.Site do
      defstruct [:name, :url]
    end

    Next, require support.exs in test_helper.exs:

    # test/test_helper.exs
    
    ExUnit.start()
    
    Code.require_file "test/support.exs"

    Running our tests at this point should fail with the following error:

    1) test imports records of a csv file (Csv.ImportTest)
        test/csv/import_test.exs:6
        ** (UndefinedFunctionError) function Csv.Import.call/2 is undefined (module Csv.Import is not available)

    Creating the Main Module

    The error message hints at defining a Csv.Import.call/2 function. Let’s do the bare minimum to keep the flow going:

    # lib/csv/import.ex
    
    defmodule Csv.Import do
      def call(_input_path, _options) do
      end
    end

    As expected, we hit another error when rerunning our tests: we don’t have a repository.

    1) test imports records of a csv file (Csv.ImportTest)
       test/csv/import_test.exs:6
       ** (UndefinedFunctionError) function Csv.Repo.all/0 is undefined (module Csv.Repo is not available)

    Creating an In-memory Repository with Agent

    Agents are simple wrappers around state. Among other uses, they are perfect for simulating external boundaries such as databases, especially when all you need is to store and retrieve some data. Most importantly, we want to avoid setting up a database, because our library will leave persistence to the application that uses it.

    In Elixir and Erlang, processes are commonly used to maintain state by running a recursive loop where data is repeatedly passed to the same function. Agents are abstractions over exactly that pattern.
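
    To make that concrete, here is a minimal sketch of the kind of receive loop an Agent hides from us. The ListLoop module and its message shapes are hypothetical and exist only for illustration; Csv.Repo itself uses Agent:

    ```elixir
    defmodule ListLoop do
      # Hypothetical hand-rolled equivalent of an Agent holding a list.
      # The state lives only in the argument passed back into loop/1.
      def start do
        spawn(fn -> loop([]) end)
      end
    
      def insert(pid, record) do
        send(pid, {:insert, record})
        {:ok, record}
      end
    
      def all(pid) do
        ref = make_ref()
        send(pid, {:all, self(), ref})
    
        receive do
          {^ref, collection} -> collection
        end
      end
    
      # Each message yields the next state, and the loop recurses with it.
      defp loop(collection) do
        receive do
          {:insert, record} ->
            loop(collection ++ [record])
    
          {:all, caller, ref} ->
            send(caller, {ref, collection})
            loop(collection)
        end
      end
    end
    
    pid = ListLoop.start()
    ListLoop.insert(pid, "Semaphore Blog")
    ListLoop.insert(pid, "Elixir Language")
    IO.inspect(ListLoop.all(pid))
    # ["Semaphore Blog", "Elixir Language"]
    ```

    Unlike Agent.update/2, this insert/2 does not wait for the update to be applied, but messages between two processes arrive in the order they were sent, so all/1 still sees both records.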

    Our Agent will run in a separate process and remember any state we pass in:

    # test/support.exs
    
    defmodule Csv.Repo do
      @me __MODULE__
    
      def start_link do
        Agent.start_link(fn -> [] end, name: @me)
      end
    
      def insert(record) do
        Agent.update @me, fn(collection) -> collection ++ [record] end
        {:ok, record}
      end
    
      def all do
        Agent.get(@me, fn(collection) -> collection end)
      end
    end

    Let’s go over each function: Csv.Repo.start_link/0 uses Agent.start_link/2 to spawn a process registered under the current module’s name, Csv.Repo, which we store in the @me module attribute so that there is a single point of reference. The anonymous function passed to Agent.start_link/2 returns an empty list, which the Agent uses as its initial state.

    As for Csv.Repo.insert/1, it calls Agent.update/2 with two arguments: the process’ name and an anonymous function, which appends the record to the Agent’s state. Although that function comes from the client, its lexical environment is carried over and it runs in the Agent process.

    Finally, Csv.Repo.all/0 is set to fetch the entire state back.
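
    The point about insert/1 (the closure keeps the caller’s lexical environment but runs inside the Agent process) can be checked with a small standalone sketch. ClosureDemo and its variables are illustrative names, not part of the library:

    ```elixir
    defmodule ClosureDemo do
      # Returns {pid the closure ran in, agent pid, caller pid, final state}.
      def run do
        {:ok, agent} = Agent.start_link(fn -> [] end)
        caller = self()
        suffix = " (from the caller)"
    
        # The closure captures `suffix` from the caller's lexical scope...
        Agent.update(agent, fn collection -> collection ++ ["record" <> suffix] end)
    
        # ...but self() is evaluated by whichever process runs the closure.
        runner = Agent.get(agent, fn _collection -> self() end)
        state = Agent.get(agent, fn collection -> collection end)
    
        Agent.stop(agent)
        {runner, agent, caller, state}
      end
    end
    
    {runner, agent, caller, state} = ClosureDemo.run()
    IO.inspect(runner == agent)  # true: the closure ran in the Agent process
    IO.inspect(runner == caller) # false
    IO.inspect(state)            # ["record (from the caller)"]
    ```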

    Using our Agent is simple; let’s try it out in iex:

    $ iex -S mix
    Interactive Elixir (1.5.0) - press Ctrl+C to exit (type h() ENTER for help)
    iex(1)> Code.require_file "test/support.exs"
    [{Csv.Schemas.Site,
      <<70, 79, 82, 49, 0, 0, 13, 12, 66, 69, 65, 77, 65, 116, 85, 56, 0, 0, 1, 204,
        0, 0, 0, 46, 30, 69, 108, 105, 120, 105, 114, 46, 67, 115, 118, 46, 70, 105,
        120, 116, 117, 114, 101, 115, 46, 83, 105, 116, ...>>}]
    iex(2)> Csv.Repo.start_link
    {:ok, #PID<0.178.0>}
    iex(3)> Csv.Repo.insert %Csv.Schemas.Site{name: "Semaphore Blog"}
    {:ok, %Csv.Schemas.Site{name: "Semaphore Blog", url: nil}}
    iex(4)> Csv.Repo.insert %Csv.Schemas.Site{name: "Elixir Language"}
    {:ok, %Csv.Schemas.Site{name: "Elixir Language", url: nil}}

    Since Mix does not compile exs files, we had to explicitly require support.exs to reach our Csv.Schemas.Site struct. Then we started an agent process with Csv.Repo.start_link/0 and performed some Site insertions with Csv.Repo.insert/1. Now, we can fetch all of our structs back with Csv.Repo.all/0:

    iex(5)> Csv.Repo.all
    [%Csv.Schemas.Site{name: "Semaphore Blog", url: nil},
     %Csv.Schemas.Site{name: "Elixir Language", url: nil}]

    Making Our Test Pass

    We should have a new error at this time:

    1) test imports records of a csv file (Csv.ImportTest)
       test/csv/import_test.exs:6
       ** (exit) exited in: GenServer.call(Csv.Repo, {:get, #Function<0.78900424/1 in Csv.Repo.all/0>}, 5000)
           ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    

    Don’t get fooled by the seemingly strange GenServer reference. Under the covers, that’s what Agent uses. That message means that we didn’t start Csv.Repo before running our code, so change import_test.exs to look like this:

    # test/csv/import_test.exs
    
    defmodule Csv.ImportTest do
      use ExUnit.Case
      alias Csv.Schemas.Site
      alias Csv.Repo
    
      test "imports records of a csv file" do
        Repo.start_link # This line starts our agent
        options = [schema: Site, headers: [:name, :url]]
    
        "test/fixtures/sites.csv" |> Csv.Import.call(options)
    
        assert [
          %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
          %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
        ] = Repo.all
      end
    end
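
    The no process exit from before is easy to reproduce in isolation: calling any Agent function with a registered name that no process holds makes the underlying GenServer.call/3 exit with a {:noproc, ...} reason. The name :repo_that_was_never_started is made up for this sketch, and catching the exit is only for illustration; in our test we fix the cause by starting the Agent:

    ```elixir
    defmodule NoprocDemo do
      # Calls an Agent by a registered name that was never started and
      # turns the resulting exit into a plain value instead of crashing.
      def call_missing_agent do
        try do
          Agent.get(:repo_that_was_never_started, fn state -> state end)
        catch
          :exit, {:noproc, {GenServer, :call, _args}} -> :noproc
        end
      end
    end
    
    IO.inspect(NoprocDemo.call_missing_agent())
    # :noproc
    ```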

    Now we have our first actual failure:

    1) test imports records of a csv file (Csv.ImportTest)
       test/csv/import_test.exs:6
       match (=) failed
       code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Repo.all()
       right: []
       stacktrace:
         test/csv/import_test.exs:12: (test)

    This makes sense. Since we have not inserted any records, Repo.all/0 returns the initial state: an empty list. On top of that, Csv.Import.call/2 doesn’t do anything yet. Let’s outline the required steps to make our test pass:

    defmodule Csv.Import do
      def call(_input_path, _options) do
        # 1. Read the CSV line by line
        # 2. Parse lines from strings into structs
        # 3. Insert each struct into the repository
      end
    end

    To fulfill step 1, retrieve raw lines out of the file:

    lines = "test/fixtures/sites.csv"
    |> File.read!()
    |> String.split("\n")
    |> Enum.drop(1)
    |> Enum.reject(&(String.trim(&1) == ""))

    It’s worth explaining each step of this pipeline:

    • File.read!/1 reads file contents into a string.
    • String.split/2 splits the string by newline and returns a list.
    • Enum.drop/2 discards the first item off the list, which corresponds to the headers.
    • Enum.reject/2 removes any blank lines.
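
    Why is the Enum.reject/2 step needed? Files usually end with a newline, and String.split/2 then yields a trailing empty string. This sketch (LineSplitDemo is a throwaway module) runs the same steps on an in-memory string instead of File.read!/1, so it works without the fixture file:

    ```elixir
    defmodule LineSplitDemo do
      # Only the split step, to show the raw result.
      def raw(contents), do: String.split(contents, "\n")
    
      # The same steps as the pipeline above, minus File.read!/1.
      def lines(contents) do
        contents
        |> String.split("\n")
        |> Enum.drop(1)
        |> Enum.reject(&(String.trim(&1) == ""))
      end
    end
    
    contents = "name,url\nElixir Language,https://elixir-lang.org\n"
    IO.inspect(LineSplitDemo.raw(contents))
    # ["name,url", "Elixir Language,https://elixir-lang.org", ""]
    IO.inspect(LineSplitDemo.lines(contents))
    # ["Elixir Language,https://elixir-lang.org"]
    ```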

    Next, we will split each row by comma and use pattern matching to extract name and url fields, putting all data in structs after that:

    structs = lines
    |> Enum.map(&String.split(&1, ","))
    |> Enum.map(fn([name, url]) ->
      %Csv.Schemas.Site{name: name, url: url}
    end)

    Finally, we will pass each struct onto the repository, so that it takes care of persistence. The side-effectful Enum.each/2 instead of Enum.map/2 fits the bill just right, since we don’t need to pipe return values onward.

    structs
    |> Enum.each(&Csv.Repo.insert/1)

    To wire the above code into our function, we will use input_path as the CSV path and build each struct on the fly from options[:schema] with Kernel.struct/2. We can’t use the %Site{} literal syntax here, because it only accepts a module name known at compile time, not one stored in a variable:

    # lib/csv/import.ex
    
    defmodule Csv.Import do
      def call(input_path, options) do
        input_path
        |> File.read!
        |> String.split("\n")
        |> Enum.drop(1)
        |> Enum.reject(&(String.trim(&1) == ""))
        |> Enum.map(&String.split(&1, ","))
        |> Enum.map(fn([name, url]) ->
          struct(options[:schema], %{name: name, url: url})
        end)
        |> Enum.each(&Csv.Repo.insert/1)
      end
    end
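
    Kernel.struct/2 is what lets the schema vary at runtime. The standalone sketch below uses a throwaway Site module as a stand-in for Csv.Schemas.Site, and also shows a detail worth knowing: keys the struct does not define are silently discarded:

    ```elixir
    defmodule Site do
      defstruct [:name, :url]
    end
    
    defmodule StructDemo do
      # Builds a struct from a module held in a variable, like options[:schema].
      def build(schema, fields), do: struct(schema, fields)
    end
    
    schema = Site
    fields = %{name: "Elixir Language", url: "https://elixir-lang.org", extra: "dropped"}
    IO.inspect(StructDemo.build(schema, fields))
    # %Site{name: "Elixir Language", url: "https://elixir-lang.org"}
    ```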

    And don’t forget to create the sites.csv file under test/fixtures:

    name,url
    Elixir Language,https://elixir-lang.org
    Semaphore Blog,https://semaphoreci.com/blog
    

    Tests should now be green:

    $ mix test
    ==> csv
    .
    
    Finished in 0.02 seconds
    1 test, 0 failures

    Test-driving a Record Stream

    Our code works, but naively. First of all, parsing rows is not as generic as we want: the name and url fields are hardcoded, so we can’t use a schema with any other fields. These requirements must be flexible, otherwise our library won’t be of much use. Secondly, we have lots of logic in a single spot. It’s time to start thinking in terms of new abstractions, splitting up concerns, and making room for growth!

    More importantly, we will likely parse thousands of rows, which makes putting a big file into a single string a waste of memory. Furthermore, each map call iterates through the whole collection, thus making our pipeline slower for large inputs.

    Streams

    What if we could have Elixir read our file line by line and send each line through the whole pipeline in a single pass? It turns out we can, by using streams. Streams are composable, lazy enumerables. Suppose we have the following pipeline, where we double each number of a list and then add one to it:

    iex(1)> [1, 2] |> Enum.map(fn(x) -> x * 2 end) |> Enum.map(fn(x) -> x + 1 end)
    [3, 5]

    Both Enum.map/2 calls iterate over the entire list, which adds up to two full passes. That may be negligible for small lists, but what about big ones? Streams allow performing a single pass, and refactoring from eager to lazy is easy:

    iex(2)> [1, 2] |> Stream.map(fn(x) -> x * 2 end) |> Stream.map(fn(x) -> x + 1 end) |> Enum.to_list
    [3, 5]

    Renaming Enum.map to Stream.map and appending Enum.to_list/1 to the pipeline gives us a single pass through the enumerable. Enum.to_list/1 is necessary to trigger the iteration; any other Enum function would do the same.
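
    The difference is easiest to see by logging each step. In this sketch (EvalOrder is a made-up helper), the same two mapping steps run eagerly with Enum.map/2 and lazily with Stream.map/2, and each step reports the value it receives to the calling process:

    ```elixir
    defmodule EvalOrder do
      # Runs [1, 2] through "double" then "add one" with the given mapper
      # (&Enum.map/2 or &Stream.map/2) and records which step saw which value.
      def run(mapper) do
        parent = self()
    
        # Wraps f so that every call first reports {label, input} to the caller.
        step = fn label, f ->
          fn x ->
            send(parent, {label, x})
            f.(x)
          end
        end
    
        result =
          [1, 2]
          |> mapper.(step.(:double, &(&1 * 2)))
          |> mapper.(step.(:add_one, &(&1 + 1)))
          |> Enum.to_list()
    
        {result, drain([])}
      end
    
      # Collects the reported steps from the mailbox, oldest first.
      defp drain(acc) do
        receive do
          {label, x} when label in [:double, :add_one] -> drain([{label, x} | acc])
        after
          0 -> Enum.reverse(acc)
        end
      end
    end
    
    IO.inspect(EvalOrder.run(&Enum.map/2))
    # {[3, 5], [double: 1, double: 2, add_one: 2, add_one: 4]}
    IO.inspect(EvalOrder.run(&Stream.map/2))
    # {[3, 5], [double: 1, add_one: 2, double: 2, add_one: 4]}
    ```

    Both produce [3, 5], but the eager version finishes the first step for every element before starting the second, while the stream carries each element through both steps before moving on.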

    But how does this work? Because Elixir is a functional language, functions are first-class citizens and we can compose them. Under the hood, Stream keeps track of an enumerable and the functions of the current stream pipeline. For instance:

    iex(3)> stream_1 = Stream.map([1, 2], fn(x) -> x * 2 end)
    #Stream<[enum: [1, 2], funs: [#Function<47.117392430/1 in Stream.map/2>]]>
    iex(4)> stream_2 = stream_1 |> Stream.map(fn(x) -> x + 1 end)
    #Stream<[enum: [1, 2],
     funs: [#Function<47.117392430/1 in Stream.map/2>,
      #Function<47.117392430/1 in Stream.map/2>]]>

    The first call to Stream.map/2 creates a %Stream{} struct with two keys: enum, which is the [1, 2] enumerable; and funs, a list holding the “multiply number by two” function wrapped in Stream.map/2’s mapping logic. The second call appends another function, “add one to each number”, to the funs list. Thus, no work on enum happens when calling functions on Stream; instead, the pending work is recorded in a struct!

    The magic happens when we pipe %Stream{} into Enum.to_list/1:

    iex(5)> stream_2 |> Enum.to_list
    [3, 5]

    This is possible due to streams implementing the Enumerable protocol. The underlying implementation glues our funs together and runs enum through them. The resulting effect is a single, lazy iteration. Let’s do this work manually, so that you get the idea:

    iex(6)> double_number = fn(x) -> x * 2 end
    #Function<6.99386804/1 in :erl_eval.expr/5>
    iex(7)> add_one = fn(x) -> x + 1 end
    #Function<6.99386804/1 in :erl_eval.expr/5>
    iex(8)> funcs = [double_number, add_one]
    [#Function<6.99386804/1 in :erl_eval.expr/5>,
     #Function<6.99386804/1 in :erl_eval.expr/5>]
    iex(9)> initial_value = fn(x) -> x end
    #Function<6.99386804/1 in :erl_eval.expr/5>
    iex(10)> composed = Enum.reduce funcs, initial_value, fn(func, acc) ->
    ...(10)>   fn(x) -> x |> acc.() |> func.() end
    ...(10)> end
    #Function<6.99386804/1 in :erl_eval.expr/5>
    iex(11)> composed.(2)
    5
    iex(12)> [1, 2] |> Enum.map(composed)
    [3, 5]

    Much like an actual stream, we reduced our functions into a single one (hence Enum.reduce) and ran our enumerable through it. The actual implementation is more complex than that because Stream has a multitude of functions other than Stream.map, but the idea is the same.

    Record Streams

    It might have become clear to you that our code can benefit from streams. Let’s create a RecordStream module to replace some of our pipeline. It will bundle up the following steps as efficiently as it can:

    1. Read the CSV line by line
    2. Parse lines from strings into structs
    

    Since this will be a lower-level component, we will step down the abstraction ladder and work with a device instead of a file path. In Elixir and Erlang, file handles are called IO devices. After “grabbing” a device, we can use the IO module to perform read and write operations on it:

    iex(1)> {:ok, device} = File.open("test/fixtures/sites.csv")
    {:ok, #PID<0.195.0>}
    iex(2)> IO.read(device, :all)
    "name,url\nElixir Language,https://elixir-lang.org\nSemaphore Blog,https://semaphoreci.com/blog\n"

    Surprisingly, devices are implemented as processes:

    • File.open spawns a new process and returns a pid back to the caller.
    • IO functions use the pid to request file operations, to which the process may send a response back depending on the request. Helpfully, these functions abstract away the hurdle of sending and receiving messages.
    • It’s possible to have any kind of IO device, as long as it implements the required messaging interface.
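
    That messaging interface is the Erlang I/O protocol: a device receives {:io_request, from, reply_as, request} messages and answers with {:io_reply, reply_as, reply}. As a rough sketch (the IoRequestDemo module is hypothetical), we can send a get_line request by hand to StringIO, Elixir’s in-memory device:

    ```elixir
    defmodule IoRequestDemo do
      # Reads one line from an IO device by speaking the I/O protocol
      # directly, roughly what IO.read(device, :line) does for us.
      def get_line(device) do
        ref = make_ref()
        send(device, {:io_request, self(), ref, {:get_line, :unicode, []}})
    
        receive do
          {:io_reply, ^ref, reply} -> reply
        after
          1_000 -> :timeout
        end
      end
    end
    
    {:ok, device} = StringIO.open("name,url\nElixir Language,https://elixir-lang.org\n")
    IO.inspect(is_pid(device))                 # true: the device is a process
    IO.inspect(IoRequestDemo.get_line(device)) # "name,url\n"
    IO.inspect(IO.read(device, :line))         # "Elixir Language,https://elixir-lang.org\n"
    ```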

    Elixir provides an in-memory IO interface called StringIO, which can also be controlled by IO functions. Instead of a file path, however, StringIO.open/1 asks for the file contents directly:

    iex(1)> {:ok, device} = StringIO.open("name,url\nElixir Language,https://elixir-lang.org")
    {:ok, #PID<0.87.0>}
    iex(2)> IO.read(device, :all)
    "name,url\nElixir Language,https://elixir-lang.org"

    Implementing a Record Stream Module

    Our record stream needs three arguments: a device, a schema, and the expected headers. Let’s start with the most trivial case – when the file has only headers and thus no rows, it should stream an empty list:

    # test/csv/record_stream_test.exs
    
    defmodule RecordStreamTest do
      use ExUnit.Case
      alias Csv.Schemas.Site
    
      test "streams an empty list when csv has no rows" do
        {:ok, device} = StringIO.open("name,url\n")
        {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)
    
        assert [] == Enum.to_list(stream)
      end
    end

    An error will say we haven’t defined Csv.RecordStream.new/2. To solve it, introduce that function and make it return a stream that yields an empty list when enumerated:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(_device, headers: _headers, schema: _schema) do
        stream = Stream.map([], fn(_) -> nil end)
        {:ok, stream}
      end
    end

    Our next case exercises a CSV with a few rows:

    # test/csv/record_stream_test.exs
    
    defmodule RecordStreamTest do
      use ExUnit.Case
      alias Csv.Schemas.Site
    
      test "streams an empty list when csv has no rows" do
        # Imagine the previous test here...
      end
    
      test "streams csv rows as structs" do
        {:ok, device} = StringIO.open """
            name,url
            Elixir Language,https://elixir-lang.org
            Semaphore Blog,https://semaphoreci.com/blog
            """
        {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)
    
        assert [
          %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
          %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
        ] = Enum.to_list(stream)
      end
    end

    It fails because we are still returning an empty list:

    1) test streams csv rows as structs (RecordStreamTest)
       test/csv/record_stream_test.exs:12
       match (=) failed
       code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
       right: []

    We will use IO.stream/2 to lazily read our file line by line, then strip each line’s trailing newline with String.trim/1 and split it by comma:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(device, headers: _headers, schema: _schema) do
        stream = device
        |> IO.stream(:line)
        |> Stream.map(&String.trim/1)
        |> Stream.map(&String.split(&1, ","))
    
        {:ok, stream}
      end
    end

    Our test fails with an interesting error. We are getting back headers when we shouldn’t:

    2) test streams csv rows as structs (RecordStreamTest)
       test/csv/record_stream_test.exs:12
       match (=) failed
       code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
       right: [["name", "url"], ["Elixir Language", "https://elixir-lang.org"],
               ["Semaphore Blog", "https://semaphoreci.com/blog"]]

    Now we need to separate out the headers without turning the stream into a list, since that would defeat the purpose of using streams in the first place. Streams are enumerables, hence we can use Enum.fetch!/2:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(device, headers: _headers, schema: _schema) do
        stream = device
        |> IO.stream(:line)
        |> Stream.map(&String.trim/1)
        |> Stream.map(&String.split(&1, ","))
    
        # This variable name starts with an underscore because
        # we are not using it
        _headers = stream |> Enum.fetch!(0)
    
        {:ok, stream}
      end
    end

    A keen reader will find it strange that headers are no longer showing up on the right hand side of the assertion:

    1) test streams csv rows as structs (RecordStreamTest)
       test/csv/record_stream_test.exs:12
       match (=) failed
       code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
       right: [["Elixir Language", "https://elixir-lang.org"],
               ["Semaphore Blog", "https://semaphoreci.com/blog"]]

    Fetching the first element consumed it, so enumerating the stream again returned only the remaining rows, as if the stream had been mutated. Isn’t Elixir immutable? Before answering that question, let’s try the same thing out with a plain list:

    iex(1)> list = [1, 2, 3, 4]
    [1, 2, 3, 4]
    iex(2)> list |> Enum.fetch!(0)
    1
    iex(3)> list
    [1, 2, 3, 4]

    As you can see, our list remained the same after calling Enum.fetch!/2 on it. But with devices it is different (remember they are processes?). The stream value itself is immutable, but enumerating an IO stream reads from the device, and the device process keeps an internal pointer to the current position in the file. Once a line has been read, later enumerations start after it. This is much like how IO works in Ruby, which may be unexpected for newcomers, and it is a direct consequence of IO devices being stateful processes.
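
    The contrast is easy to reproduce outside the RecordStream module. In this sketch (ConsumeDemo is a throwaway module), fetching the first element leaves a list intact, while fetching from an IO stream over a StringIO device advances the device past the first line:

    ```elixir
    defmodule ConsumeDemo do
      # Fetches the first element, then enumerates the same value again.
      def list_case do
        list = ["name,url\n", "a,1\n", "b,2\n"]
        first = Enum.fetch!(list, 0)
        {first, Enum.to_list(list)}
      end
    
      def device_case do
        {:ok, device} = StringIO.open("name,url\na,1\nb,2\n")
        stream = IO.stream(device, :line)
        first = Enum.fetch!(stream, 0)
        {first, Enum.to_list(stream)}
      end
    end
    
    IO.inspect(ConsumeDemo.list_case())
    # {"name,url\n", ["name,url\n", "a,1\n", "b,2\n"]}
    IO.inspect(ConsumeDemo.device_case())
    # {"name,url\n", ["a,1\n", "b,2\n"]}
    ```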

    Next, let’s use the headers function argument to assist with mapping rows into structs. We can build a keyword list out of headers and rows using Enum.zip:

    iex(1)> keys = [:name, :url]
    [:name, :url]
    iex(2)> values = ["Elixir Language", "https://elixir-lang.org"]
    ["Elixir Language", "https://elixir-lang.org"]
    iex(3)> kwlist = Enum.zip(keys, values)
    [name: "Elixir Language", url: "https://elixir-lang.org"]

    Now we need to turn the keyword list into a map and then into a struct. We can do that with Enum.into/2 and Kernel.struct/2, although struct/2 also accepts the keyword list directly, as the last call shows:

    iex(4)> Enum.into(kwlist, %{})
    %{name: "Elixir Language", url: "https://elixir-lang.org"}
    iex(5)> struct(Csv.Schemas.Site, kwlist)
    %Csv.Schemas.Site{name: "Elixir Language", url: "https://elixir-lang.org"}

    Now we have all the puzzle pieces to get our tests passing:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(device, headers: headers, schema: schema) do
        stream = device
        |> IO.stream(:line)
        |> Stream.map(&String.trim/1)
        |> Stream.map(&String.split(&1, ","))
        |> Stream.map(fn(row) ->
          contents = headers
          |> Enum.zip(row)
          |> Enum.into(%{})
    
          struct(schema, contents)
        end)
    
        _headers = stream |> Enum.fetch!(0)
    
        {:ok, stream}
      end
    end

    And sure enough they do:

    ...
    
    Finished in 0.03 seconds
    3 tests, 0 failures
    

    Validating the Headers

    We are missing two important cases. First, our code must still work when the headers are shuffled. Second, we need to ensure the file has the right columns. Here are the tests:

    # test/csv/record_stream_test.exs
    
    test "streams correctly when headers are shuffled" do
      {:ok, device} = StringIO.open """
        url,name
        https://elixir-lang.org,Elixir Language
        """
      {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)
    
      assert [
        %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
      ] = Enum.to_list(stream)
    end
    
    test "returns :invalid_csv when missing required columns" do
      {:ok, device} = StringIO.open """
        name
        Elixir Language
        """
    
      assert :invalid_csv == Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)
    end

    Let’s make the first one pass. Instead of creating structs using the headers function argument, we can use the headers from the file, since they dictate column order. We also need to convert all headers into atoms, because struct fields are atoms and Kernel.struct/2 silently ignores keys the struct doesn’t define, such as the string "name":

    headers = stream
    |> Enum.fetch!(0)
    |> Enum.map(&String.to_atom/1)
    
    structs = stream
    |> Stream.map(fn(row) ->
      # Uses headers from the file to build up structs
      contents = headers
      |> Enum.zip(row)
      |> Enum.into(%{})
    
      struct(schema, contents)
    end)
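
    One caveat about String.to_atom/1: atoms are never garbage-collected, so converting headers from arbitrary files can, over time, exhaust the VM’s atom table. A hedged alternative is String.to_existing_atom/1, which raises ArgumentError for strings that don’t already exist as atoms (struct fields always do). The modules below are hypothetical and only sketch the idea:

    ```elixir
    defmodule ExampleSite do
      # Defining the struct guarantees the :name and :url atoms exist.
      defstruct [:name, :url]
    end
    
    defmodule HeaderAtoms do
      # Hypothetical alternative to Enum.map(headers, &String.to_atom/1):
      # converts only strings that already exist as atoms and reports
      # anything else as :invalid_csv instead of creating new atoms.
      def convert(header_strings) do
        {:ok, Enum.map(header_strings, &String.to_existing_atom/1)}
      rescue
        ArgumentError -> :invalid_csv
      end
    end
    
    IO.inspect(HeaderAtoms.convert(["url", "name"]))
    # {:ok, [:url, :name]}
    IO.inspect(HeaderAtoms.convert(["url", "zzz_unknown_column_8f3a"]))
    # :invalid_csv
    ```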

    The second one is also simple, and requires conditional logic to determine whether the headers we are dealing with are the ones we expect. If so, we return {:ok, structs}, otherwise :invalid_csv:

    if Enum.sort(headers) == Enum.sort(expected_headers) do
      # ... Code to figure out the structs comes here
    
      {:ok, structs}
    else
      :invalid_csv
    end

    Together, these changes make header handling robust: column order no longer matters, and files with the wrong columns are rejected. After putting it all together, here’s the resulting code:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(device, headers: expected_headers, schema: schema) do
        stream = device
        |> IO.stream(:line)
        |> Stream.map(&String.trim/1)
        |> Stream.map(&String.split(&1, ","))
    
        headers = stream
        |> Enum.fetch!(0)
        |> Enum.map(&String.to_atom/1)
    
        if Enum.sort(headers) == Enum.sort(expected_headers) do
          structs = stream
          |> Stream.map(fn(row) ->
            contents = headers
            |> Enum.zip(row)
            |> Enum.into(%{})
    
            struct(schema, contents)
          end)
    
          {:ok, structs}
        else
          :invalid_csv
        end
      end
    end

    Refactoring

    Our code works, but there is too much complexity in a single function. To solve that issue, we recommend extracting logical chunks into private functions. Good tests make refactoring easy, so take small steps and run them constantly during the process. Here’s an idea of how you might reorganize the module:

    # lib/csv/record_stream.ex
    
    defmodule Csv.RecordStream do
      def new(device, headers: expected_headers, schema: schema) do
        stream = device |> to_stream
        headers = stream |> extract_headers
    
        if valid_headers?(headers, expected_headers) do
          {:ok, Stream.map(stream, &to_struct(&1, schema, headers))}
        else
          :invalid_csv
        end
      end
    
      defp valid_headers?(headers, valid_headers) do
        Enum.sort(headers) == Enum.sort(valid_headers)
      end
    
      defp to_stream(device) do
        device
        |> IO.stream(:line)
        |> Stream.map(&String.trim/1)
        |> Stream.map(&String.split(&1, ","))
      end
    
      defp extract_headers(stream) do
        stream
        |> Enum.fetch!(0)
        |> Enum.map(&String.to_atom/1)
      end
    
      defp to_struct(row, schema, headers) do
        contents =
          headers
          |> Enum.zip(row)
          |> Enum.into(%{})
    
        struct(schema, contents)
      end
    end

    And all tests should still be passing.

    Parsing the CSV Properly

    Our in-house CSV parser is fragile and bound to break in certain scenarios. What if a field contains a comma, for example inside a quoted value? Let’s just use a good library: we recommend NimbleCSV from Plataformatec. It is simple, relies on binary pattern matching for efficiency, and has no dependencies of its own. Include it in mix.exs:

    # mix.exs
    
    defp deps do
      [
        {:nimble_csv, "~> 0.1.0"}
      ]
    end

    And install dependencies:

    mix deps.get

    By default, this library ships with NimbleCSV.RFC4180, a parser and dumper for the most common CSV dialect, which uses commas as separators and double quotes as escape characters. Thankfully, it works with streams out of the box. Just change this function:

    # lib/csv/record_stream.ex
    
    defp to_stream(device) do
      device
      |> IO.stream(:line)
      |> Stream.map(&String.trim/1)
      |> Stream.map(&String.split(&1, ","))
    end

    Into this:

    # lib/csv/record_stream.ex
    
    defp to_stream(device) do
      device
      |> IO.stream(:line)
      |> NimbleCSV.RFC4180.parse_stream(headers: false)
    end

    Because we are handling the headers elsewhere, we pass headers: false to NimbleCSV.RFC4180.parse_stream/2.

    And all tests will still be green.
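
    To see what a comma inside the contents does to the hand-rolled parser, here is the old trim-and-split step applied to a row with a quoted field (NaiveSplitDemo and the row are made up for illustration). A parser that follows RFC 4180 treats the quoted value as a single field, Sao Paulo, Brazil:

    ```elixir
    defmodule NaiveSplitDemo do
      # The parsing step our first version used: trim, then split on commas.
      def parse_line(line) do
        line
        |> String.trim()
        |> String.split(",")
      end
    end
    
    # A row whose second field contains a comma inside double quotes.
    line = ~s(Plataformatec,"Sao Paulo, Brazil"\n)
    IO.inspect(NaiveSplitDemo.parse_line(line))
    # ["Plataformatec", "\"Sao Paulo", " Brazil\""]
    ```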

    Plugging Record Stream into Import

    It’s time to plug our rock-solid Csv.RecordStream into Csv.Import – and we expect tests to still pass after that! We will replace most of the pipeline in Csv.Import, except for the repository part:

    # lib/csv/import.ex
    
    defmodule Csv.Import do
      alias Csv.RecordStream
    
      def call(input_path, schema: schema, headers: headers) do
        {:ok, device} = File.open(input_path)
        {:ok, stream} = RecordStream.new(device, headers: headers, schema: schema)
    
        stream |> Enum.each(&Csv.Repo.insert/1)
      end
    end
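
    One caveat with call(input_path, schema: schema, headers: headers): a keyword list pattern in a function head matches only lists with exactly those keys in exactly that order. Our test happens to pass schema before headers, so it works, but reversing them raises FunctionClauseError. This sketch, using a hypothetical OptionsDemo module, contrasts that with reading the options through Keyword.fetch!/2:

    ```elixir
    defmodule OptionsDemo do
      # Keyword pattern in the head, like Csv.Import.call/2 uses:
      # only this exact order and set of keys will match.
      def strict(schema: schema, headers: headers), do: {schema, headers}
    
      # Hypothetical order-independent variant.
      def lenient(options) do
        {Keyword.fetch!(options, :schema), Keyword.fetch!(options, :headers)}
      end
    
      # Returns :no_match instead of crashing, for demonstration only.
      def try_strict(options) do
        strict(options)
      rescue
        FunctionClauseError -> :no_match
      end
    end
    
    IO.inspect(OptionsDemo.try_strict(schema: :site, headers: [:name, :url]))
    # {:site, [:name, :url]}
    IO.inspect(OptionsDemo.try_strict(headers: [:name, :url], schema: :site))
    # :no_match
    IO.inspect(OptionsDemo.lenient(headers: [:name, :url], schema: :site))
    # {:site, [:name, :url]}
    ```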

    Parallelizing Insertions

    We are not leveraging any of Elixir’s concurrency features to make things faster. The bulk of our processing consists of interacting with a repository, which happens sequentially in our code:

    stream |> Enum.each(&Csv.Repo.insert/1)

    It’s possible to use Task.async_stream/3 (or Task.async_stream/5, which takes a module, a function name, and extra arguments instead of an anonymous function) to run a given function concurrently on each item of an enumerable. But there is a catch: database connections are bound to a certain pool size, therefore we must limit the number of simultaneous connections to avoid errors. Even though we aren’t dealing with a database yet, we want to provide sane defaults, and Task.async_stream accepts a max_concurrency option for just that. Because it starts a new task only when a running one finishes, and pulls items from the enumerable on demand, this also gives us a form of back-pressure.

    Task.async_stream/5 asks for an MFA, which stands for module/function/arguments. For us, that will be Csv.Repo, :insert, and an empty list, because there are no arguments other than the struct, which Task.async_stream prepends to the argument list.
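
    To get a feel for Task.async_stream/5 and max_concurrency before touching Csv.Import, here is a standalone sketch. FakeRepo is hypothetical: its insert/2 sleeps briefly to mimic a database call and counts how many inserts overlap. Each element is prepended to the argument list, so [counter] turns into insert(element, counter):

    ```elixir
    defmodule FakeRepo do
      # Hypothetical stand-in for Csv.Repo that tracks {running, peak}
      # inserts in an Agent to observe the concurrency limit.
      def insert(record, counter) do
        Agent.update(counter, fn {running, peak} -> {running + 1, max(peak, running + 1)} end)
        Process.sleep(20) # pretend to talk to a database
        Agent.update(counter, fn {running, peak} -> {running - 1, peak} end)
        {:ok, record}
      end
    
      # Inserts 1..count concurrently and returns {results, peak concurrency}.
      def run(count, max_concurrency) do
        {:ok, counter} = Agent.start_link(fn -> {0, 0} end)
    
        results =
          1..count
          |> Task.async_stream(__MODULE__, :insert, [counter], max_concurrency: max_concurrency)
          |> Enum.to_list()
    
        peak = Agent.get(counter, fn {_running, peak} -> peak end)
        Agent.stop(counter)
        {results, peak}
      end
    end
    
    {results, peak} = FakeRepo.run(6, 2)
    IO.inspect(results)
    # [ok: {:ok, 1}, ok: {:ok, 2}, ok: {:ok, 3}, ok: {:ok, 4}, ok: {:ok, 5}, ok: {:ok, 6}]
    IO.inspect(peak <= 2)
    # true
    ```

    Results come back wrapped in {:ok, value} and in input order, even though the inserts themselves overlap.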

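    Replacing Enum.each/2 with Task.async_stream/5 and setting max_concurrency to 10 (for now) should do what we want. Since Task.async_stream is lazy, we still need Enum.to_list/1 at the end to actually run the tasks: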

    # lib/csv/import.ex
    
    defmodule Csv.Import do
      alias Csv.RecordStream
    
      def call(input_path, schema: schema, headers: headers) do
        {:ok, device} = File.open(input_path)
        {:ok, stream} = RecordStream.new(device, headers: headers, schema: schema)
    
        stream
        |> Task.async_stream(Csv.Repo, :insert, [], max_concurrency: 10)
        |> Enum.to_list
      end
    end

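    Tests still pass after that change, although they may become flaky: results come back in order, but the inserts themselves can now run in any order, while our integration test expects Repo.all to return the rows in file order. Comparing sorted records, for example Enum.sort_by(Repo.all, & &1.name), keeps the test deterministic. Also note that our Agent-based repository still applies updates one at a time; the speedup will show up with a real database, where up to ten inserts can be in flight at once.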

    Setting up Testing on Semaphore CI

    To complete the first part of our tutorial, let’s set up our project on Semaphore CI. If you don’t have a Semaphore account, you can create one for free.

    First, add the source files to git and push them to GitHub or Bitbucket.

    After logging into Semaphore, select “Create new” -> “Project”:

    Create project

    Semaphore will ask you to configure either GitHub or Bitbucket, so just follow the authorization steps.

    Next, choose the correct repository:

    Choose repository

    Choose Elixir version 1.4.5 (1.5 is not available at the time of this writing) and set up the build steps to look like this:

    Configure build

    Click on the “Build With These Settings” button and wait for the build to complete and the tests to pass:

    Build

    Congratulations! Now you have rock-solid continuous integration at your fingertips.

    Conclusion

    Hopefully, this article has shown you how powerful and flexible streams and pipelines can be together. Toward the end, adding concurrency and robust CSV parsing required minimal changes to the code. You can check out all of the code in this git repository.

    In the next article, we will take a tour around mocking and stubbing techniques and explore scenarios where this practice makes sense. See you there!

    Written by:
    A full-stack oriented Rubyist, Elixirist, and Javascripter who enjoys exchanging knowledge and aims for well-balanced and easy-to-maintain solutions regarding product needs. Frequently blogs at The Miners.