Test-Driving a Stream-powered Elixir Library

In this tutorial, we will test-drive an Elixir library and refactor it to leverage streams, while learning about Umbrella apps and handy testing techniques.


Introduction

Elixir is a relatively recent language built on top of the battle-tested BEAM virtual machine, and it is steadily gaining traction among companies and developers. It brings a modern, fresh perspective to the Erlang world, making it easy to build reliable and fault-tolerant applications. On top of that, one of Elixir's main strengths is productivity.

This series will teach you techniques for developing Umbrella projects composed of small, decoupled applications. We will explore the following topics:

  • Test-driven Development,
  • Umbrella Projects,
  • Streams, pipelines, and basic IO,
  • Testing and mocking techniques,
  • Basic OTP with GenServer and Tasks, and
  • Integrating functionality into a Phoenix front end.

Final Product

Our final product will use small libraries to import CSV files into a database, and it will provide a great user experience with real-time notifications. Out of the box, Elixir and Phoenix enable this kind of enhanced interactivity, overcoming the stateless nature of HTTP.

Our Umbrella will be composed of three applications:

  • A CSV importer,
  • A job runner with tracking capabilities, and
  • A Phoenix frontend that will tie up all of the above under a user interface.

Goals of This Tutorial

We will bootstrap csv, a generic and reusable library to parse and import CSV files. You will learn about:

  • Using Agents to fulfill testing needs,
  • Testing file IO,
  • How to leverage Streams and how they work internally, and
  • Parallelizing work on a pipeline.

Let's get started!

Prerequisites

For this article, you will need:

  • Basic knowledge about Elixir and ExUnit and
  • Elixir 1.5 installed on your computer.

To get acquainted with ExUnit and the mechanics of Test-Driven Development, I recommend checking out our Introduction to Testing Elixir Applications with ExUnit.

Umbrella Projects

The more a project grows, the harder it is to maintain. When coupling creeps in and team members begin to step on each other's toes, it's time for a change.

Umbrella Projects address the problem of tearing a big application apart without hurting a team's morale and productivity. Traditionally, that involves splitting concerns out into Hex libraries, which also means splitting out their release lifecycles. But that comes with a cost: it gets harder to manage versions and keep them in sync with the host application. Moreover, these components are usually specific to the host application's domain and can't be vendorized.

Mix has the tools to put any number of applications under a so-called Umbrella Project, hence offering a nice balance to that problem: one can enjoy the benefits of decoupled and independent applications while still having all code under a single git repository.

Resorting to this feature later in the lifetime of a project is one use case, but it may also make sense to start out with an Umbrella if you have a well-rounded idea of your domain, or if you want these benefits from the get-go. It all depends on your particular situation. Whatever your needs are, do not rush; take small steps and wait for domain concepts to emerge before applying namespacing or breaking out sub-applications.

Bootstrapping Our Project

We will start with creating our Umbrella project. Let's call it admin:

mix new admin --umbrella

Mix helpfully gives some hints on what to do next:

* creating .gitignore
* creating README.md
* creating mix.exs
* creating apps
* creating config
* creating config/config.exs

Your umbrella project was created successfully.
Inside your project, you will find an apps/ directory
where you can create and host many applications:

    cd admin
    cd apps
    mix new my_app

Commands like "mix compile" and "mix test" when executed
in the umbrella project root will automatically run
for each application in the apps/ directory.

As you can see, Mix commands executed in the project root run on each application, which is extremely helpful because it ensures that all the components are healthy. You can also run the tests of any application individually, as long as you first cd into that application's directory.
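
For example, using the my_app placeholder from Mix's hint above:

cd apps/my_app
mix test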

Creating the CSV Application

By default, Umbrella projects provide an apps directory where standalone applications live. Not surprisingly, applications are created the same way you normally would with Mix. Use the following commands to create csv:

cd apps
mix new csv

As a sanity measure, let's run the tests generated by Mix from the root folder and make sure they pass:

$ mix test
==> csv
Compiling 1 file (.ex)
Generated csv app
==> csv
..

Finished in 0.03 seconds
2 tests, 0 failures

Randomized with seed 464454

Before advancing, do delete those tests because we won't need them:

rm -f apps/csv/test/csv_test.exs

Test-driving the Import Module

As top-down TDD suggests, let's start with a high-level test describing the outcome we expect. An outline may be helpful:

  1. Receive a path to the CSV file,
  2. Parse rows and insert them into a repository, and
  3. Retrieve rows back from the repository and assert whether they match the CSV's.

Since this code is meant to work with any CSV definition, we will give it three arguments: a file path, a struct module, and a list of expected headers. The struct will be used as a schema for each CSV row, and the headers will be used to make sure the file conforms to the expected structure.

Any schema would do for this test, so let's stick with something simple: a Site struct with two fields, name and url. As for the repository, its role is to insert rows into a persistence layer, but let's not worry about that for now.

Given our plan, TDD encourages us to picture the client code before it even exists:

options = [schema: Csv.Schemas.Site, headers: [:name, :url]]
Csv.Import.call("test/fixtures/sites.csv", options)

Great, now we're off to a good start with an integration test:

# apps/csv/test/csv/import_test.exs

defmodule Csv.ImportTest do
  use ExUnit.Case
  alias Csv.Schemas.Site
  alias Csv.Repo

  test "imports records of a csv file" do
    options = [schema: Site, headers: [:name, :url]]

    "test/fixtures/sites.csv" |> Csv.Import.call(options)

    assert [
      %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
      %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
    ] = Repo.all
  end
end

After running it with mix test, a compile error will say the Site module cannot be expanded into a struct. To fix it, create a struct with name and url fields:

# apps/csv/test/support.exs

defmodule Csv.Schemas.Site do
  defstruct name: nil, url: nil
end

Next, require support.exs in test_helper.exs:

# apps/csv/test/test_helper.exs

ExUnit.start()

Code.require_file "test/support.exs"

Running our tests at this point should fail with the following error:

1) test imports records of a csv file (Csv.ImportTest)
    test/csv/import_test.exs:6
    ** (UndefinedFunctionError) function Csv.Import.call/2 is undefined (module Csv.Import is not available)

Creating the Main Module

The error message hints at defining a Csv.Import.call/2 function. Let's do the bare minimum to keep the flow coming:

# apps/csv/lib/csv/import.ex

defmodule Csv.Import do
  def call(_input_path, _options) do
  end
end

As expected, we hit another error when rerunning our tests: we don't have a repository.

1) test imports records of a csv file (Csv.ImportTest)
   test/csv/import_test.exs:6
   ** (UndefinedFunctionError) function Csv.Repo.all/0 is undefined (module Csv.Repo is not available)

Creating an In-memory Repository with Agent

Agents are simple wrappers around state. Among other uses, they are perfect for simulating external boundaries such as databases, especially when all you need to do is set and retrieve some data. Most importantly, we want to avoid setting up a database, because our library will delegate that responsibility elsewhere.

In Elixir and Erlang, processes are commonly used to maintain state by running a recursive loop where the data is repeatedly passed to the same function. Agents are abstractions over exactly that pattern.
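
To make that concrete, here is a minimal sketch of holding state in a hand-rolled process loop. The Counter module is hypothetical, shown only to illustrate the pattern Agent abstracts away:

defmodule Counter do
  # Spawn a process whose entire state lives in the
  # argument of the recursive loop below
  def start do
    spawn(fn -> loop(0) end)
  end

  defp loop(state) do
    receive do
      {:add, n} ->
        # "Update" the state by recursing with a new value
        loop(state + n)
      {:get, caller} ->
        send(caller, {:state, state})
        loop(state)
    end
  end
end

Sending {:add, n} "updates" the state by recursing with a new value, and {:get, caller} reads it back:

pid = Counter.start
send(pid, {:add, 2})
send(pid, {:get, self()})
receive do
  {:state, state} -> state # => 2
end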

Our Agent will run in a separate process and remember any state we pass in:

# apps/csv/test/support.exs

defmodule Csv.Repo do
  @me __MODULE__

  def start_link do
    Agent.start_link(fn -> [] end, name: @me)
  end

  def insert(record) do
    Agent.update @me, fn(collection) -> collection ++ [record] end
    {:ok, record}
  end

  def all do
    Agent.get(@me, fn(collection) -> collection end)
  end
end

Let's go over each function. Csv.Repo.start_link/0 uses Agent.start_link/2 to spawn a process named after the current module, Csv.Repo, which is also assigned to the @me module attribute to give us a central point of reference. The anonymous function passed to Agent.start_link/2 returns an empty list, which the Agent sets as its initial state.

As for Csv.Repo.insert/1, it calls Agent.update/2 with two arguments: the process' name and an anonymous function, which appends the record to the Agent's state. Although that function comes from the client, its lexical environment is carried over and it runs in the Agent process.

Finally, Csv.Repo.all/0 is set to fetch the entire state back.

Using our Agent is simple; let's try it out in iex:

$ iex -S mix
Interactive Elixir (1.5.0) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Code.require_file "apps/csv/test/support.exs"
[{Csv.Schemas.Site,
  <<70, 79, 82, 49, 0, 0, 13, 12, 66, 69, 65, 77, 65, 116, 85, 56, ...>>}]
iex(2)> Csv.Repo.start_link
{:ok, #PID<0.178.0>}
iex(3)> Csv.Repo.insert %Csv.Schemas.Site{name: "Semaphore Blog"}
{:ok, %Csv.Schemas.Site{id: nil, name: "Semaphore Blog", url: nil}}
iex(4)> Csv.Repo.insert %Csv.Schemas.Site{name: "Elixir Language"}
{:ok, %Csv.Schemas.Site{id: nil, name: "Elixir Language", url: nil}}

Since Mix does not compile .exs files, we had to explicitly require support.exs to make our Csv.Schemas.Site struct available. Then we started an agent process with Csv.Repo.start_link/0 and performed some Site insertions with Csv.Repo.insert/1. Now we can fetch all of our structs back with Csv.Repo.all/0:

iex(5)> Csv.Repo.all
[%Csv.Schemas.Site{name: "Semaphore Blog", url: nil},
 %Csv.Schemas.Site{name: "Elixir Language", url: nil}]

Making Our Test Pass

We should have a new error at this time:

1) test imports records of a csv file (Csv.ImportTest)
   test/csv/import_test.exs:6
   ** (exit) exited in: GenServer.call(Csv.Repo, {:get, #Function<0.78900424/1 in Csv.Repo.all/0>}, 5000)
       ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

Don't be fooled by the seemingly strange GenServer reference: under the covers, that's what Agent uses. The message means we didn't start Csv.Repo before running our code, so change import_test.exs to look like this:

# apps/csv/test/csv/import_test.exs

defmodule Csv.ImportTest do
  use ExUnit.Case
  alias Csv.Schemas.Site
  alias Csv.Repo

  test "imports records of a csv file" do
    Repo.start_link # This line starts our agent
    options = [schema: Site, headers: [:name, :url]]

    "test/fixtures/sites.csv" |> Csv.Import.call(options)

    assert [
      %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
      %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
    ] = Repo.all
  end
end

Now we have our first actual failure:

1) test imports records of a csv file (Csv.ImportTest)
   test/csv/import_test.exs:6
   match (=) failed
   code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Repo.all()
   right: []
   stacktrace:
     test/csv/import_test.exs:12: (test)

This makes sense: since we haven't inserted any records, Repo.all/0 returns the initial state, an empty list. On top of that, Csv.Import.call/2 isn't doing anything yet. Let's outline the steps required to make our test pass:

defmodule Csv.Import do
  def call(_input_path, _options) do
    # 1. Read the CSV line by line
    # 2. Parse lines from strings into structs
    # 3. Insert each struct into the repository
  end
end

To fulfill step 1, retrieve raw lines out of the file:

lines = "apps/csv/test/fixtures/sites.csv"
|> File.read!()
|> String.split("\n")
|> Enum.drop(1)
|> Enum.reject(&(String.trim(&1) == ""))

It's worth explaining each step of this pipeline:

  • File.read!/1 reads file contents into a string.
  • String.split/2 splits the string by newline and returns a list.
  • Enum.drop/2 discards the first item off the list, which corresponds to the headers.
  • Enum.reject/2 discards any empty lines.

Next, we will split each row by comma and use pattern matching to extract name and url fields, putting all data in structs after that:

structs = lines
|> Enum.map(&String.split(&1, ","))
|> Enum.map(fn([name, url]) ->
  %Csv.Schemas.Site{name: name, url: url}
end)

Finally, we will hand each struct to the repository so that it takes care of persistence. The side-effectful Enum.each/2 fits the bill better than Enum.map/2, since we don't need to pipe any return values onward.

structs
|> Enum.each(&Csv.Repo.insert/1)

To wire up the above code in our function, we will use input_path as the CSV path and options[:schema] to create a struct on the fly with Kernel.struct/2. We can't use the %Name{} struct literal because it only supports compile-time atom references:

# apps/csv/lib/csv/import.ex

defmodule Csv.Import do
  def call(input_path, options) do
    input_path
    |> File.read!
    |> String.split("\n")
    |> Enum.drop(1)
    |> Enum.reject(&(String.trim(&1) == ""))
    |> Enum.map(&String.split(&1, ","))
    |> Enum.map(fn([name, url]) ->
      struct(options[:schema], %{name: name, url: url})
    end)
    |> Enum.each(&Csv.Repo.insert/1)
  end
end
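
To see the difference in iex: Kernel.struct/2 happily accepts a module held in a variable at runtime, which the %Name{} literal cannot do:

iex(1)> schema = Csv.Schemas.Site
Csv.Schemas.Site
iex(2)> struct(schema, %{name: "Elixir Language"})
%Csv.Schemas.Site{name: "Elixir Language", url: nil}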

And don't forget to create the sites.csv file under apps/csv/test/fixtures:

name,url
Elixir Language,https://elixir-lang.org
Semaphore Blog,https://semaphoreci.com/blog

Tests should now be green:

$ mix test
==> csv
.

Finished in 0.02 seconds
1 test, 0 failures

Test-driving a Record Stream

Our code works, but naively. First of all, row parsing is not as generic as we want: we can't use a schema other than Site. These requirements must be flexible, otherwise our library won't be of much use. Secondly, we have a lot of logic in a single spot. It's time to start thinking in terms of new abstractions, splitting up concerns, and making room for growth!

More importantly, we will likely parse thousands of rows, which makes loading a big file into a single string a waste of memory. Furthermore, each map call iterates through the whole collection, making our pipeline slower on large inputs.

Streams

What if we could have Elixir read our file line by line, and have each line flow through the whole pipeline in a single pass? It turns out we can, by using streams. Streams are composable, lazy enumerables. Suppose we have the following pipeline, where we double each number of a list and then add one:

iex(1)> [1, 2] |> Enum.map(fn(x) -> x * 2 end) |> Enum.map(fn(x) -> x + 1 end)
[3, 5]

Both Enum.map/2 calls iterate over the entire list, for a total of two iterations. That may be negligible for small lists, but what about big ones? Streams allow performing a single iteration, and refactoring from eager to lazy is easy:

iex(2)> [1, 2] |> Stream.map(fn(x) -> x * 2 end) |> Stream.map(fn(x) -> x + 1 end) |> Enum.to_list
[3, 5]

Renaming Enum.map to Stream.map and appending Enum.to_list/1 to the pipeline buys us a single iteration through the enumerable. Enum.to_list/1 is necessary to trigger the iteration, as any other Enum function similarly would.
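
For instance, Enum.sum/1 triggers the very same single pass:

iex> [1, 2] |> Stream.map(fn(x) -> x * 2 end) |> Stream.map(fn(x) -> x + 1 end) |> Enum.sum
8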

But how does this work? Elixir being a functional language, functions are first-class citizens and we can compose them. Under the hood, Stream keeps track of an enumerable and functions of the current stream pipeline. For instance:

iex(3)> stream_1 = Stream.map([1, 2], fn(x) -> x * 2 end)
#Stream<[enum: [1, 2], funs: [#Function<47.117392430/1 in Stream.map/2>]]>
iex(4)> stream_2 = stream_1 |> Stream.map(fn(x) -> x + 1 end)
#Stream<[enum: [1, 2],
 funs: [#Function<47.117392430/1 in Stream.map/2>,
  #Function<47.117392430/1 in Stream.map/2>]]>

The first call to Stream.map/2 creates a %Stream{} struct with two keys: enum, which holds the [1, 2] enumerable, and funs, a list holding the anonymous "double the number" function wrapped with some mapping logic. The second call appends another function, "add one to the number", to the funs list. Thus, no work on enum happens when we call Stream functions - the computation is merely registered in the struct!

However, magic happens when we pipe %Stream{} onto Enum.to_list/1:

iex(5)> stream_2 |> Enum.to_list
[3, 5]

This is possible because streams implement the Enumerable protocol. The underlying implementation glues our funs together and runs enum through them. The resulting effect is a single, lazy iteration. Let's do this work manually so you get the idea:

iex(7)> double = fn(x) -> x * 2 end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex(8)> add_one = fn(x) -> x + 1 end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex(9)> funcs = [double, add_one]
[#Function<6.99386804/1 in :erl_eval.expr/5>,
 #Function<6.99386804/1 in :erl_eval.expr/5>]
iex(10)> initial_value = fn(x) -> x end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex(11)> composed = Enum.reduce funcs, initial_value, fn(func, acc) ->
...(11)>   fn(x) -> x |> acc.() |> func.() end
...(11)> end
#Function<6.99386804/1 in :erl_eval.expr/5>
iex(12)> composed.(2)
5
iex(13)> [1, 2] |> Enum.map(composed)
[3, 5]

Similarly to what happens in an actual stream, we reduced our functions into a single one (hence Enum.reduce) and ran our enumerable through it. The actual implementation is more complex, since Stream supports many functions besides Stream.map, but the idea is the same.
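
The same composition machinery covers other Stream functions too; for example, mixing a filter into the pipeline still results in one lazy pass:

iex> [1, 2, 3, 4] |> Stream.map(fn(x) -> x * 2 end) |> Stream.filter(fn(x) -> x > 4 end) |> Enum.to_list
[6, 8]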

Record Streams

It has probably become clear that our code can benefit from streams. Let's create a RecordStream module to replace part of our pipeline. It will bundle up the following steps as efficiently as it can:

1. Read the CSV line by line
2. Parse lines from strings into structs

Since this will be a lower-level component, we will step down the abstraction ladder and work with a device instead of a file path. In Elixir and Erlang, file handles are called IO devices. After opening a device, we can use the IO module to perform read and write operations on it:

iex(1)> {:ok, device} = File.open("apps/csv/test/fixtures/sites.csv")
{:ok, #PID<0.195.0>}
iex(2)> IO.read(device, :all)
"name,url\nElixir Language,https://elixir-lang.org\nSemaphore Blog,https://semaphoreci.com/blog\n"

Surprisingly, devices are implemented as processes:

  • File.open spawns a new process and returns a pid back to the caller.
  • IO functions use the pid to request file operations, to which the process may send a response back depending on the request. Helpfully, these functions abstract away the hurdle of sending and receiving messages.
  • It's possible to have any kind of IO device, as long as it implements the required messaging interface.

Elixir provides an in-memory IO device called StringIO, which can likewise be driven by IO functions. Instead of a file path, however, StringIO.open/1 takes the contents directly:

iex(1)> {:ok, device} = StringIO.open("name,url\nElixir Language,https://elixir-lang.org")
{:ok, #PID<0.87.0>}
iex(2)> IO.read(device, :all)
"name,url\nElixir Language,https://elixir-lang.org"

Implementing a Record Stream Module

Our record stream needs three arguments: a device, a schema, and the expected headers. Let's start with the most trivial case: when the file contains only headers and no rows, enumerating the stream should yield an empty list:

# apps/csv/test/csv/record_stream_test.exs

defmodule RecordStreamTest do
  use ExUnit.Case

  test "streams an empty list when csv has no rows" do
    {:ok, device} = StringIO.open("name,url\n")
    {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)

    assert [] == Enum.to_list(stream)
  end
end

An error will say we haven't defined Csv.RecordStream.new/2. To solve it, introduce that function and make it return a stream that yields an empty list when enumerated:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(_device, headers: _headers, schema: _schema) do
    stream = Stream.map([], fn(_) -> nil end)
    {:ok, stream}
  end
end

Our next case will exercise when the CSV has a few rows:

# apps/csv/test/csv/record_stream_test.exs

defmodule RecordStreamTest do
  use ExUnit.Case
  alias Csv.Schemas.Site

  test "streams an empty list when csv has no rows" do
    # Imagine the previous test here...
  end

  test "streams csv rows as structs" do
    {:ok, device} = StringIO.open """
                    name,url
                    Elixir Language,https://elixir-lang.org
                    Semaphore Blog,https://semaphoreci.com/blog
                    """
    {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)

    assert [
      %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
      %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}
    ] = Enum.to_list(stream)
  end
end

It fails because we are still returning an empty list:

1) test streams csv rows as structs (RecordStreamTest)
   test/csv/record_stream_test.exs:12
   match (=) failed
   code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
   right: []

We will use IO.stream/2 to lazily read our device line by line and push each line down the pipeline, stripping trailing newlines with String.trim/1 and splitting rows on commas:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(device, headers: _headers, schema: _schema) do
    stream = device
    |> IO.stream(:line)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))

    {:ok, stream}
  end
end

Our test fails with an interesting error. We are getting back headers when we shouldn't:

2) test streams csv rows as structs (RecordStreamTest)
   test/csv/record_stream_test.exs:12
   match (=) failed
   code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
   right: [["name", "url"], ["Elixir Language", "https://elixir-lang.org"],
           ["Semaphore Blog", "https://semaphoreci.com/blog"]]

Now we need to consume the headers without unwrapping the stream into a list, as that would defeat the purpose of using streams in the first place. Streams are enumerables, so we can use Enum.fetch!/2:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(device, headers: _headers, schema: _schema) do
    stream = device
    |> IO.stream(:line)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))

    # This variable name starts with an underscore because
    # we are not using the value
    _headers = stream |> Enum.fetch!(0)

    {:ok, stream}
  end
end

A keen reader will find it strange that the headers are no longer showing up on the right-hand side of the assertion:

1) test streams csv rows as structs (RecordStreamTest)
   test/csv/record_stream_test.exs:12
   match (=) failed
   code:  assert [%Site{name: "Elixir Language", url: "https://elixir-lang.org"}, %Site{name: "Semaphore Blog", url: "https://semaphoreci.com/blog"}] = Enum.to_list(stream)
   right: [["Elixir Language", "https://elixir-lang.org"],
           ["Semaphore Blog", "https://semaphoreci.com/blog"]]

We fetched the first element off the enumerable, and that seemingly mutated our stream: only the remaining rows came back. Isn't Elixir immutable? Before answering that question, let's try the same thing with a plain list:

iex(1)> list = [1, 2, 3, 4]
[1, 2, 3, 4]
iex(2)> list |> Enum.fetch!(0)
1
iex(3)> list
[1, 2, 3, 4]

As you can see, our list remained the same after calling Enum.fetch!/2 on it. With devices it is different - remember, they are processes? The side effect of enumerating over an IO stream is that, while reading or writing, the device updates an internal position to keep track of where it is in the file. This is much like how IO works in Ruby, which may be unexpected for newcomers, but it was a deliberate Erlang design decision.
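
You can observe this positional side effect directly with a StringIO device (the pid will differ on your machine):

iex(1)> {:ok, device} = StringIO.open("a\nb\n")
{:ok, #PID<0.90.0>}
iex(2)> IO.read(device, :line)
"a\n"
iex(3)> IO.read(device, :line)
"b\n"

Each read advances the device's internal position, which is why consuming the headers changed what our stream produced.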

Next, let's use the headers function argument to help map rows into structs. We can build a keyword list out of the headers and each row using Enum.zip/2:

iex(1)> keys = [:name, :url]
[:name, :url]
iex(2)> values = ["Elixir Language", "https://elixir-lang.org"]
["Elixir Language", "https://elixir-lang.org"]
iex(3)> kwlist = Enum.zip(keys, values)
[name: "Elixir Language", url: "https://elixir-lang.org"]

Now we need to turn the keyword list into a struct. Kernel.struct/2 accepts the keyword list directly, and Enum.into/2 converts it to a map whenever one is needed:

iex(5)> Enum.into(kwlist, %{})
%{name: "Elixir Language", url: "https://elixir-lang.org"}
iex(6)> struct(Csv.Schemas.Site, kwlist)
%Csv.Schemas.Site{name: "Elixir Language", url: "https://elixir-lang.org"}

Now we have all the puzzle pieces to get our tests passing:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(device, headers: headers, schema: schema) do
    stream = device
    |> IO.stream(:line)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))
    |> Stream.map(fn(row) ->
      contents = headers
      |> Enum.zip(row)
      |> Enum.into(%{})

      struct(schema, contents)
    end)

    _headers = stream |> Enum.fetch!(0)

    {:ok, stream}
  end
end

And sure enough they do:

...

Finished in 0.03 seconds
3 tests, 0 failures

Validating the Headers

We are missing two important cases. First, our code must still work when the file's columns come in a different order. Second, we need to ensure the file has the required columns. Here are the tests:

# apps/csv/test/csv/record_stream_test.exs

test "streams correctly when headers are shuffled" do
  {:ok, device} = StringIO.open """
    url,name
    https://elixir-lang.org,Elixir Language
  """
  {:ok, stream} = Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)

  assert [
    %Site{name: "Elixir Language", url: "https://elixir-lang.org"},
  ] = Enum.to_list(stream)
end

test "returns :invalid_csv when missing required columns" do
  {:ok, device} = StringIO.open """
    name
    Elixir Language
  """

  assert :invalid_csv == Csv.RecordStream.new(device, headers: [:name, :url], schema: Site)
end

Let's make the first one pass. Instead of building structs from the headers function argument, we can use the headers read from the file, since they dictate the column order. We also need to convert those headers into atoms, because keyword list keys are atoms:

headers = stream
|> Enum.fetch!(0)
|> Enum.map(&String.to_atom/1)

structs = stream
|> Stream.map(fn(row) ->
  # Uses headers from the file to build up structs
  contents = headers
  |> Enum.zip(row)
  |> Enum.into(%{})

  struct(schema, contents)
end)

The second one is also simple: we need conditional logic to determine whether the headers we got are the ones we expect. If so, we return {:ok, structs}; otherwise, :invalid_csv:

if Enum.sort(headers) == Enum.sort(expected_headers) do
  # ... Code to figure out the structs comes here

  {:ok, structs}
else
  :invalid_csv
end

Combined, these cases make our header handling unbreakable! After putting it all together, here's the resulting code:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(device, headers: expected_headers, schema: schema) do
    stream = device
    |> IO.stream(:line)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))

    headers = stream
    |> Enum.fetch!(0)
    |> Enum.map(&String.to_atom/1)

    if Enum.sort(headers) == Enum.sort(expected_headers) do
      structs = stream
      |> Stream.map(fn(row) ->
        contents = headers
        |> Enum.zip(row)
        |> Enum.into(%{})

        struct(schema, contents)
      end)

      {:ok, structs}
    else
      :invalid_csv
    end
  end
end

Refactoring

Our code works, but there is too much complexity in a single function. To solve that, we recommend extracting logical chunks into private functions. Good tests make refactoring easy, so take small steps and run the tests constantly during the process. Here's one way you might reorganize the module:

# apps/csv/lib/csv/record_stream.ex

defmodule Csv.RecordStream do
  def new(device, headers: expected_headers, schema: schema) do
    stream = device |> to_stream
    headers = stream |> extract_headers

    if valid_headers?(headers, expected_headers) do
      {:ok, Stream.map(stream, &to_struct(&1, schema, headers))}
    else
      :invalid_csv
    end
  end

  def valid_headers?(headers, valid_headers) do
    Enum.sort(headers) == Enum.sort(valid_headers)
  end

  defp to_stream(device) do
    device
    |> IO.stream(:line)
    |> Stream.map(&String.trim/1)
    |> Stream.map(&String.split(&1, ","))
  end

  defp extract_headers(stream) do
    stream
    |> Enum.fetch!(0)
    |> Enum.map(&String.to_atom/1)
  end

  defp to_struct(row, schema, headers) do
    headers
    |> Enum.zip(row)
    |> Enum.into(%{})
    |> (fn(contents) -> struct(schema, contents) end).()
  end
end

And all tests should still be passing.

Parsing the CSV Properly

Our in-house CSV parser is fragile and bound to break in certain scenarios. What if a field contains a comma? Let's just use a good library: we recommend NimbleCSV from Plataformatec. It is simple, relies on binary pattern matching for efficiency, and has no dependencies of its own. Include it in mix.exs:

# apps/csv/mix.exs

defp deps do
  [
    {:nimble_csv, "~> 0.1.0"}
  ]
end

And install dependencies:

mix deps.get
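
With the dependency installed, a quick iex sanity check shows quoted fields with embedded commas being parsed correctly (headers: false, as used later in this section, tells this version of the library not to discard the first row):

iex(1)> NimbleCSV.RFC4180.parse_string "\"Elixir, the language\",https://elixir-lang.org\n", headers: false
[["Elixir, the language", "https://elixir-lang.org"]]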

By default, this library ships with NimbleCSV.RFC4180, the most common CSV variant, which uses commas as separators and double quotes for escaping. Thankfully, it works with streams out of the box. Just change this function:

# apps/csv/lib/csv/record_stream.ex

defp to_stream(device) do
  device
  |> IO.stream(:line)
  |> Stream.map(&String.trim/1)
  |> Stream.map(&String.split(&1, ","))
end

Into this:

# apps/csv/lib/csv/record_stream.ex

defp to_stream(device) do
  device
  |> IO.stream(:line)
  |> NimbleCSV.RFC4180.parse_stream(headers: false)
end

Because we are handling the headers elsewhere, we pass headers: false to NimbleCSV.RFC4180.parse_stream/2.

And all tests will still be green.

Plugging Record Stream into Import

It's time to plug our rock-solid Csv.RecordStream into Csv.Import, and we expect the tests to still pass after that! We will replace most of the pipeline in Csv.Import, except for the repository part:

# apps/csv/lib/csv/import.ex

defmodule Csv.Import do
  alias Csv.RecordStream

  def call(input_path, schema: schema, headers: headers) do
    {:ok, device} = File.open(input_path)
    {:ok, stream} = RecordStream.new(device, headers: headers, schema: schema)

    stream |> Enum.each(&Csv.Repo.insert/1)
  end
end

Parallelizing Insertions

We are not yet leveraging any of Elixir's concurrency features to speed things up. The bulk of our processing is interacting with the repository, which happens sequentially in our code:

stream |> Enum.each(&Csv.Repo.insert/1)

It's possible to use Task.async_stream/5 to run a given function concurrently on each item of an enumerable. But there is a catch: database connections come from a pool of limited size, so we must cap the number of simultaneous connections to avoid errors. Even though we aren't dealing with a database yet, we want to provide sane defaults, and Task.async_stream/5 accepts a max_concurrency option for just that. This kind of rate control is usually called back-pressure.

Task.async_stream/5 asks for an MFA, which stands for module/function/arguments. For us, that will be Csv.Repo, :insert, and an empty list, because there are no arguments other than the struct, which is prepended as the first argument.

Replacing Enum.each/2 with Task.async_stream/5 and setting max_concurrency to 10 (for now) shall do what we want:

# apps/csv/lib/csv/import.ex

defmodule Csv.Import do
  alias Csv.RecordStream

  def call(input_path, schema: schema, headers: headers) do
    {:ok, device} = File.open(input_path)
    {:ok, stream} = RecordStream.new(device, headers: headers, schema: schema)

    stream
    |> Task.async_stream(Csv.Repo, :insert, [], max_concurrency: 10)
    |> Enum.to_list
  end
end

Tests still pass after that change, and our import routine now performs insertions concurrently.
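
One detail worth knowing: Task.async_stream wraps each result in an {:ok, result} tuple, which is why we discard the results with Enum.to_list/1 rather than pattern matching on raw structs downstream. A toy run with the function-based Task.async_stream/3 makes this visible:

iex(1)> 1..4 |> Task.async_stream(fn(x) -> x * 2 end, max_concurrency: 2) |> Enum.to_list
[ok: 2, ok: 4, ok: 6, ok: 8]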

Setting up Testing on Semaphore CI

To complete the first part of our tutorial, let's set up our project on Semaphore CI. If you don't have a Semaphore account, you can create one for free.

First, add the source files to git and push them to GitHub or Bitbucket.

After logging into Semaphore, select "Create new" -> "Project":

Create project

Semaphore will ask you to connect either GitHub or Bitbucket, so just follow the authorization steps.

Next, choose the correct repository:

Choose repository

Choose Elixir version 1.4.5 (1.5 was not yet available on Semaphore at the time of writing) and set up the build steps to look like this:

Configure build

Click on the "Build With These Settings" button and wait for the build to complete and the tests to pass:

Build

Congratulations! Now you have rock solid continuous integration at your fingertips.

Conclusion

Hopefully, this article has shown you how powerful and flexible streams and pipelines can be together. Toward the end, adding concurrency and robust CSV parsing required minimal changes to the code. You can check out all of the code in this git repository.

In the next article, we will take a tour around mocking and stubbing techniques and explore scenarios where this practice makes sense. See you there!

Thiago Araújo Silva

A full-stack oriented Rubyist, Elixirist, and JavaScripter who enjoys exchanging knowledge and aims for well-balanced, easy-to-maintain solutions to product needs. Frequently blogs at The Miners.
