Accounts

The Conduit blogging platform requires authors to register an account before they can publish articles. We shall begin our first feature by implementing user registration in the accounts context.

Register a user

The API spec for registration is as follows:

HTTP verb: POST
URL: /api/users
Required fields: email, username, password

Example request body:

{
  "user":{
    "username": "jake",
    "email": "jake@jake.jake",
    "password": "jakejake"
  }
}

Example response body:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}

The request should fail with a 422 HTTP status code if any of the required fields are invalid. In this case the response body will be in the following format:

{
  "errors": {
    "body": [
      "can't be empty"
    ]
  }
}

We must ensure that usernames are unique and cannot be registered more than once.

Building our first context

Phoenix includes a set of generators to help scaffold your application:

Command Action
mix phx.gen.channel Generates a Phoenix channel
mix phx.gen.context Generates a context with functions around an Ecto schema
mix phx.gen.embedded Generates an embedded Ecto schema file
mix phx.gen.html Generates controller, views, and context for an HTML resource
mix phx.gen.json Generates controller, views, and context for a JSON resource
mix phx.gen.presence Generates a Presence tracker
mix phx.gen.schema Generates an Ecto schema and migration file
mix phx.gen.secret Generates a secret

Since we’re building a REST API, we can use the phx.gen.json generator to create our first context, resource, controller, and JSON view. As we already know the fields relating to our users we can include them, with their type, in the generator command:

$ mix phx.gen.json Accounts User users username:string email:string hashed_password:string bio:string image:string --table accounts_users

Overall, this generator will add the following files to lib/conduit:

  • Context module in lib/conduit/accounts/accounts.ex, serving as the API boundary.
  • Ecto schema in lib/conduit/accounts/user.ex, and a database migration to create the accounts_users table.
  • View in lib/conduit_web/views/user_view.ex.
  • Controller in lib/conduit_web/controllers/user_controller.ex.
  • Unit and integration tests in test/conduit/accounts.

Remember that the User module we’re creating here is not our domain model. It will be a read model projection, populated by domain events published from an aggregate.

The generator prompts us to add the resource to the :api scope in our Phoenix router module. For now, we will configure only the :create controller action to support registering a user:

# lib/conduit_web/router.ex
defmodule ConduitWeb.Router do
  use ConduitWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/api", ConduitWeb do
    pipe_through :api

    post "/users", UserController, :create
  end
end

Writing our first integration test

Let’s follow Behaviour Driven Development (BDD), thinking “from the outside in”, and start by writing a failing integration test for user registration. We will include tests that cover the happy path of successfully creating a user, and for the two failure scenarios mentioned above: missing required fields and duplicate username registration.

Factories to construct test data

We will use factory functions to generate realistic data for our tests. ExMachina is an Elixir library that makes it easy to create test data and associations.

In mix.exs, add :ex_machina as a test environment dependency:

defp deps do
  [
    {:ex_machina, "~> 2.0", only: :test},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

We must ensure the ExMachina application is started in the test helper, test/test_helper.exs, before the existing ExUnit.start() call:

{:ok, _} = Application.ensure_all_started(:ex_machina)

Now we create our factory module in test/support/factory.ex:

# test/support/factory.ex
defmodule Conduit.Factory do
  use ExMachina

  def user_factory do
    %{
      email: "jake@jake.jake",
      username: "jake",
      hashed_password: "jakejake",
      bio: "I like to skateboard",
      image: "https://i.stack.imgur.com/xHWG8.jpg",
    }
  end
end

In our test module, we must import Conduit.Factory to access our user factory. Then we have access to build/1 and build/2 functions to construct params for an example user to register:

  • build(:user)
  • build(:user, username: "ben")
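
As a quick illustration, build/2 merges any overridden attributes into the factory's default map, so the second call above produces something along these lines (key order may differ):

build(:user, username: "ben")
#=> %{
#=>   bio: "I like to skateboard",
#=>   email: "jake@jake.jake",
#=>   hashed_password: "jakejake",
#=>   image: "https://i.stack.imgur.com/xHWG8.jpg",
#=>   username: "ben"
#=> }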

User registration integration test

In our integration test we want to verify successful user registration and check any failure includes a useful error message to help the user identify the problem.

  1. To test success, we assert that the returned HTTP status code is 201 and JSON response matches the API:
     test "should create and return user when data is valid", %{conn: conn} do
       conn = post conn, user_path(conn, :create), user: build(:user)
       json = json_response(conn, 201)["user"]
    
       assert json == %{
         "bio" => nil,
         "email" => "jake@jake.jake",
         "image" => nil,
         "username" => "jake",
       }
     end
    
  2. To test a validation failure, we assert the response is 422 and errors are included:
     test "should not create user and render errors when data is invalid", %{conn: conn} do
       conn = post conn, user_path(conn, :create), user: build(:user, username: nil)
       assert json_response(conn, 422)["errors"] == %{
         "username" => [
           "can't be empty",
         ]
       }
     end
    

The full integration test is given below:

# test/conduit_web/controllers/user_controller_test.exs
defmodule ConduitWeb.UserControllerTest do
  use ConduitWeb.ConnCase

  import Conduit.Factory

  alias Conduit.Accounts

  def fixture(:user, attrs \\ []) do
    build(:user, attrs) |> Accounts.register_user()
  end

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "register user" do
    @tag :web
    test "should create and return user when data is valid", %{conn: conn} do
      conn = post conn, user_path(conn, :create), user: build(:user)
      json = json_response(conn, 201)["user"]

      assert json == %{
        "bio" => nil,
        "email" => "jake@jake.jake",
        "image" => nil,
        "username" => "jake",
      }
    end

    @tag :web
    test "should not create user and render errors when data is invalid", %{conn: conn} do
      conn = post conn, user_path(conn, :create), user: build(:user, username: "")
      assert json_response(conn, 422)["errors"] == %{
        "username" => [
          "can't be empty",
        ]
      }
    end

    @tag :web
    test "should not create user and render errors when username has been taken", %{conn: conn} do
      # register a user
      {:ok, _user} = fixture(:user)

      # attempt to register the same username
      conn = post conn, user_path(conn, :create), user: build(:user, email: "jake2@jake.jake")
      assert json_response(conn, 422)["errors"] == %{
        "username" => [
          "has already been taken",
        ]
      }
    end
  end
end

Before running these tests, we must create the event store and read store databases for the test environment:

$ MIX_ENV=test mix do event_store.create, event_store.init
$ MIX_ENV=test mix ecto.create

Then we can execute our registration integration test using the mix test command:

$ mix test test/conduit_web/controllers/user_controller_test.exs

The execution result of running these tests will be:

Finished in 0.2 seconds
3 tests, 3 failures

Great, we have three failing tests. We now have the acceptance criteria of user registration codified in our tests. When these tests pass, the feature will be done. These tests will also help to prevent regressions caused by any changes we, or anyone else, may make in the future.

Let’s move forward by building the domain model to support user registration.

Application structure

The default directory structure used by the Phoenix generator creates a folder per context, and places them inside a folder named after the application within the lib directory.

We currently have our accounts context, along with the Phoenix web folder:

  • lib/conduit/accounts
  • lib/conduit_web

One benefit of this approach is that when a context becomes too large, it can be extracted into its own project. Using an umbrella project allows these separate Elixir applications to be used together, via internal mix references.
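
As an illustration, an app within an umbrella project references a sibling app using an in_umbrella mix dependency (the app names and layout here are hypothetical, since Conduit is not an umbrella project):

# apps/conduit_web/mix.exs (hypothetical umbrella layout)
defp deps do
  [
    {:conduit_accounts, in_umbrella: true},
    # ...
  ]
end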

When our application grows too large to be comfortably hosted by a single, monolithic service we can migrate the individual apps into their own microservices. This is why it’s important to focus on separation of concerns, using contexts, from the outset. It allows us to split the application apart as production usage and requirements dictate, and more importantly it supports rewriting and deletion of code. Keeping each context highly cohesive, but loosely coupled, provides these benefits.

Within each context in our CQRS application we will create modules for the common building blocks: aggregates, commands, events, read model projections and projectors, and queries. I prefer to create a separate folder for each of these. It provides further segregation within a single context and allows you to easily locate any module by its type and name.

The folder structure for our first accounts context will be:

  • lib/conduit/accounts/aggregates
  • lib/conduit/accounts/commands
  • lib/conduit/accounts/events
  • lib/conduit/accounts/projections
  • lib/conduit/accounts/projectors
  • lib/conduit/accounts/queries
  • lib/conduit/accounts/validators

Inside the lib/conduit/accounts folder we will place the context module and a supervisor module:

  • lib/conduit/accounts/accounts.ex
  • lib/conduit/accounts/supervisor.ex

These are the public facing parts of the account context, and provide the API into its available behaviour. The supervisor is responsible for supervising the workers contained within, such as the event handlers and projectors.

Alternate structure

Instead of grouping modules by their type within a context, you may choose to group them by their aggregate functionality. You can follow the convention used by Phoenix where file and module names are suffixed with their type (e.g. user_aggregate.ex).

In this example I’ve illustrated the file structure for modules related to the User aggregate in the Accounts context, including the commands, events, projection, validators, and queries:

  • lib/conduit/accounts/user/user_aggregate.ex
  • lib/conduit/accounts/user/register_user.ex
  • lib/conduit/accounts/user/user_registered.ex
  • lib/conduit/accounts/user/user_projection.ex
  • lib/conduit/accounts/user/user_projector.ex
  • lib/conduit/accounts/user/user_by_email_query.ex
  • lib/conduit/accounts/user/unique_username_validator.ex

You can use either approach, or another application structure entirely, but it’s important that you choose a convention and adhere to it within your application.

Building our first aggregate

As we’re dealing with registering users, our first aggregate will be the user.

One decision we must take when designing an aggregate is how to identify an instance. The simplest approach is to use a UUID. This may be generated by the client or the server, and is used to guarantee a unique identity per aggregate instance. All commands must include this identity to allow locating the target aggregate instance.

For Conduit users, we have a restriction that their username must be unique. So we could use the username to identify the aggregate and enforce this business rule. Domain events persisted for each user would be appended to a stream based upon their individual username. Populating the user aggregate would retrieve their events from the stream based on their username. Attempting to register an already taken username would fail since the aggregate exists and its state will be non-empty. However, one downside to this approach is that it would prevent us from allowing a user to amend their username at some point in the future. Remember that domain events are immutable once appended to the event store, so you cannot amend them, or move them to another stream. Instead you would need to create a new aggregate instance, using the new username, initialise its state from the existing aggregate, and mark the old aggregate instance as obsoleted.

We’ll use an assigned unique identifier for each user. The uuid package provides a UUID generator and utilities for Elixir. With this library we can assign a unique identity to a user using UUID.uuid4().

We’ll add the UUID package to our mix dependencies:

defp deps do
  [
    {:uuid, "~> 1.1"},
    # ...
  ]
end
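
Once the dependency is fetched we can generate an identity in iex (the value below is illustrative; uuid4 returns a new random UUID each time):

iex(1)> UUID.uuid4()
"3a2b8f0e-54c7-4f2a-9c4e-2d6f8a1b9c3d"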

To enforce the username uniqueness, we will validate the command before execution to ensure the username has not already been taken.

The User aggregate module, created in lib/conduit/accounts/aggregates/user.ex, defines the relevant fields as a struct, and exposes two public functions:

  1. execute/2 that accepts the empty user struct, %User{}, and the register user command, %RegisterUser{}, returning the user registered domain event.
  2. apply/2 that takes the user struct and the resultant user registered event %UserRegistered{} and mutates the aggregate state.

# lib/conduit/accounts/aggregates/user.ex
defmodule Conduit.Accounts.Aggregates.User do
  defstruct [
    :uuid,
    :username,
    :email,
    :hashed_password,
  ]

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Accounts.Events.UserRegistered

  @doc """
  Register a new user
  """
  def execute(%User{uuid: nil}, %RegisterUser{} = register) do
    %UserRegistered{
      user_uuid: register.user_uuid,
      username: register.username,
      email: register.email,
      hashed_password: register.hashed_password,
    }
  end

  # state mutators

  def apply(%User{} = user, %UserRegistered{} = registered) do
    %User{user |
      uuid: registered.user_uuid,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
    }
  end
end

This approach to building aggregates will be followed for all new commands and events. The execute/2 function takes the command and returns zero, one, or more domain events, while the apply/2 function mutates the aggregate state by applying a single event.

The execute/2 function to register the user uses pattern matching to ensure the uuid field is nil. This ensures a user aggregate for a given identity can only be created once.

Why did I name the command function execute/2 and not register_user/2? This is to allow commands to be dispatched directly to the aggregate, without needing an intermediate command handler. This means less code to write. You can choose to have descriptively named command functions, but then you must also write a command handler module to route each command to the appropriate function on the aggregate module. It's also possible to have the command handler module implement the domain logic itself, returning any domain events, if you prefer.
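
For comparison, a separate command handler implementing the Commanded.Commands.Handler behaviour might look like the following sketch, where the handler module name and the register_user/2 function on the aggregate are hypothetical:

defmodule Conduit.Accounts.CommandHandlers.RegisterUserHandler do
  @behaviour Commanded.Commands.Handler

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser

  # Route the command to a descriptively named function on the aggregate
  def handle(%User{} = user, %RegisterUser{} = register_user) do
    User.register_user(user, register_user)
  end
end

The router would then dispatch the register user command to this handler module, rather than directly to the aggregate.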

Building our first command

We need to create a command to register a user. A command is a standard Elixir module using the defstruct keyword to define its fields. A struct is a map with an extra field indicating its type and allows developers to provide default values for keys.

defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :password,
    :hashed_password,
  ]
end

The register user command can be constructed using familiar Elixir syntax:

alias Conduit.Accounts.Commands.RegisterUser

%RegisterUser{
  user_uuid: UUID.uuid4(),
  username: "jake",
  email: "jake@jake.jake",
  password: "jakejake",
}

When building a struct, Elixir will automatically guarantee all keys belong to the struct. This helps prevent accidental typos:

iex(1)> %RegisterUser{         
...(1)>   user: "jake",
...(1)>   email: "jake@jake.jake",
...(1)>   password: "jakejake",
...(1)> }
** (KeyError) key :user not found in: %RegisterUser{email: "jake@jake.jake", hashed_password: nil, password: "jakejake", username: nil, user_uuid: nil}

Constructing commands from external data

Commands will usually be populated from external data. In Conduit, this will be JSON data sent to our Phoenix web server. Phoenix will parse JSON data into an Elixir map with string keys. We therefore need a way to construct commands from these key/value maps.
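
For the registration request shown earlier, the params map handed to our controller action will look something like this:

%{
  "user" => %{
    "email" => "jake@jake.jake",
    "password" => "jakejake",
    "username" => "jake"
  }
}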

ExConstructor is an Elixir library that makes it easy to instantiate structs from external data, such as that emitted by a JSON parser.

Add use ExConstructor after a defstruct statement to inject a constructor function into the module.

— ExConstructor documentation

We’ll add this library to our mix dependencies:

defp deps do
  [
    {:exconstructor, "~> 1.1"},
    # ...
  ]
end

Then we add use ExConstructor to our command:

# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :password,
    :hashed_password,
  ]

  use ExConstructor
end

This allows us to create the command struct from a plain map, such as the params argument provided to a Phoenix controller action, using the generated new/1 function:

iex(1)> alias Conduit.Accounts.Commands.RegisterUser
iex(2)> RegisterUser.new(%{"username" => "jake", "email" => "jake@jake.jake", "password" => "jakejake"})
%Conduit.Accounts.Commands.RegisterUser{email: "jake@jake.jake",
 hashed_password: nil, password: "jakejake", user_uuid: nil, username: "jake"}

Building our first domain event

Domain events must be named in the past tense, so for user registration an appropriate event name is UserRegistered. Again we’ll use plain Elixir modules and structs to define our domain event:

# lib/conduit/accounts/events/user_registered.ex
defmodule Conduit.Accounts.Events.UserRegistered do
  @derive [Poison.Encoder]
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]
end

Note we derive the Poison.Encoder protocol in the domain event module. This is because Commanded uses the poison pure Elixir JSON library to serialize events in the database. For maximum performance, you should @derive [Poison.Encoder] for any struct used for encoding.
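
As a quick check, the derived encoder allows the event struct to be serialized with Poison in iex (the output is shown as a comment since the field order may vary):

iex(1)> alias Conduit.Accounts.Events.UserRegistered
iex(2)> Poison.encode!(%UserRegistered{user_uuid: "user-1234", username: "jake", email: "jake@jake.jake", hashed_password: "jakejake"})
# => "{\"username\":\"jake\",\"user_uuid\":\"user-1234\",\"hashed_password\":\"jakejake\",\"email\":\"jake@jake.jake\"}"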

Writing our first unit test

With our User aggregate, register user command, and user registered event modules defined we can author the first unit test. We’ll use the test to verify the expected domain event is returned when executing the command, and that the fields are being correctly populated.

ExUnit provides an ExUnit.CaseTemplate module that allows a developer to define a test case template to be used throughout their tests. This is useful when there are a set of functions that should be shared between tests, or a shared set of setup callbacks.

We can create a case template for aggregate unit tests that provides a reusable way to execute commands against an aggregate and verify the resultant domain events. In the following Conduit.AggregateCase case template an Elixir macro is used to allow each unit test to specify the aggregate module acting as the test subject:

# test/support/aggregate_case.ex
defmodule Conduit.AggregateCase do
  @moduledoc """
  This module defines the test case to be used by aggregate tests.
  """

  use ExUnit.CaseTemplate

  using [aggregate: aggregate] do
    quote bind_quoted: [aggregate: aggregate] do
      @aggregate_module aggregate

      import Conduit.Factory

      # assert that the expected events are returned when the given commands
      # have been executed
      defp assert_events(commands, expected_events) do
        assert execute(List.wrap(commands)) == expected_events
      end

      # execute one or more commands against the aggregate
      defp execute(commands) do
        {_, events} = Enum.reduce(commands, {%@aggregate_module{}, []}, fn (command, {aggregate, _}) ->
          events = @aggregate_module.execute(aggregate, command)

          {evolve(aggregate, events), events}
        end)

        List.wrap(events)
      end

      # apply the given events to the aggregate state
      defp evolve(aggregate, events) do
        events
        |> List.wrap()
        |> Enum.reduce(aggregate, &@aggregate_module.apply(&2, &1))
      end
    end
  end
end

The unit test asserts that the register user command returns a user registered event:

# test/conduit/accounts/aggregates/user_test.exs
defmodule Conduit.Accounts.Aggregates.UserTest do
  use Conduit.AggregateCase, aggregate: Conduit.Accounts.Aggregates.User

  alias Conduit.Accounts.Events.UserRegistered

  describe "register user" do
    @tag :unit
    test "should succeed when valid" do
      user_uuid = UUID.uuid4()

      assert_events build(:register_user, user_uuid: user_uuid), [
        %UserRegistered{
          user_uuid: user_uuid,
          email: "jake@jake.jake",
          username: "jake",
          hashed_password: "jakejake",
        }
      ]
    end
  end
end
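
Note the test builds a :register_user command using our factory, so we also need a matching factory function in Conduit.Factory. A minimal sketch, mirroring the values used in the user factory:

# test/support/factory.ex (addition)
alias Conduit.Accounts.Commands.RegisterUser

def register_user_factory do
  %RegisterUser{
    user_uuid: UUID.uuid4(),
    email: "jake@jake.jake",
    username: "jake",
    hashed_password: "jakejake"
  }
end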

To facilitate test-driven development I use the mix test.watch command provided by the mix_test_watch package. It will automatically run tests whenever files change.

In mix.exs, add the package as a dev environment dependency:

defp deps do
  [
    {:mix_test_watch, "~> 0.5", only: :dev, runtime: false},
    # ...
  ]
end

Fetch and compile the mix dependencies:

$ mix do deps.get, deps.compile

We can now execute our tagged unit test as a one off test run:

$ mix test --only unit

… or automatically whenever a file is saved:

$ mix test.watch --only unit

Command dispatch and routing

We’ve implemented registration for the user aggregate. Now we need to expose this behaviour through the public API, the Conduit.Accounts context module. We will create a register_user/1 function that takes an Elixir map containing the user attributes, then construct and dispatch a register user command.

To dispatch a command to its intended aggregate we must create a router module that implements the Commanded.Commands.Router behaviour:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

Once configured, the router allows us to dispatch a command:

alias Conduit.Router
alias Conduit.Accounts.Commands.RegisterUser

register_user = %RegisterUser{
  user_uuid: UUID.uuid4(),
  email: "jake@jake.jake",
  username: "jake",
  hashed_password: "hashedpw",
}

:ok = Router.dispatch(register_user)

We can pattern match on the return value to ensure that the command succeeded, or handle any failures.
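
Continuing the snippet above, a case expression covers both replies (a sketch):

case Router.dispatch(register_user) do
  :ok ->
    # command was accepted and handled by the aggregate
    :ok

  {:error, reason} ->
    # dispatch failed; handle or propagate the error
    {:error, reason}
end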

The register_user/1 function in the accounts context assigns a unique identity to the user, constructs the register user command, and dispatches it to the configured aggregate:

# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
  @moduledoc """
  The boundary for the Accounts system.
  """

  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Router

  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    attrs
    |> assign_uuid(:user_uuid)
    |> RegisterUser.new()
    |> Router.dispatch()
  end

  # generate a unique identity
  defp assign_uuid(attrs, identity), do: Map.put(attrs, identity, UUID.uuid4())
end

To verify the expected behaviour of the register user function we turn to the accounts test in test/conduit/accounts/accounts_test.exs:

defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  describe "register user" do
    @tag :integration
    test "should succeed with valid data" do
      assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

      assert user.bio == "some bio"
      assert user.email == "some email"
      assert user.hashed_password == "some hashed_password"
      assert user.image == "some image"
      assert user.username == "some username"
    end
  end
end

Again the test is tagged, using @tag :integration, to indicate it is an integration test which will likely be slower due to accessing the database. Running the test results in a failure:

$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test]

1) test register user should succeed with valid data (Conduit.AccountsTest)
   test/conduit/accounts/accounts_test.exs:9
   match (=) failed
   code:  assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
   right: :ok
   stacktrace:
     test/conduit/accounts/accounts_test.exs:10: (test)

Finished in 0.2 seconds
8 tests, 1 failure, 7 skipped

A failing test is helpful feedback: it guides us as to what we need to build next. In this case, we need to populate our read model and return the newly registered user.

Writing our first read model projection

A projection is a read-only view of some application state, built up from the published domain events.

We’ll be using Ecto to persist our read model to a PostgreSQL database. A projection is built by a projector module defined as an event handler: it receives each published domain event and updates the read model. So a projection is read model state that is projected from domain events by a projector.

In Commanded, an event handler is an Elixir module that implements the Commanded.Event.Handler behaviour. Each event handler is given a unique name and should be included in the application supervision tree by being registered as a child of a supervisor. An event handler must implement the handle/2 callback function which receives the domain event and its associated metadata. The function must return :ok to indicate successful processing of the event. You can use pattern matching to define multiple handle/2 functions, one per domain event you want to process.

Here’s an example event handler using the Commanded.Event.Handler macro:

defmodule ExampleEventHandler do
  use Commanded.Event.Handler, name: "ExampleEventHandler"

  def handle(%AnEvent{}, _metadata) do
    # Process domain event and return `:ok` on success
    :ok
  end
end

Commanded Ecto projections

The commanded_ecto_projections Elixir library helps you to build read model projections using the Ecto database library. It provides a project macro for defining a read model projection inside a module that is also defined as a Commanded event handler, giving us a convenient DSL for projections. The macro uses pattern matching to specify the domain event being projected, and provides access to an Ecto.Multi data structure for grouping multiple Repo operations. Ecto.Multi is used to insert, update, and delete data, and these operations will be executed within a single database transaction.

The Phoenix generator has already included the Ecto package as a dependency and created an Ecto repo for us, Conduit.Repo in lib/conduit/repo.ex. We configured the database connection details for the repo in the configuring the read model store section of the getting started chapter.

We’ll add the Commanded Ecto projections package to our dependencies in mix.exs:

defp deps do
  [
    {:commanded_ecto_projections, "~> 0.6"},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

We need to configure the commanded_ecto_projections library with the Ecto repo used by our application in config/config.exs:

config :commanded_ecto_projections, repo: Conduit.Repo

Then we generate an Ecto migration to create a projection_versions table:

$ mix ecto.gen.migration create_projection_versions

This table is used to track which events each projector has seen, so that it can ignore events it has already handled should they be resent. The event store guarantees at-least-once delivery, so an event may be received more than once if the event store does not receive the acknowledgement of successful processing.

We need to modify the generated migration, in priv/repo/migrations, to create the projection_versions table as detailed in the Commanded projections project README:

# priv/repo/migrations/20170610130729_create_projection_versions.exs
defmodule Conduit.Repo.Migrations.CreateProjectionVersions do
  use Ecto.Migration

  def change do
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_number, :bigint

      timestamps()
    end
  end
end

Creating a user projection

Now we need to create our User schema module, a database migration to create the accounts_users table, and a corresponding projector module.

When we ran the Phoenix resource generator to create the accounts context, we also asked it to create a user schema and specified its fields. It generated a database migration for us in priv/repo/migrations. By default Phoenix schemas use auto-incrementing integer fields as the table primary key. As we’re using UUIDs to identify our user aggregate we need to amend the schema and migration to use the uuid data type.

We’ll add two unique indexes to the accounts_users table, on username and email, to support efficient querying on those fields.

# priv/repo/migrations/20170607124956_create_accounts_user.exs
defmodule Conduit.Repo.Migrations.CreateConduit.Accounts.User do
  use Ecto.Migration

  def change do
    create table(:accounts_users, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :username, :string
      add :email, :string
      add :hashed_password, :string
      add :bio, :string
      add :image, :string

      timestamps()
    end

    create unique_index(:accounts_users, [:username])
    create unique_index(:accounts_users, [:email])
  end
end

The user projection schema is modified to use Ecto’s binary_id as the primary key data type:

# lib/conduit/accounts/projections/user.ex
defmodule Conduit.Accounts.Projections.User do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "accounts_users" do
    field :username, :string, unique: true
    field :email, :string, unique: true
    field :hashed_password, :string
    field :bio, :string
    field :image, :string

    timestamps()
  end
end

Then we migrate our database:

$ mix ecto.migrate

Finally, we create a projector module to insert a user projection each time a user is registered. This uses the project macro, provided by the Commanded Ecto projections library, to match each user registered domain event and insert a new User projection into the database.

# lib/conduit/accounts/projectors/user.ex
defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto, name: "Accounts.Projectors.User"

  alias Conduit.Accounts.Events.UserRegistered
  alias Conduit.Accounts.Projections.User

  project %UserRegistered{} = registered do
    Ecto.Multi.insert(multi, :user, %User{
      uuid: registered.user_uuid,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
      bio: nil,
      image: nil,
    })
  end
end

The project macro exposes an Ecto.Multi struct, as multi, that we can use to chain together many database operations. They are executed within a single database transaction to ensure all changes succeed, or fail, together.

Include projector in supervision tree

To start and register the projector module as an event handler we need to include it within our application’s supervision tree. We will create a supervisor module per context responsible for handling its processes. The following supervisor, created in lib/conduit/accounts/supervisor.ex, will start the user projector:

# lib/conduit/accounts/supervisor.ex
defmodule Conduit.Accounts.Supervisor do
  use Supervisor

  alias Conduit.Accounts
  
  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    Supervisor.init([
      Accounts.Projectors.User
    ], strategy: :one_for_one)
  end
end

In lib/conduit/application.ex, we add the Conduit.Accounts.Supervisor supervisor module to the top level application supervisor:

# lib/conduit/application.ex
defmodule Conduit.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      # Start the Ecto repository
      supervisor(Conduit.Repo, []),

      # Start the endpoint when the application starts
      supervisor(ConduitWeb.Endpoint, []),

      # Accounts supervisor
      supervisor(Conduit.Accounts.Supervisor, []),
    ]

    opts = [strategy: :one_for_one, name: Conduit.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    ConduitWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end

Reset storage between test execution

To ensure test independence we must clear the event store and read store test databases between each test execution. We already have a Conduit.DataCase module, generated by Phoenix, to use for integration tests accessing the database. This can be extended to clear both databases; so we add reset_eventstore/0 and reset_readstore/0 functions to do just that.

For the event store, we take advantage of the EventStore.Storage.Initializer.reset!/1 function to reset the database structure, removing any events, streams, and clearing all subscriptions.

The read model database is manually reset by executing a truncate table SQL statement specifying each projection table to clear. We must remember to add any additional tables to this statement as we build our application, so they are also reset during test execution.

defmodule Conduit.DataCase do
  use ExUnit.CaseTemplate

  using do
    quote do
      alias Conduit.Repo

      import Ecto
      import Ecto.Changeset
      import Ecto.Query
      import Conduit.Factory
      import Conduit.DataCase
    end
  end

  setup _tags do
    Application.stop(:conduit)
    Application.stop(:commanded)
    Application.stop(:eventstore)

    reset_eventstore()
    reset_readstore()

    Application.ensure_all_started(:conduit)

    :ok
  end

  defp reset_eventstore do
    {:ok, conn} =
      EventStore.configuration()
      |> EventStore.Config.parse()
      |> Postgrex.start_link()

    EventStore.Storage.Initializer.reset!(conn)
  end

  defp reset_readstore do
    readstore_config = Application.get_env(:conduit, Conduit.Repo)

    {:ok, conn} = Postgrex.start_link(readstore_config)

    Postgrex.query!(conn, truncate_readstore_tables(), [])
  end

  defp truncate_readstore_tables do
"""
TRUNCATE TABLE
  accounts_users,
  projection_versions
RESTART IDENTITY;
"""
  end
end

Returning to the accounts integration test

We have now done almost enough to make our register user test in the accounts context pass. The remaining change is to return the User projection from the register_user/1 function.

In this scenario, we could attempt to return a %User{} struct populated from the parameters passed to the register_user/1 function. The concern with this approach is the additional duplicate code we must write, and the potential for it getting out of sync during future changes. Instead we’ll take advantage of Commanded’s support for strongly consistent command dispatch.

The Commanded consistency model is opt-in, with the default consistency being :eventual. We need to define our user projector as strongly consistent:

defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.User",
    consistency: :strong

  # .. projection code omitted
end

Returning to the accounts context, we will update the register_user/1 function to dispatch the command using consistency: :strong:

# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
  @moduledoc """
  The boundary for the Accounts system.
  """

  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Accounts.Projections.User
  alias Conduit.Repo
  alias Conduit.Router

  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> assign(:user_uuid, uuid)
      |> RegisterUser.new()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end

  defp get(schema, uuid) do
    case Repo.get(schema, uuid) do
      nil -> {:error, :not_found}
      projection -> {:ok, projection}
    end
  end

  defp assign(attrs, key, value), do: Map.put(attrs, key, value)
end

Now when we receive an :ok reply from command dispatch we can be assured that the user projection has been updated with our newly registered user, allowing us to query the projection and return the populated %User{}. Let's run the accounts test to check our changes:

$ mix test test/conduit/accounts/accounts_test.exs

Success, we have a passing test.

We still have one other failing test, but that’s useful as it directs us towards what needs to be worked on next: adding command validation.

Command validation

We want to build our application to ensure that most commands are successfully handled by an aggregate. The first way to achieve this is to limit which commands can be dispatched by only allowing acceptable commands to be shown to the end user in the UI. The second level of protection before a command reaches an aggregate is command validation; all commands should be validated before being passed to the aggregate.

There are three different levels of command validation that apply to an application:

  1. Command property validation: mandatory fields, data format, min/max ranges.
  2. Domain validation rules: prevent duplicate usernames, application state based validation logic.
  3. Business invariants: protection against invalid state changes.

Command property validation

Superficial command field validation is the simplest to apply. You add rules to each command property specifying its data type, whether it’s mandatory or optional, and apply basic range checking (e.g. date must be in the future). You can even apply cross field validation (e.g. start date must be before end date). These rules apply to the command itself, requiring no external information.

Property validation helps guard against common errors, such as the user forgetting to fill out a mandatory field, by applying the rules before accepting the command and rejecting upon validation failure. These rules can be applied at the user interface to help assist the user.

Domain validation rules

In our user registration feature we have a rule that usernames must be unique. To enforce this rule we must check that a given username does not yet exist when executing the register user command. This information will need to be read from a read model. We cannot enforce this rule within our user aggregate because each aggregate instance runs completely independently of any other. It's not possible to have a command that affects, or uses, multiple aggregates since an aggregate is itself the transaction boundary.

You could decide that this invariant was important enough to warrant protection by using an aggregate whose purpose is to record and assign unique usernames. Its job would be to enforce uniqueness by acting as a gatekeeper to the user registration. A command, such as reserve username, could be used to claim the username. The aggregate would publish a domain event with the newly assigned username on success, allowing an event handler to then register the user with the guaranteed unique username.

In Elixir a GenServer process can be successfully used to enforce uniqueness as concurrent requests to a process are handled serially. The process would allow a username to be claimed or reserved during command dispatch, preventing any later request from using the same username. The only caveat to this approach is that the GenServer’s in-memory state must be persisted to storage so that it can be restarted with the list of already taken usernames.

Business invariants

An aggregate root must protect itself against commands that would cause an invariant to be broken. This includes attempting to execute a command that is invalid for the aggregate’s current state. An example would be attempting to rename an article that has been deleted. In this scenario the aggregate would return an {:error, reason} tagged tuple from the command execute/2 function.

For certain business operations you might decide to return a domain event indicating an error, rather than preventing the command execution. An example would be attempting to withdraw money from a bank account where the amount requested is larger than the account balance. Retail banks earn interest or fees when an account goes overdrawn, so rather than reject the withdraw money command, a bank account aggregate might instead allow the withdrawal and also return an account overdrawn domain event.
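
A sketch of what that might look like as aggregate code; the bank account modules below are purely illustrative and not part of Conduit:

defmodule BankAccount do
  defstruct [:account_number, balance: 0]

  defmodule WithdrawMoney, do: defstruct([:account_number, :amount])
  defmodule MoneyWithdrawn, do: defstruct([:account_number, :amount, :balance])
  defmodule AccountOverdrawn, do: defstruct([:account_number, :balance])

  # Allow the withdrawal, but also record that the account went overdrawn
  def execute(%BankAccount{account_number: number, balance: balance}, %WithdrawMoney{amount: amount}) do
    new_balance = balance - amount
    withdrawn = %MoneyWithdrawn{account_number: number, amount: amount, balance: new_balance}

    if new_balance < 0 do
      [withdrawn, %AccountOverdrawn{account_number: number, balance: new_balance}]
    else
      withdrawn
    end
  end
end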

Applying command property validation

For command field validation we will be using Vex.

An extensible data validation library for Elixir.

Can be used to check different data types for compliance with criteria.

Ships with built-in validators to check for attribute presence, absence, inclusion, exclusion, format, length, acceptance, and by a custom function. You can easily define new validators and override existing ones.

— Vex documentation

We’ll add the vex package to our dependencies in mix.exs:

defp deps do
  [
    {:vex, "~> 0.6"},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

Then we add validation rules for each of the fields in the command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]

  use ExConstructor
  use Vex.Struct

  validates :user_uuid, uuid: true
  validates :username, presence: true, string: true
  validates :email, presence: true, string: true
  validates :hashed_password, presence: true, string: true
end

For the uuid field we will use a custom validator that attempts to parse the given string as a UUID:

# lib/conduit/support/validators/uuid.ex
defmodule Conduit.Support.Validators.Uuid do
  use Vex.Validator

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [function: &valid_uuid?/1, allow_nil: false, allow_blank: false])
  end

  defp valid_uuid?(uuid) do
    case UUID.info(uuid) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

To validate string fields, such as username and email, we will use another custom validator:

# lib/conduit/support/validators/string.ex
defmodule Conduit.Support.Validators.String do
  use Vex.Validator

  def validate(nil, _options), do: :ok
  def validate("", _options), do: :ok
  def validate(value, _options) do
    Vex.Validators.By.validate(value, [function: &String.valid?/1])
  end
end

Then register these validators in config/config.exs:

config :vex,
  sources: [
    Conduit.Support.Validators,
    Vex.Validators
  ]

Once registered, we can verify a validator is configured using iex -S mix:

iex(1)> Vex.validator(:uuid)
Conduit.Support.Validators.Uuid

With the validation rules in place, we can validate a register user command as follows:

iex(1)> alias Conduit.Accounts.Commands.RegisterUser
Conduit.Accounts.Commands.RegisterUser
iex(2)> register_user = %RegisterUser{}
%Conduit.Accounts.Commands.RegisterUser{email: nil, hashed_password: nil,
 username: nil, user_uuid: nil}
iex(3)> Vex.valid?(register_user)
false
iex(4)> Vex.results(register_user)
[{:error, :email, :presence, "must be present"}, {:ok, :email, :string},
 {:error, :hashed_password, :presence, "must be present"},
 {:ok, :hashed_password, :string},
 {:error, :username, :presence, "must be present"}, {:ok, :username, :string},
 {:error, :user_uuid, :uuid, "must be valid"}]

Validating dispatched commands

We’ve defined our command validation rules, now we need to apply them during command dispatch.

Commanded provides middleware as the extension point to include cross-cutting concerns in command dispatch. This can be used to add command validation, authorization, logging, and any other behaviour you want executed for every command the router dispatches. You define your own middleware modules and register them in your command router. They are executed before, and after success or failure of, every dispatched command.

We will write a middleware module that implements the Commanded.Middleware behaviour. It uses the Vex.valid?/1 and Vex.errors/1 functions to validate commands before dispatch:

# lib/conduit/support/middleware/validate.ex
defmodule Conduit.Support.Middleware.Validate do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case Vex.valid?(command) do
      true -> pipeline
      false -> failed_validation(pipeline)
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp failed_validation(%Pipeline{command: command} = pipeline) do
    errors = command |> Vex.errors() |> merge_errors()

    pipeline
    |> respond({:error, :validation_failure, errors})
    |> halt
  end

  defp merge_errors(errors) do
    errors
    |> Enum.group_by(
      fn {_error, field, _type, _message} -> field end,
      fn {_error, _field, _type, message} -> message end)
    |> Map.new()
  end
end

This middleware will return an {:error, :validation_failure, errors} tagged tuple should a command fail validation. The errors will contain the collection of validation failures, per field, that can be returned and shown to the client.
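
As an example, dispatching a register user command with a blank username would return a reply shaped like this (using Vex's default presence message):

{:error, :validation_failure, %{username: ["must be present"]}}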

The validation middleware module is registered in the router:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Support.Middleware.Validate

  middleware Validate

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

Testing user registration validation

Returning to our accounts test module, which includes our failing test:

# test/conduit/accounts/accounts_test.exs
defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  describe "register user" do
    @tag :integration
    test "should succeed with valid data" do
      assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

      assert user.username == "jake"
      assert user.email == "jake@jake.jake"
      assert user.hashed_password == "jakejake"
      assert user.bio == nil
      assert user.image == nil
    end

    @tag :integration
    test "should fail with invalid data and return error" do
      assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: ""))

      assert errors == %{username: ["can't be empty"]}
    end
  end
end

We can run the test again to check whether it passes:

$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
.
  1) test register user should fail with invalid data and return error (Conduit.AccountsTest)
     test/conduit/accounts/accounts_test.exs:21
     Assertion with == failed
     code:  errors == %{username: ["can't be empty"]}
     left:  %{username: ["must be present"]}
     right: %{username: ["can't be empty"]}
     stacktrace:
       test/conduit/accounts/accounts_test.exs:24: (test)

Finished in 0.4 seconds
2 tests, 1 failure

It’s still failing, but only because the validation error message we’re expecting, “can’t be empty”, differs from the default validation error message provided by Vex, “must be present”.

We can provide our own message when registering the validation rules in the command:

# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]

  use ExConstructor
  use Vex.Struct

  validates :user_uuid, uuid: true
  validates :username, presence: [message: "can't be empty"], string: true
  validates :email, presence: [message: "can't be empty"], string: true
  validates :hashed_password, presence: [message: "can't be empty"], string: true
end

Run the test again to see it succeed:

$ mix test test/conduit/accounts/accounts_test.exs
Compiling 3 files (.ex)
Excluding tags: [:pending]
..
Finished in 0.4 seconds
2 tests, 0 failures

We now have complete end-to-end user registration including command dispatch and validation, aggregate construction, domain event publishing, and read model projection. That covers the entire flow of a CQRS application from an initial command dispatch resulting in an updated read model available to query.

Enforce unique usernames

We’ve implemented basic command field validation using Vex. Now we need to move on to the second level validation: domain validation rules. Enforcing unique usernames when registering a new user will be the first that we’ll implement.

Typically domain validation will use a read model to query for application state. In our case we already have a user projection that contains the username. We even specified a unique index on the username column in the database migration:

create unique_index(:accounts_users, [:username])

The index will ensure that querying on this column is performant.

Let’s write an integration test to assert that registering the same username will fail with a useful error message:

@tag :integration
test "should fail when username already taken and return error" do
  assert {:ok, %User{}} = Accounts.register_user(build(:user))
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user))

  assert errors == %{username: ["has already been taken"]}
end

Running this test will fail, so we need to implement the unique username validation rule. First we build a read model query to lookup the user projection by username:

# lib/conduit/accounts/queries/user_by_username.ex
defmodule Conduit.Accounts.Queries.UserByUsername do
  import Ecto.Query

  alias Conduit.Accounts.Projections.User

  def new(username) do
    from u in User,
    where: u.username == ^username
  end
end

This can be executed by passing the query to our Ecto repository:

UserByUsername.new("jake") |> Conduit.Repo.one()

We use this query to expose a new public function in the accounts context: user_by_username/1:

# lib/conduit/accounts/accounts.ex (diff)
defmodule Conduit.Accounts do
  alias Conduit.Accounts.Queries.UserByUsername

  @doc """
  Get an existing user by their username, or return `nil` if not registered
  """
  def user_by_username(username) do
    username
    |> String.downcase()
    |> UserByUsername.new()
    |> Repo.one()
  end
end

Then we can check if a username already exists in the new unique username validator, added to the accounts context in lib/conduit/accounts/validators:

# lib/conduit/accounts/validators/unique_username.ex
defmodule Conduit.Accounts.Validators.UniqueUsername do
  use Vex.Validator

  alias Conduit.Accounts

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [
      function: fn value -> !username_registered?(value) end,
      message: "has already been taken"
    ])
  end

  defp username_registered?(username) do
    case Accounts.user_by_username(username) do
      nil -> false
      _ -> true
    end
  end
end

Add the accounts validators to the vex config in config/config.exs:

config :vex,
  sources: [
    Conduit.Accounts.Validators,
    Conduit.Support.Validators,
    Vex.Validators
  ]

Then we can register the new validator against the username property:

# lib/conduit/accounts/commands/register_user.ex (diff)
defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :username, presence: [message: "can't be empty"], string: true, unique_username: true
end

Run the accounts integration test and we now have three passing tests:

$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
...
Finished in 1.3 seconds
3 tests, 0 failures

Concurrent registration

We’ve included command validation to ensure unique usernames, and have tested this when registering one user after another. However, there’s a problem: attempting to register two users with the same username concurrently. The unique username validation will pass for both commands, allowing both users with an identical username to be created. This issue exists during the small period of time between registering the user and the read model being updated.

An integration test demonstrates the problem:

defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  describe "register user" do
    @tag :integration
    test "should fail when registering identical username at same time and return error" do
      1..2
      |> Enum.map(fn _ -> Task.async(fn -> Accounts.register_user(build(:user)) end) end)
      |> Enum.map(&Task.await/1)
    end
  end
end

Since the issue exists only during concurrent command handling we can use another router middleware module to enforce uniqueness. In the before_dispatch/1 callback for the register user command we attempt to claim the username. Should that fail, it indicates that another registration for that username is currently being processed, and we return a validation failure.

The middleware will use a new Unique module that provides a claim/2 function. This attempts to reserve a unique value for a given context. It returns :ok on success, or {:error, :already_taken} on failure. To make the middleware reusable for other fields we define an Elixir protocol (UniqueFields) allowing commands to specify which fields are unique.

# lib/conduit/support/middleware/uniqueness.ex
defmodule Conduit.Support.Middleware.Uniqueness do
  @behaviour Commanded.Middleware

  defprotocol UniqueFields do
    @fallback_to_any true
    @doc "Returns unique fields for the command"
    def unique(command)
  end

  defimpl UniqueFields, for: Any do
    def unique(_command), do: []
  end

  alias Conduit.Support.Unique
  alias Commanded.Middleware.Pipeline

  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case ensure_uniqueness(command) do
      :ok ->
        pipeline

      {:error, errors} ->
        pipeline
        |> respond({:error, :validation_failure, errors})
        |> halt()
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp ensure_uniqueness(command) do
    command
    |> UniqueFields.unique()
    |> Enum.reduce_while(:ok, fn ({unique_field, error_message}, _) ->
      value = Map.get(command, unique_field)

      case Unique.claim(unique_field,  value) do
        :ok -> {:cont, :ok}
        {:error, :already_taken} -> {:halt, {:error, Keyword.new([{unique_field, error_message}])}}
      end
    end)
  end
end

For the RegisterUser command we specify that the :username field must be unique by implementing the UniqueFields protocol:

defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
  def unique(_command), do: [
    {:username, "has already been taken"},
  ]
end

The new Uniqueness middleware is registered after command validation so that it will only be applied to valid commands:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Support.Middleware.{Uniqueness,Validate}

  middleware Validate
  middleware Uniqueness

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

We’ll use a GenServer to track assigned unique values. Its state is a mapping between a context, such as :username, and the already claimed values. Attempting to claim an assigned value for a context returns an {:error, :already_taken} tagged error.

# lib/conduit/support/unique.ex
defmodule Conduit.Support.Unique do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def claim(context, value) do
    GenServer.call(__MODULE__, {:claim, context, value})
  end

  def init(state), do: {:ok, state}

  def handle_call({:claim, context, value}, _from, assignments) do
    {reply, state} = case Map.get(assignments, context) do
      nil -> {:ok, Map.put(assignments, context, MapSet.new([value]))}
      values ->
        case MapSet.member?(values, value) do
          true -> {{:error, :already_taken}, assignments}
          false -> {:ok, Map.put(assignments, context, MapSet.put(values, value))}
        end
    end

    {:reply, reply, state}
  end
end

The Conduit.Support.Unique module is included in the application supervision tree, in lib/conduit/application.ex:

defmodule Conduit.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      # ...

      # Enforce unique constraints
      worker(Conduit.Support.Unique, []),
    ]

    opts = [strategy: :one_for_one, name: Conduit.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
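
As an aside, worker/2 from Supervisor.Spec is the Phoenix 1.3 era style and has since been deprecated. On newer Elixir versions a roughly equivalent sketch, assuming we change Unique.start_link to accept an (ignored) options argument, is to list the module directly as a child:

# Sketch only, assuming a newer Elixir where Supervisor.Spec is deprecated.
# Give Unique a start_link/1 that accepts (and ignores) an options argument:
#
#   def start_link(_opts \\ []) do
#     GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
#   end
#
# Then the module can be listed directly as a child:
defmodule Conduit.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...

      # Enforce unique constraints
      Conduit.Support.Unique
    ]

    opts = [strategy: :one_for_one, name: Conduit.Supervisor]
    Supervisor.start_link(children, opts)
  end
end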

We now have unique usernames enforced as part of the register user command dispatch pipeline. This should prevent duplicate usernames from being registered at exactly the same time. We can verify this by running the integration tests again:

$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test, :pending]
....
Finished in 1.7 seconds
11 tests, 0 failures, 7 skipped

Additional username validation

There are two further validation rules to implement on usernames during registration:

  1. Must be lowercase.
  2. Must only contain alphanumeric characters (a-z, 0-9).

We can use a regular expression to enforce both of these rules.
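
As an illustration, the pattern we will use below, ~r/^[a-z0-9]+$/, accepts only lowercase alphanumeric strings and rejects uppercase or punctuation characters (a hypothetical iex session):

iex> Regex.match?(~r/^[a-z0-9]+$/, "jake")
true
iex> Regex.match?(~r/^[a-z0-9]+$/, "JAKE")
false
iex> Regex.match?(~r/^[a-z0-9]+$/, "j@ke")
false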

First we add two integration tests to cover these requirements:

@tag :integration
test "should fail when username format is invalid and return error" do
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "j@ke"))

  assert errors == %{username: ["is invalid"]}
end

@tag :integration
test "should convert username to lowercase" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user, username: "JAKE"))

  assert user.username == "jake"
end

Vex supports regex validation using the format validator. We add this to the username validation rules in the register user command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :username,
    presence: [message: "can't be empty"],
    format: [with: ~r/^[a-z0-9]+$/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true,
    unique_username: true
end

The allow_nil and allow_blank options are included because we already have a presence validation for the username. Without them, a missing username would produce duplicate error messages: “can’t be empty” and “is invalid”.
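
To illustrate the difference, these are illustrative error maps (not actual output from the test suite) for a blank username:

# without allow_nil/allow_blank both validations fail
%{username: ["can't be empty", "is invalid"]}

# with allow_nil/allow_blank only the presence error is reported
%{username: ["can't be empty"]}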

We need to convert the username to lowercase during registration, in the Accounts context’s register_user/1 function. Let’s take the opportunity to make a small refactoring by moving the existing assign_uuid/2 function into the RegisterUser module. At the same time we will add a new downcase_username/1 function that lowercases the username. These functions are chained together with the pipe operator after constructing the RegisterUser command struct from the user-supplied attributes.

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

The new functions are added to the RegisterUser command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  alias Conduit.Accounts.Commands.RegisterUser

  @doc """
  Assign a unique identity for the user
  """
  def assign_uuid(%RegisterUser{} = register_user, uuid) do
    %RegisterUser{register_user | user_uuid: uuid}
  end

  @doc """
  Convert username to lowercase characters
  """
  def downcase_username(%RegisterUser{username: username} = register_user) do
    %RegisterUser{register_user | username: String.downcase(username)}
  end
end

Running the integration test suite confirms our changes are good.

Validating a user’s email address

We can now apply the same strategy to email address validation. The rules we need to enforce are that an email address:

  1. Must be unique.
  2. Must be lowercase.
  3. Must be in the desired format: contain an @ character.

The implementation will follow a similar approach to how we validated usernames.

First, we write failing tests to cover the scenarios above:

@tag :integration
test "should fail when email address already taken and return error" do
  assert {:ok, %User{}} = Accounts.register_user(build(:user, username: "jake"))
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "jake2"))

  assert errors == %{email: ["has already been taken"]}
end

@tag :integration
test "should fail when registering identical email addresses at same time and return error" do
  1..2
  |> Enum.map(fn x -> Task.async(fn -> Accounts.register_user(build(:user, username: "user#{x}")) end) end)
  |> Enum.map(&Task.await/1)
end

@tag :integration
test "should fail when email address format is invalid and return error" do
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, email: "invalidemail"))

  assert errors == %{email: ["is invalid"]}
end

@tag :integration
test "should convert email address to lowercase" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user, email: "JAKE@JAKE.JAKE"))

  assert user.email == "jake@jake.jake"
end

Second, extend email validation in the command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :email,
    presence: [message: "can't be empty"],
    format: [with: ~r/\S+@\S+\.\S+/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true,
    unique_email: true
end
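
As with usernames, we can sanity check the pattern quickly (an iex sketch): the address from the failing test is rejected, while the lowercase test address is accepted.

iex> Regex.match?(~r/\S+@\S+\.\S+/, "invalidemail")
false
iex> Regex.match?(~r/\S+@\S+\.\S+/, "jake@jake.jake")
true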

Third, we create the new unique email validator:

# lib/conduit/accounts/validators/unique_email.ex
defmodule Conduit.Accounts.Validators.UniqueEmail do
  use Vex.Validator

  alias Conduit.Accounts

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [
      function: fn value -> !email_registered?(value) end,
      message: "has already been taken"
    ])
  end

  defp email_registered?(email) do
    case Accounts.user_by_email(email) do
      nil -> false
      _ -> true
    end
  end
end

This also requires a new public user_by_email/1 function in the accounts context to retrieve a user by their email address:

defmodule Conduit.Accounts do
  @doc """
  Get an existing user by their email address, or return `nil` if not registered
  """
  def user_by_email(email) when is_binary(email) do
    email
    |> String.downcase()
    |> UserByEmail.new()
    |> Repo.one()
  end
end

The UserByEmail query is a module that constructs a standard Ecto query:

# lib/conduit/accounts/queries/user_by_email.ex
defmodule Conduit.Accounts.Queries.UserByEmail do
  import Ecto.Query

  alias Conduit.Accounts.Projections.User

  def new(email) do
    from u in User,
      where: u.email == ^email
  end
end

Fourth, we extend the UniqueFields protocol implementation for the register user command to include email address:

defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
  def unique(_command), do: [
    {:email, "has already been taken"},
    {:username, "has already been taken"},
  ]
end

Finally, we include the RegisterUser.downcase_email/1 function in the register user pipeline:

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

That completes the email address validation. We run the integration test suite again to confirm it all passes.

Hashing the user’s password

We don’t want to store a user’s password anywhere in our application. Instead we’ll use a one-way hashing function and store the password hash. To authenticate a user during login we hash the password they provide, using the same algorithm, and compare it with the stored password hash, never the actual password.

For Conduit we’ll use the bcrypt password hashing function, as described in how to safely store a password using bcrypt. The Comeonin library provides an implementation of the bcrypt hashing function in Elixir. As the Comeonin project describes itself:

Password hashing (bcrypt, pbkdf2_sha512 and one-time passwords) library for Elixir.

This library is intended to make it very straightforward for developers to check users’ passwords in as secure a manner as possible.

Add comeonin and bcrypt_elixir to dependencies in mix.exs:

defp deps do
  [
    # ...
    {:bcrypt_elixir, "~> 1.0"},
    {:comeonin, "~> 4.0"},
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

For our test environment only we will reduce the number of bcrypt rounds so it doesn’t slow down our test suite. In config/test.exs we configure comeonin as follows:

config :comeonin, :bcrypt_log_rounds, 4

We’ll create a Conduit.Auth module to wrap the Comeonin library’s bcrypt hashing functions:

# lib/conduit/accounts/auth.ex
defmodule Conduit.Auth do
  @moduledoc """
  Authentication using the bcrypt password hashing function.
  """

  alias Comeonin.Bcrypt

  def hash_password(password), do: Bcrypt.hashpwsalt(password)
  def validate_password(password, hash), do: Bcrypt.checkpw(password, hash)
end
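
A brief usage sketch in iex shows how the two functions fit together. The hash shown is the example value quoted below; in practice each call to hash_password/1 produces a different salted hash:

iex> hash = Conduit.Auth.hash_password("jakejake")
"$2b$04$W7A/lWysNVUqeYg8vjKCXeBniHoks4jmRziKDmACO.fvqo3wdqsea"
iex> Conduit.Auth.validate_password("jakejake", hash)
true
iex> Conduit.Auth.validate_password("wrong password", hash)
false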

Then create an integration test to verify the password is being hashed and stored in the user read model. For the test assertion we use the Auth.validate_password/2 function, shown above, which hashes the provided password, jakejake, and compares it with the hashed password already saved for the user, such as $2b$04$W7A/lWysNVUqeYg8vjKCXeBniHoks4jmRziKDmACO.fvqo3wdqsea. Remember that we never store the user’s password, only a one-way hash.

@tag :integration
test "should hash password" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

  assert Auth.validate_password("jakejake", user.hashed_password)
end

Next we include a password field in the register user command struct to hold the user-provided password in plain text. We add a hash_password/1 function that hashes the password, stores the hash as hashed_password, and clears the original plain text password. This prevents the user’s password from being exposed by any command auditing.

defmodule Conduit.Accounts.Commands.RegisterUser do
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Auth

  defstruct [
    user_uuid: "",
    username: "",
    email: "",
    password: "",
    hashed_password: "",
  ]

  @doc """
  Hash the password, clear the original password
  """
  def hash_password(%RegisterUser{password: password} = register_user) do
    %RegisterUser{register_user |
      password: nil,
      hashed_password: Auth.hash_password(password),
    }
  end
end

The final change is to include this function in the register user command dispatch chain:

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()
      |> RegisterUser.hash_password()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

We’ve now successfully hashed the user’s password during registration, helping to protect our users’ security should our deployed environment be compromised and the database accessed. By default, the Comeonin library generates a different 16-character salt for each hashed password. This provides another layer of protection against dictionary and rainbow table attacks on the stored hashes.

Completing user registration

With user registration done, at least from the accounts context, we return to our acceptance criteria defined in the UserControllerTest integration test. To specify the initial requirements and direct our development efforts we started out by writing end-to-end tests to ensure that the /api/users registration endpoint adheres to the requirements of the JSON API.

On successful registration the following response should be returned:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}

For now we will skip the authentication token; it will be addressed in the next chapter.

The integration test for successful user registration asserts against the JSON returned from a POST request to /api/users in the UserControllerTest module:

@tag :web
test "should create and return user when data is valid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user)
  json = json_response(conn, 201)["user"]

  assert json == %{
    "bio" => nil,
    "email" => "jake@jake.jake",
    "image" => nil,
    "username" => "jake",
  }
end

Running the test still results in a failure, so there’s more work for us to do. We need to modify the user view and select a subset of the fields from our user projection to be returned as JSON data:

# lib/conduit_web/views/user_view.ex
defmodule ConduitWeb.UserView do
  use ConduitWeb, :view
  alias ConduitWeb.UserView

  def render("index.json", %{users: users}) do
    %{users: render_many(users, UserView, "user.json")}
  end

  def render("show.json", %{user: user}) do
    %{user: render_one(user, UserView, "user.json")}
  end

  def render("user.json", %{user: user}) do
    %{
      username: user.username,
      email: user.email,
      bio: user.bio,
      image: user.image,
    }
  end
end

As per the API spec we only return the username, email, bio, and image fields.
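
For example, rendering the show.json template for a user projection (a hypothetical iex call; map key ordering is not significant) produces the nested map that Phoenix then encodes as JSON:

iex> alias Conduit.Accounts.Projections.User
iex> user = %User{username: "jake", email: "jake@jake.jake", bio: nil, image: nil}
iex> ConduitWeb.UserView.render("show.json", %{user: user})
%{user: %{bio: nil, email: "jake@jake.jake", image: nil, username: "jake"}}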

Next we need to handle the case where validation errors are returned during command dispatch. The request should fail with a 422 HTTP status code, and the response body should be in the following format:

{
  "errors": {
    "username": [
      "can't be empty"
    ]
  }
}

This scenario is covered by the following test:

@tag :web
test "should not create user and render errors when data is invalid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user, username: "")
  assert json_response(conn, 422)["errors"] == %{
    "username" => [
      "can't be empty",
    ]
  }
end

To achieve this we will use a new feature in Phoenix 1.3: the action_fallback plug for controllers, which supports generic error handling. Including the plug inside a controller allows you to handle only the successful case, leaving errors to be dealt with elsewhere. Take a look at our existing user controller, where we only pattern match on the {:ok, user} successful outcome:

# lib/conduit_web/controllers/user_controller.ex
defmodule ConduitWeb.UserController do
  use ConduitWeb, :controller

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"user" => user_params}) do
    with {:ok, %User{} = user} <- Accounts.register_user(user_params) do
      conn
      |> put_status(:created)
      |> render("show.json", user: user)
    end
  end
end

Any errors that aren’t handled within your controller can be dealt with by the configured fallback controller. We pattern match on the {:error, :validation_failure, errors} tagged error tuple returned when command dispatch fails due to a validation failure. The errors are rendered using a new validation view module and returned with an HTTP 422 “Unprocessable Entity” status code:

# lib/conduit_web/controllers/fallback_controller.ex
defmodule ConduitWeb.FallbackController do
  use ConduitWeb, :controller

  def call(conn, {:error, :validation_failure, errors}) do
    conn
    |> put_status(:unprocessable_entity)
    |> render(ConduitWeb.ValidationView, "error.json", errors: errors)
  end

  def call(conn, {:error, :not_found}) do
    conn
    |> put_status(:not_found)
    |> render(ConduitWeb.ErrorView, :"404")
  end
end

The validation view returns a map containing the errors that is rendered as JSON:

# lib/conduit_web/views/validation_view.ex
defmodule ConduitWeb.ValidationView do
  use ConduitWeb, :view

  def render("error.json", %{errors: errors}) do
    %{errors: errors}
  end
end

We can run the integration tests tagged :web after making these changes, and the good news is they all pass:

$ mix test --only web
Including tags: [:web]
Excluding tags: [:test, :pending]

...

Finished in 2.3 seconds
18 tests, 0 failures, 15 skipped

Having completed user registration, we now move on to authentication in the next chapter.