Accounts
The Conduit blogging platform requires authors to register an account before they can publish articles. We shall begin our first feature by implementing user registration in the accounts context.
Register a user
The API spec for registration is as follows:
| HTTP verb | URL | Required fields |
|---|---|---|
| POST | /api/users | email, username, password |
Example request body:
{
"user":{
"username": "jake",
"email": "jake@jake.jake",
"password": "jakejake"
}
}
Example response body:
{
"user": {
"email": "jake@jake.jake",
"token": "jwt.token.here",
"username": "jake",
"bio": null,
"image": null
}
}
The request should fail with a 422 HTTP status code if any of the required fields is invalid. In this case the response body will be in the following format:
{
"errors": {
"body": [
"can't be empty"
]
}
}
We must ensure that usernames are unique and cannot be registered more than once.
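As a rough illustration of the failure format, the following sketch checks the required fields and collects an error list per field. The module and function names (RegistrationSketch, validate/1) are hypothetical and not part of Conduit, and keying the errors by field name is an assumption taken from the integration test later in this chapter.

```elixir
defmodule RegistrationSketch do
  # Required fields from the API spec table.
  @required ["email", "username", "password"]

  # Returns :ok, or {:error, errors} where errors maps each invalid
  # field name to a list of messages (the shape rendered with a 422).
  def validate(params) when is_map(params) do
    errors =
      for field <- @required, blank?(Map.get(params, field)), into: %{} do
        {field, ["can't be empty"]}
      end

    if map_size(errors) == 0, do: :ok, else: {:error, errors}
  end

  defp blank?(nil), do: true
  defp blank?(value) when is_binary(value), do: String.trim(value) == ""
  defp blank?(_other), do: false
end

RegistrationSketch.validate(%{
  "email" => "jake@jake.jake",
  "username" => "",
  "password" => "jakejake"
})
# => {:error, %{"username" => ["can't be empty"]}}
```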
Building our first context
Phoenix includes a set of generators to help scaffold your application:
| Command | Action |
|---|---|
| mix phx.gen.channel | Generates a Phoenix channel |
| mix phx.gen.context | Generates a context with functions around an Ecto schema |
| mix phx.gen.embedded | Generates an embedded Ecto schema file |
| mix phx.gen.html | Generates controller, views, and context for an HTML resource |
| mix phx.gen.json | Generates controller, views, and context for a JSON resource |
| mix phx.gen.presence | Generates a Presence tracker |
| mix phx.gen.schema | Generates an Ecto schema and migration file |
| mix phx.gen.secret | Generates a secret |
Since we’re building a REST API, we can use the phx.gen.json generator to create our first context, resource, controller, and JSON view. As we already know the fields relating to our users we can include them, with their type, in the generator command:
$ mix phx.gen.json Accounts User users username:string email:string hashed_password:string bio:string image:string --table accounts_users
Overall, this generator will add the following files:
- Context module in lib/conduit/accounts/accounts.ex, serving as the API boundary.
- Ecto schema in lib/conduit/accounts/user.ex, and a database migration to create the accounts_users table.
- View in lib/conduit_web/views/user_view.ex.
- Controller in lib/conduit_web/controllers/user_controller.ex.
- Unit and integration tests in test/conduit/accounts.
Remember that the User module we’re creating here is not our domain model. It will be a read model projection, populated by domain events published from an aggregate.
The generator prompts us to add the resource to the :api scope in our Phoenix router module. For now, we will configure only the :create controller action to support registering a user:
# lib/conduit_web/router.ex
defmodule ConduitWeb.Router do
use ConduitWeb, :router
pipeline :api do
plug :accepts, ["json"]
end
scope "/api", ConduitWeb do
pipe_through :api
post "/users", UserController, :create
end
end
Writing our first integration test
Let’s follow Behaviour Driven Development (BDD), thinking “from the outside in”, and start by writing a failing integration test for user registration. We will include tests that cover the happy path of successfully creating a user, and for the two failure scenarios mentioned above: missing required fields and duplicate username registration.
Factories to construct test data
We will use factory functions to generate realistic data for our tests. ExMachina is an Elixir library that makes it easy to create test data and associations.
In mix.exs, add :ex_machina as a test environment dependency:
defp deps do
[
{:ex_machina, "~> 2.0", only: :test},
# ...
]
end
Fetch mix dependencies and compile:
$ mix do deps.get, deps.compile
We must ensure the ExMachina application is started in the test helper, test/test_helper.exs, before ExUnit:
{:ok, _} = Application.ensure_all_started(:ex_machina)
Now we create our factory module in test/support/factory.ex:
# test/support/factory.ex
defmodule Conduit.Factory do
use ExMachina
def user_factory do
%{
email: "jake@jake.jake",
username: "jake",
hashed_password: "jakejake",
bio: "I like to skateboard",
image: "https://i.stack.imgur.com/xHWG8.jpg",
}
end
end
In our test module, we must import Conduit.Factory to access our user factory. Then we have access to build/1 and build/2 functions to construct params for an example user to register:
build(:user)
build(:user, username: "ben")
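To make the behaviour of build/1 and build/2 concrete, here is a minimal sketch that does not depend on ExMachina. It assumes that, for a map-based factory, the given attributes are simply merged over the factory defaults; FactorySketch is a hypothetical module used only for illustration.

```elixir
defmodule FactorySketch do
  # Default attributes, mirroring the user factory in this chapter.
  def user_factory do
    %{
      email: "jake@jake.jake",
      username: "jake",
      hashed_password: "jakejake",
      bio: "I like to skateboard",
      image: "https://i.stack.imgur.com/xHWG8.jpg"
    }
  end

  # Build the defaults, then let any given attributes override them.
  def build(:user, attrs \\ []) do
    Map.merge(user_factory(), Map.new(attrs))
  end
end

FactorySketch.build(:user, username: "ben").username
# => "ben"
```

Fields that are not overridden keep their default value, which is why the tests only need to name the attributes relevant to each scenario.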
User registration integration test
In our integration test we want to verify successful user registration and check any failure includes a useful error message to help the user identify the problem.
- To test success, we assert that the returned HTTP status code is 201 and the JSON response matches the API:
test "should create and return user when data is valid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user)
  json = json_response(conn, 201)["user"]
  assert json == build(:user, bio: nil, image: nil)
end
- To test a validation failure, we assert the response is 422 and errors are included:
test "should not create user and render errors when data is invalid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user, username: nil)
  assert json_response(conn, 422)["errors"] == [username: ["can't be empty",]]
end
The full integration test is given below:
# test/conduit_web/controllers/user_controller_test.exs
defmodule ConduitWeb.UserControllerTest do
use ConduitWeb.ConnCase
import Conduit.Factory
alias Conduit.Accounts
def fixture(:user, attrs \\ []) do
build(:user, attrs) |> Accounts.create_user()
end
setup %{conn: conn} do
{:ok, conn: put_req_header(conn, "accept", "application/json")}
end
describe "register user" do
@tag :web
test "should create and return user when data is valid", %{conn: conn} do
conn = post conn, user_path(conn, :create), user: build(:user)
json = json_response(conn, 201)["user"]
assert json == %{
"bio" => nil,
"email" => "jake@jake.jake",
"image" => nil,
"username" => "jake",
}
end
@tag :web
test "should not create user and render errors when data is invalid", %{conn: conn} do
conn = post conn, user_path(conn, :create), user: build(:user, username: "")
assert json_response(conn, 422)["errors"] == %{
"username" => [
"can't be empty",
]
}
end
@tag :web
test "should not create user and render errors when username has been taken", %{conn: conn} do
# register a user
{:ok, _user} = fixture(:user)
# attempt to register the same username
conn = post conn, user_path(conn, :create), user: build(:user, email: "jake2@jake.jake")
assert json_response(conn, 422)["errors"] == %{
"username" => [
"has already been taken",
]
}
end
end
end
Before running these tests, we must create the event store and read store databases for the test environment:
$ MIX_ENV=test mix do event_store.create, event_store.init
$ MIX_ENV=test mix ecto.create
Then we can execute our registration integration test using the mix test command:
$ mix test test/conduit_web/controllers/user_controller_test.exs
Running these tests produces the following result:
Finished in 0.2 seconds
3 tests, 3 failures
Great, we have three failing tests. We now have the acceptance criteria of user registration codified in our tests. When these tests pass, the feature will be done. These tests will also help to prevent regressions caused by any changes we, or anyone else, may make in the future.
Let’s move forward by building the domain model to support user registration.
Application structure
The default directory structure used by the Phoenix generator creates a folder per context, and places them inside a folder named after the application within the lib directory.
We currently have our accounts context, along with the Phoenix web folder:
- lib/conduit/accounts
- lib/conduit_web
One benefit of this approach is that when a context becomes too large, it can be extracted into its own project. Using an umbrella project allows these separate Elixir applications to be used together, via internal mix references.
When our application grows too large to be comfortably hosted by a single, monolithic service we can migrate the individual apps into their own microservices. This is why it’s important to focus on separation of concerns, using contexts, from the outset. It allows us to split the application apart as production usage and requirements dictate, and more importantly it supports rewriting and deletion of code. Keeping each context highly cohesive, but loosely coupled, provides these benefits.
Within each context in our CQRS application we will create modules for the common building blocks: aggregates, commands, events, read model projections and projectors, and queries. I prefer to create a separate folder for each of these. It provides further segregation within a single context and allows you to easily locate any module by its type and name.
The folder structure for our first accounts context will be:
- lib/conduit/accounts/aggregates
- lib/conduit/accounts/commands
- lib/conduit/accounts/events
- lib/conduit/accounts/projections
- lib/conduit/accounts/projectors
- lib/conduit/accounts/queries
- lib/conduit/accounts/validators
Inside the lib/conduit/accounts folder we will place the context module and a supervisor module:
- lib/conduit/accounts/accounts.ex
- lib/conduit/accounts/supervisor.ex
These are the public facing parts of the account context, and provide the API into its available behaviour. The supervisor is responsible for supervising the workers contained within, such as the event handlers and projectors.
Alternate structure
Instead of grouping modules by their type within a context, you may choose to group them by their aggregate functionality. You can follow the convention used by Phoenix where file and module names are suffixed by their type (e.g. user_aggregate.ex).
In this example I’ve illustrated the file structure for modules related to the User aggregate in the Accounts context, including the commands, events, projection, validators, and queries:
- lib/conduit/accounts/user/user_aggregate.ex
- lib/conduit/accounts/user/register_user.ex
- lib/conduit/accounts/user/user_registered.ex
- lib/conduit/accounts/user/user_projection.ex
- lib/conduit/accounts/user/user_projector.ex
- lib/conduit/accounts/user/user_by_email_query.ex
- lib/conduit/accounts/user/unique_username_validator.ex
You can use either approach, or another application structure entirely, but it’s important that you choose a convention and adhere to it within your application.
Building our first aggregate
As we’re dealing with registering users, our first aggregate will be the user.
One decision we must take when designing an aggregate is how to identify an instance. The simplest approach is to use a UUID. This may be generated by the client or the server, and is used to guarantee a unique identity per aggregate instance. All commands must include this identity to allow locating the target aggregate instance.
For Conduit users, we have a restriction that their username must be unique. So we could use the username to identify the aggregate and enforce this business rule. Domain events persisted for each user would be appended to a stream based upon their individual username. Populating the user aggregate would retrieve their events from the stream based on their username. Attempting to register an already taken username would fail since the aggregate exists and its state will be non-empty. However, one downside to this approach is that it would prevent us from allowing a user to amend their username at some point in the future. Remember that domain events are immutable once appended to the event store, so you cannot amend them, or move them to another stream. Instead you would need to create a new aggregate instance, using the new username, initialise its state from the existing aggregate, and mark the old aggregate instance as obsoleted.
We’ll use an assigned unique identifier for each user. The uuid package provides a UUID generator and utilities for Elixir. With this library we can assign a unique identity to a user using UUID.uuid4().
We’ll add the UUID package to our mix dependencies:
defp deps do
[
{:uuid, "~> 1.1"},
# ...
]
end
To enforce the username uniqueness, we will validate the command before execution to ensure the username has not already been taken.
The User aggregate module, created in lib/conduit/accounts/aggregates/user.ex, defines the relevant fields as a struct, and exposes two public functions:
- execute/2 that accepts the empty user struct, %User{}, and the register user command, %RegisterUser{}, returning the user registered domain event.
- apply/2 that takes the user struct and the resultant user registered event %UserRegistered{} and mutates the aggregate state.
# lib/conduit/accounts/aggregates/user.ex
defmodule Conduit.Accounts.Aggregates.User do
defstruct [
:uuid,
:username,
:email,
:hashed_password,
]
alias Conduit.Accounts.Aggregates.User
alias Conduit.Accounts.Commands.RegisterUser
alias Conduit.Accounts.Events.UserRegistered
@doc """
Register a new user
"""
def execute(%User{uuid: nil}, %RegisterUser{} = register) do
%UserRegistered{
user_uuid: register.user_uuid,
username: register.username,
email: register.email,
hashed_password: register.hashed_password,
}
end
# state mutators
def apply(%User{} = user, %UserRegistered{} = registered) do
%User{user |
uuid: registered.user_uuid,
username: registered.username,
email: registered.email,
hashed_password: registered.hashed_password,
}
end
end
This approach to building aggregates will be followed for all new commands and events. The execute/2 function takes the command and returns zero, one, or more domain events, while the apply/2 function mutates the aggregate state by applying a single event.
The execute/2 function to register the user uses pattern matching to ensure the uuid field is nil. This ensures a user aggregate for a given identity can only be created once.
Why did I name the command function execute/2 and not register_user/2? This allows commands to be dispatched directly to the aggregate, without needing an intermediate command handler, which means less code to write. You can choose to use a descriptive command function name, but you must then also write a command handler module to route each command to the function on the aggregate module. It’s also possible to have the command handler module implement the domain logic by returning any domain events itself, if you prefer.
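To see how execute/2 and apply/2 fit together, here is a minimal, self-contained sketch using trimmed-down structs. The AggregateSketch namespace and the second execute/2 clause returning {:error, :already_registered} are assumptions added for illustration; the Conduit aggregate in this chapter defines only the first clause.

```elixir
defmodule AggregateSketch.RegisterUser do
  defstruct [:user_uuid, :username]
end

defmodule AggregateSketch.UserRegistered do
  defstruct [:user_uuid, :username]
end

defmodule AggregateSketch.User do
  alias AggregateSketch.{RegisterUser, UserRegistered}

  defstruct [:uuid, :username]

  # Only an unregistered user (uuid is nil) may be registered.
  def execute(%__MODULE__{uuid: nil}, %RegisterUser{} = register) do
    %UserRegistered{user_uuid: register.user_uuid, username: register.username}
  end

  # Hypothetical fallback clause, added for this sketch only.
  def execute(%__MODULE__{}, %RegisterUser{}), do: {:error, :already_registered}

  # State mutator: copy the event's fields onto the aggregate state.
  def apply(%__MODULE__{} = user, %UserRegistered{} = registered) do
    %__MODULE__{user | uuid: registered.user_uuid, username: registered.username}
  end
end

alias AggregateSketch.{RegisterUser, User}

command = %RegisterUser{user_uuid: "user-1", username: "jake"}

# Executing the command against empty state returns the event...
event = User.execute(%User{}, command)

# ...and applying the event produces the new aggregate state.
user = User.apply(%User{}, event)

# The uuid is no longer nil, so the first clause does not match again.
User.execute(user, command)
# => {:error, :already_registered}
```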
Building our first command
We need to create a command to register a user. A command is a standard Elixir module using the defstruct macro to define its fields. A struct is a map with an extra field indicating its type; it also allows developers to provide default values for keys.
defmodule Conduit.Accounts.Commands.RegisterUser do
defstruct [
:user_uuid,
:username,
:email,
:password,
:hashed_password,
]
end
The register user command can be constructed using familiar Elixir syntax:
alias Conduit.Accounts.Commands.RegisterUser
%RegisterUser{
user_uuid: UUID.uuid4(),
username: "jake",
email: "jake@jake.jake",
password: "jakejake",
}
When building a struct, Elixir will automatically guarantee all keys belong to the struct. This helps prevent accidental typos:
iex(1)> %RegisterUser{
...(1)> user: "jake",
...(1)> email: "jake@jake.jake",
...(1)> password: "jakejake",
...(1)> }
** (KeyError) key :user not found in: %RegisterUser{email: "jake@jake.jake", hashed_password: nil, password: "jakejake", username: nil, user_uuid: nil}
Constructing commands from external data
Commands will usually be populated from external data. In Conduit, this will be JSON data sent to our Phoenix web server. Phoenix will parse JSON data into an Elixir map with string keys. We therefore need a way to construct commands from these key/value maps.
ExConstructor is an Elixir library that makes it easy to instantiate structs from external data, such as that emitted by a JSON parser.
Add use ExConstructor after a defstruct statement to inject a constructor function into the module.
We’ll add this library to our mix dependencies:
defp deps do
[
{:exconstructor, "~> 1.1"},
# ...
]
end
Then we add use ExConstructor to our command:
# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
defstruct [
:user_uuid,
:username,
:email,
:password,
:hashed_password,
]
use ExConstructor
end
This allows us to create the command struct from a plain map, such as that provided by the params argument in a Phoenix controller function using the new/1 function:
iex(1)> alias Conduit.Accounts.Commands.RegisterUser
iex(2)> RegisterUser.new(%{"username" => "jake", "email" => "jake@jake.jake", "password" => "jakejake"})
%Conduit.Accounts.Commands.RegisterUser{email: "jake@jake.jake",
hashed_password: nil, password: "jakejake", user_uuid: nil, username: "jake"}
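For intuition about what such a constructor does, here is a minimal sketch written with the standard library only. It is not ExConstructor's implementation: the ConstructorSketch namespace and the from_params/1 function are hypothetical, and the sketch assumes that unknown keys should be ignored and missing keys left as nil.

```elixir
defmodule ConstructorSketch.RegisterUser do
  @fields [:user_uuid, :username, :email, :password, :hashed_password]
  defstruct @fields

  # Build the struct from a map with string or atom keys.
  # Only known fields are read, so no atoms are created from input.
  def from_params(params) when is_map(params) do
    attrs =
      Map.new(@fields, fn field ->
        value = Map.get(params, Atom.to_string(field), Map.get(params, field))
        {field, value}
      end)

    struct(__MODULE__, attrs)
  end
end

command =
  ConstructorSketch.RegisterUser.from_params(%{
    "username" => "jake",
    "email" => "jake@jake.jake",
    "admin" => true
  })

command.username
# => "jake"
```

Looking up only the declared fields, rather than converting every incoming key to an atom, avoids creating atoms from untrusted input.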
Building our first domain event
Domain events must be named in the past tense, so for user registration an appropriate event name is UserRegistered. Again we’ll use plain Elixir modules and structs to define our domain event:
# lib/conduit/accounts/events/user_registered.ex
defmodule Conduit.Accounts.Events.UserRegistered do
@derive [Poison.Encoder]
defstruct [
:user_uuid,
:username,
:email,
:hashed_password,
]
end
Note we derive the Poison.Encoder protocol in the domain event module. This is because Commanded uses Poison, a pure Elixir JSON library, to serialize events in the database. For maximum performance, you should @derive [Poison.Encoder] for any struct used for encoding.
Writing our first unit test
With our User aggregate, register user command, and user registered event modules defined we can author the first unit test. We’ll use the test to verify the expected domain event is returned when executing the command, and that the fields are being correctly populated.
ExUnit provides an ExUnit.CaseTemplate module that allows a developer to define a test case template to be used throughout their tests. This is useful when there are a set of functions that should be shared between tests, or a shared set of setup callbacks.
We can create a case template for aggregate unit tests that provides a reusable way to execute commands against an aggregate and verify the resultant domain events. In the following Conduit.AggregateCase case template an Elixir macro is used to allow each unit test to specify the aggregate module acting as the test subject:
# test/support/aggregate_case.ex
defmodule Conduit.AggregateCase do
@moduledoc """
This module defines the test case to be used by aggregate tests.
"""
use ExUnit.CaseTemplate
using [aggregate: aggregate] do
quote bind_quoted: [aggregate: aggregate] do
@aggregate_module aggregate
import Conduit.Factory
# assert that the expected events are returned when the given commands
# have been executed
defp assert_events(commands, expected_events) do
assert execute(List.wrap(commands)) == expected_events
end
# execute one or more commands against the aggregate
defp execute(commands) do
{_, events} = Enum.reduce(commands, {%@aggregate_module{}, []}, fn (command, {aggregate, _}) ->
events = @aggregate_module.execute(aggregate, command)
{evolve(aggregate, events), events}
end)
List.wrap(events)
end
# apply the given events to the aggregate state
defp evolve(aggregate, events) do
events
|> List.wrap()
|> Enum.reduce(aggregate, &@aggregate_module.apply(&2, &1))
end
end
end
end
The unit test asserts that the register user command returns a user registered event:
# test/conduit/accounts/aggregates/user_test.exs
defmodule Conduit.Accounts.Aggregates.UserTest do
use Conduit.AggregateCase, aggregate: Conduit.Accounts.Aggregates.User
alias Conduit.Accounts.Events.UserRegistered
describe "register user" do
@tag :unit
test "should succeed when valid" do
user_uuid = UUID.uuid4()
assert_events build(:register_user, user_uuid: user_uuid), [
%UserRegistered{
user_uuid: user_uuid,
email: "jake@jake.jake",
username: "jake",
hashed_password: "jakejake",
}
]
end
end
end
To facilitate test-driven development I use the mix test.watch command provided by the mix_test_watch package. It will automatically run tests whenever files change.
In mix.exs, add the package as a dev environment dependency:
defp deps do
[
{:mix_test_watch, "~> 0.5", only: :dev, runtime: false},
# ...
]
end
Fetch and compile the mix dependencies:
$ mix do deps.get, deps.compile
We can now execute our tagged unit test as a one off test run:
$ mix test --only unit
… or automatically whenever a file is saved:
$ mix test.watch --only unit
Command dispatch and routing
We’ve implemented registration for the user aggregate. Now we need to expose this behaviour through the public API, the Conduit.Accounts context module. We will create a register_user/1 function that takes an Elixir map containing the user attributes, then construct and dispatch a register user command.
To dispatch a command to its intended aggregate we must create a router module that implements the Commanded.Commands.Router behaviour:
# lib/conduit/router.ex
defmodule Conduit.Router do
use Commanded.Commands.Router
alias Conduit.Accounts.Aggregates.User
alias Conduit.Accounts.Commands.RegisterUser
dispatch [RegisterUser], to: User, identity: :user_uuid
end
Once configured, the router allows us to dispatch a command:
alias Conduit.Router
alias Conduit.Accounts.Commands.RegisterUser
register_user = %RegisterUser{
user_uuid: UUID.uuid4(),
email: "jake@jake.jake",
username: "jake",
hashed_password: "hashedpw",
}
:ok = Router.dispatch(register_user)
We can pattern match on the return value to ensure that the command succeeded, or handle any failures.
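A small sketch of handling both outcomes with a case expression follows. So that it runs on its own, DispatchSketch.dispatch/1 is a hypothetical stand-in for Router.dispatch/1, and the :invalid_username reason is invented for illustration; the only assumption about the real router is that it returns :ok on success and an {:error, reason} tuple on failure.

```elixir
defmodule DispatchSketch do
  # Stand-in for Router.dispatch/1 so the example is self-contained.
  # Hypothetical behaviour: fails when the username is blank.
  def dispatch(%{username: username}) when username in [nil, ""] do
    {:error, :invalid_username}
  end

  def dispatch(%{username: _username}), do: :ok

  # Match on the dispatch result to separate success from failure.
  def register(command) do
    case dispatch(command) do
      :ok -> {:ok, command.username}
      {:error, reason} -> {:error, reason}
    end
  end
end

DispatchSketch.register(%{username: "jake"})
# => {:ok, "jake"}

DispatchSketch.register(%{username: ""})
# => {:error, :invalid_username}
```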
The register_user/1 function in the accounts context assigns a unique identity to the user, constructs the register user command, and dispatches it to the configured aggregate:
# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
@moduledoc """
The boundary for the Accounts system.
"""
alias Conduit.Accounts.Commands.RegisterUser
alias Conduit.Router
@doc """
Register a new user.
"""
def register_user(attrs \\ %{}) do
attrs
|> assign_uuid(:user_uuid)
|> RegisterUser.new()
|> Router.dispatch()
end
# generate a unique identity
defp assign_uuid(attrs, identity), do: Map.put(attrs, identity, UUID.uuid4())
end
To verify the expected behaviour of the register user function we turn to the accounts test in test/conduit/accounts/accounts_test.exs:
defmodule Conduit.AccountsTest do
use Conduit.DataCase
alias Conduit.Accounts
alias Conduit.Accounts.Projections.User
describe "register user" do
@tag :integration
test "should succeed with valid data" do
assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
assert user.bio == "some bio"
assert user.email == "some email"
assert user.hashed_password == "some hashed_password"
assert user.image == "some image"
assert user.username == "some username"
end
end
end
Again the test is tagged, using @tag :integration, to indicate it is an integration test and will likely be slower due to accessing the database. Running the test results in a failure:
$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test]
1) test register user should succeed with valid data (Conduit.AccountsTest)
test/conduit/accounts/accounts_test.exs:9
match (=) failed
code: assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
right: :ok
stacktrace:
test/conduit/accounts/accounts_test.exs:10: (test)
Finished in 0.2 seconds
8 tests, 1 failure, 7 skipped
A failing test is helpful feedback: it guides us as to what we need to build next. In this case, we need to populate our read model and return the newly registered user.
Writing our first read model projection
A projection is a read-only view of some application state, built up from the published domain events.
We’ll be using Ecto to persist our read model to a PostgreSQL database. A projection is built by a projector module defined as an event handler: it receives each published domain event and updates the read model. So a projection is read model state that is projected from domain events by a projector.
In Commanded, an event handler is an Elixir module that implements the Commanded.Event.Handler behaviour. Each event handler is given a unique name and should be included in the application supervision tree by being registered as a child of a supervisor. An event handler must implement the handle/2 callback function which receives the domain event and its associated metadata. The function must return :ok to indicate successful processing of the event. You can use pattern matching to define multiple handle/2 functions, one per domain event you want to process.
Here’s an example event handler using the Commanded.Event.Handler macro:
defmodule ExampleEventHandler do
use Commanded.Event.Handler, name: "ExampleEventHandler"
def handle(%AnEvent{}, _metadata) do
# Process domain event and return `:ok` on success
:ok
end
end
Commanded Ecto projections
The commanded_ecto_projections Elixir library helps you to build read model projections using the Ecto database library.
Commanded Ecto projections provides a macro for defining a read model projection inside a module that is defined as a Commanded event handler. The project macro provides a convenient DSL for defining projections. It uses pattern matching to specify the domain event being projected, and provides access to an Ecto.Multi data structure for grouping multiple Repo operations. Ecto.Multi is used to insert, update, and delete data, and these will be executed within a single database transaction.
The Phoenix generator has already included the Ecto package as a dependency and created an Ecto repo for us, Conduit.Repo in lib/conduit/repo.ex. We configured the database connection details for the repo in the configuring the read model store section of the getting started chapter.
We’ll add the Commanded Ecto projections package to our dependencies in mix.exs:
defp deps do
[
{:commanded_ecto_projections, "~> 0.6"},
# ...
]
end
Fetch mix dependencies and compile:
$ mix do deps.get, deps.compile
We need to configure the commanded_ecto_projections library with the Ecto repo used by our application in config/config.exs:
config :commanded_ecto_projections, repo: Conduit.Repo
Then we generate an Ecto migration to create a projection_versions table:
$ mix ecto.gen.migration create_projection_versions
This table tracks which events each projector has seen, so that events already seen are ignored should they be resent. The event store guarantees at-least-once delivery of events, so an event may be handled more than once if the event store doesn’t receive the acknowledgement of successful processing.
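The idea of skipping events that have already been seen can be sketched with an in-memory map in place of the database table. This is not the library's implementation: ProjectionVersionSketch and handle/4 are hypothetical names, and the sketch assumes event numbers start at 1 and only ever increase.

```elixir
defmodule ProjectionVersionSketch do
  # `versions` maps a projection name to the last event number it has seen.
  # Returns {:ok, new_versions} when the event is applied, or
  # {:skipped, versions} when it was already seen.
  def handle(versions, projection_name, event_number, fun) when is_function(fun, 0) do
    last_seen = Map.get(versions, projection_name, 0)

    if event_number <= last_seen do
      {:skipped, versions}
    else
      fun.()
      {:ok, Map.put(versions, projection_name, event_number)}
    end
  end
end

name = "Accounts.Projectors.User"

# First delivery of event 1 is applied and recorded.
{:ok, versions} = ProjectionVersionSketch.handle(%{}, name, 1, fn -> :inserted end)

# A redelivery of event 1 is ignored and the versions are unchanged.
{:skipped, ^versions} = ProjectionVersionSketch.handle(versions, name, 1, fn -> :inserted end)
```

In the real projector the version update and the read model changes need to happen in the same database transaction, otherwise a crash between the two could still apply an event twice.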
We need to modify the generated migration, in priv/repo/migrations, to create the projection_versions table as detailed in the Commanded projections project README:
# priv/repo/migrations/20170610130729_create_projection_versions.exs
defmodule Conduit.Repo.Migrations.CreateProjectionVersions do
use Ecto.Migration
def change do
create table(:projection_versions, primary_key: false) do
add :projection_name, :text, primary_key: true
add :last_seen_event_number, :bigint
timestamps()
end
end
end
Creating a user projection
Now we need to create our User schema module, a database migration to create the accounts_users table, and a corresponding projector module.
When we ran the Phoenix resource generator to create the accounts context, we also asked it to create a user schema and specified its fields. It generated a database migration for us in priv/repo/migrations. By default Phoenix schemas use auto-incrementing integer fields as the table primary key. As we’re using UUIDs to identify our user aggregate we need to amend the schema and migration to use the uuid data type.
We’ll add two unique indexes to the accounts_users table, on username and email, to support efficient querying on those fields.
# priv/repo/migrations/20170607124956_create_accounts_user.exs
defmodule Conduit.Repo.Migrations.CreateConduit.Accounts.User do
use Ecto.Migration
def change do
create table(:accounts_users, primary_key: false) do
add :uuid, :uuid, primary_key: true
add :username, :string
add :email, :string
add :hashed_password, :string
add :bio, :string
add :image, :string
timestamps()
end
create unique_index(:accounts_users, [:username])
create unique_index(:accounts_users, [:email])
end
end
The user projection schema is modified to use Ecto’s binary_id as the primary key data type:
# lib/conduit/accounts/projections/user.ex
defmodule Conduit.Accounts.Projections.User do
use Ecto.Schema
@primary_key {:uuid, :binary_id, autogenerate: false}
schema "accounts_users" do
field :username, :string, unique: true
field :email, :string, unique: true
field :hashed_password, :string
field :bio, :string
field :image, :string
timestamps()
end
end
Then we migrate our database:
$ mix ecto.migrate
Finally, we create a projector module to insert a user projection each time a user is registered. This uses the project macro, provided by the Commanded Ecto projections library, to match each user registered domain event and insert a new User projection into the database.
# lib/conduit/accounts/projectors/user.ex
defmodule Conduit.Accounts.Projectors.User do
use Commanded.Projections.Ecto, name: "Accounts.Projectors.User"
alias Conduit.Accounts.Events.UserRegistered
alias Conduit.Accounts.Projections.User
project %UserRegistered{} = registered do
Ecto.Multi.insert(multi, :user, %User{
uuid: registered.user_uuid,
username: registered.username,
email: registered.email,
hashed_password: registered.hashed_password,
bio: nil,
image: nil,
})
end
end
The project macro exposes an Ecto.Multi struct, as multi, that we can use to chain together many database operations. They are executed within a single database transaction to ensure all changes succeed, or fail, together.
Include projector in supervision tree
To start and register the projector module as an event handler we need to include it within our application’s supervision tree. We will create a supervisor module per context responsible for handling its processes. The following supervisor, created in lib/conduit/accounts/supervisor.ex, will start the user projector:
# lib/conduit/accounts/supervisor.ex
defmodule Conduit.Accounts.Supervisor do
use Supervisor
alias Conduit.Accounts
def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end
def init(_arg) do
Supervisor.init([
Accounts.Projectors.User
], strategy: :one_for_one)
end
end
In lib/conduit/application.ex, we add the Conduit.Accounts.Supervisor supervisor module to the top level application supervisor:
# lib/conduit/application.ex
defmodule Conduit.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
# Start the Ecto repository
supervisor(Conduit.Repo, []),
# Start the endpoint when the application starts
supervisor(ConduitWeb.Endpoint, []),
# Accounts supervisor
supervisor(Conduit.Accounts.Supervisor, []),
]
opts = [strategy: :one_for_one, name: Conduit.Supervisor]
Supervisor.start_link(children, opts)
end
# Tell Phoenix to update the endpoint configuration
# whenever the application is updated.
def config_change(changed, _new, removed) do
ConduitWeb.Endpoint.config_change(changed, removed)
:ok
end
end
Reset storage between test execution
To ensure test independence we must clear the event store and read store test databases between each test execution. We already have a Conduit.DataCase module, generated by Phoenix, to use for integration tests accessing the database. This can be extended to clear both databases; so we add reset_eventstore/0 and reset_readstore/0 functions to do just that.
For the event store, we take advantage of the EventStore.Storage.Initializer.reset!/1 function to reset the database structure, removing any events, streams, and clearing all subscriptions.
The read model database is manually reset by executing a truncate table SQL statement specifying each projection table to clear. We must remember to add any additional tables to this statement as we build our application to also reset them during test execution.
defmodule Conduit.DataCase do
use ExUnit.CaseTemplate
using do
quote do
alias Conduit.Repo
import Ecto
import Ecto.Changeset
import Ecto.Query
import Conduit.Factory
import Conduit.DataCase
end
end
setup _tags do
Application.stop(:conduit)
Application.stop(:commanded)
Application.stop(:eventstore)
reset_eventstore()
reset_readstore()
Application.ensure_all_started(:conduit)
:ok
end
defp reset_eventstore do
{:ok, conn} =
EventStore.configuration()
|> EventStore.Config.parse()
|> Postgrex.start_link()
EventStore.Storage.Initializer.reset!(conn)
end
defp reset_readstore do
readstore_config = Application.get_env(:conduit, Conduit.Repo)
{:ok, conn} = Postgrex.start_link(readstore_config)
Postgrex.query!(conn, truncate_readstore_tables(), [])
end
defp truncate_readstore_tables do
"""
TRUNCATE TABLE
accounts_users,
projection_versions
RESTART IDENTITY;
"""
end
end
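One way to make it harder to forget a new projection table is to build the truncate statement from a list of table names. The following is a minimal sketch, assuming a hypothetical ReadStoreSketch module; only the two table names come from the listing above.

```elixir
defmodule ReadStoreSketch do
  # Hypothetical helper: this list is the single place to extend
  # when a new projection table is added to the read store.
  @tables ["accounts_users", "projection_versions"]

  # Builds the same statement as the heredoc above, on a single line.
  def truncate_statement(tables \\ @tables) do
    "TRUNCATE TABLE " <> Enum.join(tables, ", ") <> " RESTART IDENTITY;"
  end
end

IO.puts(ReadStoreSketch.truncate_statement())
# prints: TRUNCATE TABLE accounts_users, projection_versions RESTART IDENTITY;
```

The resulting string could be passed to Postgrex.query!/3 in place of truncate_readstore_tables/0.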
Returning to the accounts integration test
We have now done almost enough to make our register user test in the accounts context pass. The remaining change is to return the User projection from the register_user/1 function.
In this scenario, we could attempt to return a %User{} struct populated from the parameters passed to the register_user/1 function. The concern with this approach is the additional duplicate code we must write, and the potential for it getting out of sync during future changes. Instead we’ll take advantage of Commanded’s support for strongly consistent command dispatch.
The Commanded consistency model is opt-in, with the default consistency being :eventual. We need to define our user projector as strongly consistent:
defmodule Conduit.Accounts.Projectors.User do
use Commanded.Projections.Ecto,
name: "Accounts.Projectors.User",
consistency: :strong
# .. projection code omitted
end
Returning to the accounts context, we will update the register_user/1 function to dispatch the command using consistency: :strong:
# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
@moduledoc """
The boundary for the Accounts system.
"""
alias Conduit.Accounts.Commands.RegisterUser
alias Conduit.Accounts.Projections.User
alias Conduit.Repo
alias Conduit.Router
@doc """
Register a new user.
"""
def register_user(attrs \\ %{}) do
uuid = UUID.uuid4()
register_user =
attrs
|> assign(:user_uuid, uuid)
|> RegisterUser.new()
with :ok <- Router.dispatch(register_user, consistency: :strong) do
get(User, uuid)
else
reply -> reply
end
end
defp get(schema, uuid) do
case Repo.get(schema, uuid) do
nil -> {:error, :not_found}
projection -> {:ok, projection}
end
end
defp assign(attrs, key, value), do: Map.put(attrs, key, value)
end
Now when we receive an :ok reply from command dispatch we can be assured that the user projection has been updated with our newly registered user, allowing us to query the projection and return the populated %User{}. Let’s run the accounts test to check our changes:
$ mix test test/conduit/accounts/accounts_test.exs
Success, we have a passing test.
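The control flow of register_user/1 can be tried in isolation. This sketch replaces Router.dispatch/2 and the get/2 helper with stand-in anonymous functions (WithSketch, dispatch, and lookup are hypothetical names), and shows that a with expression returns any non-matching value unchanged. In this shape, the else clause in the listing above does not alter the result.

```elixir
defmodule WithSketch do
  # `dispatch` and `lookup` are stand-ins supplied by the caller; they take
  # the place of command dispatch and the read model query.
  def register(dispatch, lookup) do
    with :ok <- dispatch.() do
      lookup.()
    end
  end
end

found = fn -> {:ok, :user_projection} end

# Dispatch succeeded, so the projection is queried and returned.
IO.inspect(WithSketch.register(fn -> :ok end, found))
# => {:ok, :user_projection}

# Dispatch failed, so its reply is returned as-is and the query is skipped.
IO.inspect(WithSketch.register(fn -> {:error, :not_registered} end, found))
# => {:error, :not_registered}
```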
We still have one other failing test, but that’s useful as it directs us towards what needs to be worked on next: adding command validation.
Command validation
We want to build our application to ensure that most commands are successfully handled by an aggregate. The first way to achieve this is to limit which commands can be dispatched by only allowing acceptable commands to be shown to the end user in the UI. The second level of protection before a command reaches an aggregate is command validation; all commands should be validated before being passed to the aggregate.
There are three different levels of command validation that apply to an application:
- Command property validation: mandatory fields, data format, min/max ranges.
- Domain validation rules: prevent duplicate usernames, application state based validation logic.
- Business invariants: protection against invalid state changes.
Command property validation
Superficial command field validation is the simplest to apply. You add rules to each command property specifying its data type, whether it’s mandatory or optional, and apply basic range checking (e.g. date must be in the future). You can even apply cross field validation (e.g. start date must be before end date). These rules apply to the command itself, requiring no external information.
Property validation helps guard against common errors, such as the user forgetting to fill out a mandatory field, by applying the rules before accepting the command and rejecting it upon validation failure. These rules can also be applied at the user interface to assist the user.
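As a rough illustration of property and cross field rules, the sketch below validates a hypothetical command in plain Elixir, without any validation library. The struct, its fields, and the error messages are assumptions made for the example.

```elixir
defmodule PropertyValidationSketch do
  # Hypothetical command used only for illustration.
  defstruct [:title, :start_date, :end_date]

  def validate(%__MODULE__{} = command) do
    errors =
      []
      |> check(is_binary(command.title) and command.title != "", :title, "can't be empty")
      |> check(match?(%Date{}, command.start_date), :start_date, "must be a date")
      |> check(match?(%Date{}, command.end_date), :end_date, "must be a date")
      |> check(starts_before_end?(command), :start_date, "must be before end date")

    case errors do
      [] -> :ok
      _ -> {:error, Enum.reverse(errors)}
    end
  end

  # Accumulate an error only when the rule did not hold.
  defp check(errors, true, _field, _message), do: errors
  defp check(errors, false, field, message), do: [{field, message} | errors]

  # Cross field rule; treated as passing unless both values are dates,
  # so a missing date is reported once by the rules above.
  defp starts_before_end?(%{start_date: %Date{} = starts, end_date: %Date{} = ends}) do
    Date.compare(starts, ends) == :lt
  end

  defp starts_before_end?(_command), do: true
end

command = %PropertyValidationSketch{
  title: "",
  start_date: ~D[2024-01-03],
  end_date: ~D[2024-01-02]
}

IO.inspect(PropertyValidationSketch.validate(command))
# => {:error, [title: "can't be empty", start_date: "must be before end date"]}
```

Every rule here reads only the command itself, which is what distinguishes property validation from the domain rules described next.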
Domain validation rules
In our user registration feature we have a rule that usernames must be unique. To enforce this rule we must check that a given username does not yet exist when executing the register user command. This information will need to be read from a read model. We cannot enforce this rule within our user aggregate because each aggregate instance runs completely independent from any other. It’s not possible to have a command that affects, or uses, multiple aggregates since an aggregate is itself the transaction boundary.
You could decide that this invariant was important enough to warrant protection by using an aggregate whose purpose is to record and assign unique usernames. Its job would be to enforce uniqueness by acting as a gatekeeper to the user registration. A command, such as reserve username, could be used to claim the username. The aggregate would publish a domain event with the newly assigned username on success, allowing an event handler to then register the user with the guaranteed unique username.
In Elixir a GenServer process can be successfully used to enforce uniqueness as concurrent requests to a process are handled serially. The process would allow a username to be claimed or reserved during command dispatch, preventing any later request from using the same username. The only caveat to this approach is that the GenServer’s in-memory state must be persisted to storage so that it can be restarted with the list of already taken usernames.
Business invariants
An aggregate root must protect itself against commands that would cause an invariant to be broken. This includes attempting to execute a command that is invalid for the aggregate’s current state. An example would be attempting to rename an article that has been deleted. In this scenario the aggregate would return an {:error, reason} tagged tuple from the command execute/2 function.
For certain business operations you might decide to return a domain event indicating an error, rather than preventing the command execution. An example would be attempting to withdraw money from a bank account where the amount requested is larger than the account balance. Retail banks earn interest or fees when an account goes overdrawn, so rather than reject the withdraw money command, a bank account aggregate might instead allow the withdrawal and also return an account overdrawn domain event.
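The two approaches can be sketched with plain structs and functions. ArticleSketch and BankAccountSketch are hypothetical aggregates written for this example, with events modelled as simple structs and tuples; they are not part of Conduit.

```elixir
defmodule ArticleSketch do
  # Hypothetical aggregate, command, and event; names are illustrative only.
  defstruct [:uuid, :title, deleted?: false]

  defmodule RenameArticle do
    defstruct [:title]
  end

  defmodule ArticleRenamed do
    defstruct [:article_uuid, :title]
  end

  # Invalid for the current state: reject the command with a tagged tuple.
  def execute(%__MODULE__{deleted?: true}, %RenameArticle{}) do
    {:error, :article_deleted}
  end

  def execute(%__MODULE__{uuid: uuid}, %RenameArticle{title: title}) do
    %ArticleRenamed{article_uuid: uuid, title: title}
  end
end

defmodule BankAccountSketch do
  defstruct balance: 0

  # Allow the withdrawal, and record an extra event when it overdraws the account.
  def execute(%__MODULE__{balance: balance}, {:withdraw, amount}) when amount > 0 do
    new_balance = balance - amount
    withdrawn = {:money_withdrawn, amount, new_balance}

    if new_balance < 0 do
      [withdrawn, {:account_overdrawn, new_balance}]
    else
      [withdrawn]
    end
  end
end

deleted = %ArticleSketch{uuid: "a1", title: "Old", deleted?: true}
IO.inspect(ArticleSketch.execute(deleted, %ArticleSketch.RenameArticle{title: "New"}))
# => {:error, :article_deleted}

IO.inspect(BankAccountSketch.execute(%BankAccountSketch{balance: 50}, {:withdraw, 80}))
# => [{:money_withdrawn, 80, -30}, {:account_overdrawn, -30}]
```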
Applying command property validation
For command field validation we will be using Vex.
An extensible data validation library for Elixir.
Can be used to check different data types for compliance with criteria.
Ships with built-in validators to check for attribute presence, absence, inclusion, exclusion, format, length, acceptance, and by a custom function. You can easily define new validators and override existing ones.
– Vex
We’ll add the vex package to our dependencies in mix.exs:
defp deps do
[
{:vex, "~> 0.6"},
# ...
]
end
Fetch mix dependencies and compile:
$ mix do deps.get, deps.compile
Then we add validation rules for each of the fields in the command:
defmodule Conduit.Accounts.Commands.RegisterUser do
defstruct [
:user_uuid,
:username,
:email,
:hashed_password,
]
use ExConstructor
use Vex.Struct
validates :user_uuid, uuid: true
validates :username, presence: true, string: true
validates :email, presence: true, string: true
validates :hashed_password, presence: true, string: true
end
For the uuid field we will use a custom validator that attempts to parse the given string as a UUID:
# lib/conduit/support/validators/uuid.ex
defmodule Conduit.Support.Validators.Uuid do
use Vex.Validator
def validate(value, _options) do
Vex.Validators.By.validate(value, [function: &valid_uuid?/1, allow_nil: false, allow_blank: false])
end
defp valid_uuid?(uuid) do
case UUID.info(uuid) do
{:ok, _} -> true
{:error, _} -> false
end
end
end
To validate string fields, such as username and email, we will use another custom validator:
# lib/conduit/support/validators/string.ex
defmodule Conduit.Support.Validators.String do
use Vex.Validator
def validate(nil, _options), do: :ok
def validate("", _options), do: :ok
def validate(value, _options) do
Vex.Validators.By.validate(value, [function: &String.valid?/1])
end
end
Then register these validators in config/config.exs:
config :vex,
sources: [
Conduit.Support.Validators,
Vex.Validators
]
Once registered, we can verify a validator is configured using iex -S mix:
iex(1)> Vex.validator(:uuid)
Conduit.Support.Validators.Uuid
With the validation rules in place, we can validate a register user command as follows:
iex(1)> alias Conduit.Accounts.Commands.RegisterUser
Conduit.Accounts.Commands.RegisterUser
iex(2)> register_user = %RegisterUser{}
%Conduit.Accounts.Commands.RegisterUser{email: nil, hashed_password: nil,
username: nil, user_uuid: nil}
iex(3)> Vex.valid?(register_user)
false
iex(4)> Vex.results(register_user)
[{:error, :email, :presence, "must be present"}, {:ok, :email, :string},
{:error, :hashed_password, :presence, "must be present"},
{:ok, :hashed_password, :string},
{:error, :username, :presence, "must be present"}, {:ok, :username, :string},
{:error, :user_uuid, :uuid, "must be valid"}]
Validating dispatched commands
We’ve defined our command validation rules, now we need to apply them during command dispatch.
Commanded provides middleware as the extension point to include cross-cutting concerns into command dispatch. This can be used to add in command validation, authorization, logging, and other behaviour that you want to be called for every command the router dispatches. You define your own middleware modules and register them in your command router. They are executed before, and after success or failure, of every dispatched command.
We will write a middleware module that implements the Commanded.Middleware behaviour. It uses the Vex.valid?/1 and Vex.errors/1 functions to validate commands before dispatch:
# lib/conduit/support/middleware/validate.ex
defmodule Conduit.Support.Middleware.Validate do
@behaviour Commanded.Middleware
alias Commanded.Middleware.Pipeline
import Pipeline
def before_dispatch(%Pipeline{command: command} = pipeline) do
case Vex.valid?(command) do
true -> pipeline
false -> failed_validation(pipeline)
end
end
def after_dispatch(pipeline), do: pipeline
def after_failure(pipeline), do: pipeline
defp failed_validation(%Pipeline{command: command} = pipeline) do
errors = command |> Vex.errors() |> merge_errors()
pipeline
|> respond({:error, :validation_failure, errors})
|> halt()
end
defp merge_errors(errors) do
errors
|> Enum.group_by(
fn {_error, field, _type, _message} -> field end,
fn {_error, _field, _type, message} -> message end)
|> Map.new()
end
end
This middleware will return an {:error, :validation_failure, errors} tagged tuple should a command fail validation. The errors will contain the collection of validation failures, per field, that can be returned and shown to the client.
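To see what the per field grouping produces, the same Enum.group_by/3 call can be run against hand-written error tuples. The tuples below follow the shape printed by Vex.results/1 earlier; the :format entry is made up for the example. Enum.group_by/3 already returns a map, so the trailing Map.new/1 call in the listing leaves the value unchanged.

```elixir
# Error tuples in the {:error, field, validator, message} shape shown earlier.
errors = [
  {:error, :email, :presence, "must be present"},
  {:error, :username, :presence, "must be present"},
  {:error, :username, :format, "is invalid"}
]

# Group by field name, keeping only the message of each failure.
merged =
  Enum.group_by(
    errors,
    fn {_error, field, _type, _message} -> field end,
    fn {_error, _field, _type, message} -> message end
  )

IO.inspect(merged)
# => %{email: ["must be present"], username: ["must be present", "is invalid"]}
```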
The validation middleware module is registered in the router:
# lib/conduit/router.ex
defmodule Conduit.Router do
use Commanded.Commands.Router
alias Conduit.Accounts.Aggregates.User
alias Conduit.Accounts.Commands.RegisterUser
alias Conduit.Support.Middleware.Validate
middleware Validate
dispatch [RegisterUser], to: User, identity: :user_uuid
end
Testing user registration validation
Returning to our accounts test module, which includes our failing test:
# test/conduit/accounts/accounts_test.exs
defmodule Conduit.AccountsTest do
use Conduit.DataCase
alias Conduit.Accounts
alias Conduit.Accounts.Projections.User
describe "register user" do
@tag :integration
test "should succeed with valid data" do
assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
assert user.username == "jake"
assert user.email == "jake@jake.jake"
assert user.hashed_password == "jakejake"
assert user.bio == nil
assert user.image == nil
end
@tag :integration
test "should fail with invalid data and return error" do
assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: ""))
assert errors == %{username: ["can't be empty"]}
end
end
end
We can run the test again to check whether it passes:
$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
.
1) test register user should fail with invalid data and return error (Conduit.AccountsTest)
test/conduit/accounts/accounts_test.exs:21
Assertion with == failed
code: errors == [username: ["can't be empty"]]
left: [username: ["must be present"]]
right: [username: ["can't be empty"]]
stacktrace:
test/conduit/accounts/accounts_test.exs:24: (test)
Finished in 0.4 seconds
2 tests, 1 failure
It’s still failing, but only because the validation error message we’re expecting, “can’t be empty”, differs from the default validation error message provided by Vex, “must be present”.
We can provide our own message when registering the validation rules in the command:
# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
defstruct [
:user_uuid,
:username,
:email,
:hashed_password,
]
use ExConstructor
use Vex.Struct
validates :user_uuid, uuid: true
validates :username, presence: [message: "can't be empty"], string: true
validates :email, presence: [message: "can't be empty"], string: true
validates :hashed_password, presence: [message: "can't be empty"], string: true
end
Run the test again to see it succeed:
$ mix test test/conduit/accounts/accounts_test.exs
Compiling 3 files (.ex)
Excluding tags: [:pending]
..
Finished in 0.4 seconds
2 tests, 0 failures
We now have complete end-to-end user registration including command dispatch and validation, aggregate construction, domain event publishing, and read model projection. That covers the entire flow of a CQRS application from an initial command dispatch resulting in an updated read model available to query.
Enforce unique usernames
We’ve implemented basic command field validation using Vex. Now we need to move on to the second level validation: domain validation rules. Enforcing unique usernames when registering a new user will be the first that we’ll implement.
Typically domain validation will use a read model to query for application state. In our case we already have a user projection that contains the username. We even specified a unique index on the username column in the database migration:
create unique_index(:accounts_users, [:username])
The index will ensure that querying on this column is performant.
Let’s write an integration test to assert that registering the same username will fail with a useful error message:
@tag :integration
test "should fail when username already taken and return error" do
assert {:ok, %User{}} = Accounts.register_user(build(:user))
assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user))
assert errors == %{username: ["has already been taken"]}
end
Running this test will fail, so we need to implement the unique username validation rule. First we build a read model query to look up the user projection by username:
# lib/conduit/accounts/queries/user_by_username.ex
defmodule Conduit.Accounts.Queries.UserByUsername do
import Ecto.Query
alias Conduit.Accounts.Projections.User
def new(username) do
from u in User,
where: u.username == ^username
end
end
This can be executed by passing the query to our Ecto repository: UserByUsername.new("jake") |> Conduit.Repo.one()
We use this query to expose a new public function in the accounts context: user_by_username/1:
# lib/conduit/accounts/accounts.ex (diff)
defmodule Conduit.Accounts do
alias Conduit.Accounts.Queries.UserByUsername
@doc """
Get an existing user by their username, or return `nil` if not registered
"""
def user_by_username(username) do
username
|> String.downcase()
|> UserByUsername.new()
|> Repo.one()
end
end
Then we can check if a username already exists in the new unique username validator, added to the accounts context in lib/conduit/accounts/validators:
# lib/conduit/accounts/validators/unique_username.ex
defmodule Conduit.Accounts.Validators.UniqueUsername do
use Vex.Validator
alias Conduit.Accounts
def validate(value, _options) do
Vex.Validators.By.validate(value, [
function: fn value -> !username_registered?(value) end,
message: "has already been taken"
])
end
defp username_registered?(username) do
case Accounts.user_by_username(username) do
nil -> false
_ -> true
end
end
end
Add the accounts validators to the vex config in config/config.exs:
config :vex,
sources: [
Conduit.Accounts.Validators,
Conduit.Support.Validators,
Vex.Validators
]
Then we can register the new validator against the username property:
# lib/conduit/accounts/commands/register_user.ex (diff)
defmodule Conduit.Accounts.Commands.RegisterUser do
validates :username, presence: [message: "can't be empty"], string: true, unique_username: true
end
Run the accounts integration test and we now have three passing tests:
$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
...
Finished in 1.3 seconds
3 tests, 0 failures
Concurrent registration
We’ve included command validation to ensure unique usernames, and have tested this when registering one user after another. However, there’s a problem: attempting to register two users with the same username concurrently. The unique username validation will pass for both commands, allowing both users with an identical username to be created. This issue exists during the small period of time between registering the user and the read model being updated.
An integration test demonstrates the problem:
defmodule Conduit.AccountsTest do
use Conduit.DataCase
alias Conduit.Accounts
alias Conduit.Accounts.Projections.User
describe "register user" do
@tag :integration
test "should fail when registering identical username at same time and return error" do
1..2
|> Enum.map(fn _ -> Task.async(fn -> Accounts.register_user(build(:user)) end) end)
|> Enum.map(&Task.await/1)
end
end
end
Since the issue exists only during concurrent command handling we can use another router middleware module to enforce uniqueness. In the before_dispatch/1 callback for the register user command we can attempt to claim the username. Should that fail, another user registration for that username is already being processed, so the middleware returns a validation failure.
The middleware will use a new Unique module that provides a claim/2 function. This attempts to reserve a unique value for a given context. It returns :ok on success, or {:error, :already_taken} on failure. To make the middleware reusable for other fields we define an Elixir protocol (UniqueFields) allowing commands to specify which fields are unique.
# lib/conduit/support/middleware/uniqueness.ex
defmodule Conduit.Support.Middleware.Uniqueness do
@behaviour Commanded.Middleware
defprotocol UniqueFields do
@fallback_to_any true
@doc "Returns unique fields for the command"
def unique(command)
end
defimpl UniqueFields, for: Any do
def unique(_command), do: []
end
alias Conduit.Support.Unique
alias Commanded.Middleware.Pipeline
import Pipeline
def before_dispatch(%Pipeline{command: command} = pipeline) do
case ensure_uniqueness(command) do
:ok ->
pipeline
{:error, errors} ->
pipeline
|> respond({:error, :validation_failure, errors})
|> halt()
end
end
def after_dispatch(pipeline), do: pipeline
def after_failure(pipeline), do: pipeline
defp ensure_uniqueness(command) do
command
|> UniqueFields.unique()
|> Enum.reduce_while(:ok, fn ({unique_field, error_message}, _) ->
value = Map.get(command, unique_field)
case Unique.claim(unique_field, value) do
:ok -> {:cont, :ok}
{:error, :already_taken} -> {:halt, {:error, Keyword.new([{unique_field, error_message}])}}
end
end)
end
end
For the RegisterUser command we specify the :username field must be unique by implementing the UniqueFields protocol:
defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
def unique(_command), do: [
{:username, "has already been taken"},
]
end
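The effect of @fallback_to_any can be checked with a cut-down copy of the protocol. In this sketch every module name is hypothetical (UniqueFieldsSketch, RegisterUserSketch, OtherCommandSketch); it only demonstrates that a command without its own implementation falls back to the empty list, so the middleware has nothing to claim for it.

```elixir
defprotocol UniqueFieldsSketch do
  @fallback_to_any true
  @doc "Returns unique fields for the command"
  def unique(command)
end

# Used for any value that has no dedicated implementation.
defimpl UniqueFieldsSketch, for: Any do
  def unique(_command), do: []
end

defmodule RegisterUserSketch do
  defstruct [:username]
end

defmodule OtherCommandSketch do
  defstruct [:name]
end

defimpl UniqueFieldsSketch, for: RegisterUserSketch do
  def unique(_command), do: [{:username, "has already been taken"}]
end

IO.inspect(UniqueFieldsSketch.unique(%RegisterUserSketch{username: "jake"}))
# => [username: "has already been taken"]

IO.inspect(UniqueFieldsSketch.unique(%OtherCommandSketch{name: "anything"}))
# => []
```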
The new Uniqueness middleware is registered after command validation so that it will only be applied to valid commands:
# lib/conduit/router.ex
defmodule Conduit.Router do
use Commanded.Commands.Router
alias Conduit.Accounts.Aggregates.User
alias Conduit.Accounts.Commands.RegisterUser
alias Conduit.Support.Middleware.{Uniqueness,Validate}
middleware Validate
middleware Uniqueness
dispatch [RegisterUser], to: User, identity: :user_uuid
end
We’ll use a GenServer to track assigned unique values. Its state is a mapping between a context, such as :username, and the already claimed values. Attempting to claim an assigned value for a context returns an {:error, :already_taken} tagged tuple.
# lib/conduit/support/unique.ex
defmodule Conduit.Support.Unique do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def claim(context, value) do
GenServer.call(__MODULE__, {:claim, context, value})
end
def init(state), do: {:ok, state}
def handle_call({:claim, context, value}, _from, assignments) do
{reply, state} = case Map.get(assignments, context) do
nil -> {:ok, Map.put(assignments, context, MapSet.new([value]))}
values ->
case MapSet.member?(values, value) do
true -> {{:error, :already_taken}, assignments}
false -> {:ok, Map.put(assignments, context, MapSet.put(values, value))}
end
end
{:reply, reply, state}
end
end
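Because a GenServer handles one message at a time, concurrent claims for the same value cannot both succeed. The following self-contained sketch, using a hypothetical UniqueSketch module that condenses the same logic, fires ten concurrent claims for one username and counts the replies.

```elixir
defmodule UniqueSketch do
  use GenServer

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  def claim(context, value), do: GenServer.call(__MODULE__, {:claim, context, value})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:claim, context, value}, _from, assignments) do
    # Values already claimed for this context, or an empty set.
    values = Map.get(assignments, context, MapSet.new())

    if MapSet.member?(values, value) do
      {:reply, {:error, :already_taken}, assignments}
    else
      {:reply, :ok, Map.put(assignments, context, MapSet.put(values, value))}
    end
  end
end

{:ok, _pid} = UniqueSketch.start_link()

# Ten tasks race to claim the same username.
replies =
  1..10
  |> Enum.map(fn _ -> Task.async(fn -> UniqueSketch.claim(:username, "jake") end) end)
  |> Enum.map(&Task.await/1)

IO.inspect(Enum.count(replies, &(&1 == :ok)))
# => 1
```

Exactly one task receives :ok; which one is not deterministic, but the remaining nine always receive {:error, :already_taken}.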
The Conduit.Support.Unique module is included in the application supervision tree, in lib/conduit/application.ex:
defmodule Conduit.Application do
use Application
def start(_type, _args) do
import Supervisor.Spec
children = [
# ...
# Enforce unique constraints
worker(Conduit.Support.Unique, []),
]
opts = [strategy: :one_for_one, name: Conduit.Supervisor]
Supervisor.start_link(children, opts)
end
end
We now have unique usernames enforced as part of the register user command dispatch pipeline. This should prevent duplicate usernames from being registered at exactly the same time. We can verify this by running the integration tests again:
$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test, :pending]
....
Finished in 1.7 seconds
11 tests, 0 failures, 7 skipped
Additional username validation
There are two further validation rules to implement on usernames during registration:
- Must be lowercase.
- Must only contain alphanumeric characters (a-z, 0-9).
We can use a regular expression to enforce both of these rules.
First we add two integration tests to cover these requirements:
@tag :integration
test "should fail when username format is invalid and return error" do
assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "j@ke"))
assert errors == %{username: ["is invalid"]}
end
@tag :integration
test "should convert username to lowercase" do
assert {:ok, %User{} = user} = Accounts.register_user(build(:user, username: "JAKE"))
assert user.username == "jake"
end
Vex supports regex validation using the format validator. We add this to the username validation rules in the register user command:
defmodule Conduit.Accounts.Commands.RegisterUser do
validates :username,
presence: [message: "can't be empty"],
format: [with: ~r/^[a-z0-9]+$/, allow_nil: true, allow_blank: true, message: "is invalid"],
string: true,
unique_username: true
end
The allow_nil and allow_blank options are included as we already have validation to ensure the username is present. We don’t want duplicate error messages when it is not provided: “can’t be empty” and “is invalid”.
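The pattern can be tried directly with Regex.match?/2. The checks below also show one edge case worth knowing about: in Elixir's regular expressions `$` matches before a trailing newline as well as at the very end of the string. The stricter `\A` and `\z` anchors are shown as an alternative; they are not part of the original rule.

```elixir
username_format = ~r/^[a-z0-9]+$/

IO.inspect(Regex.match?(username_format, "jake"))   # => true
IO.inspect(Regex.match?(username_format, "j@ke"))   # => false
IO.inspect(Regex.match?(username_format, "JAKE"))   # => false

# `$` also matches just before a final newline, so this is accepted.
IO.inspect(Regex.match?(username_format, "jake\n")) # => true

# Assumed alternative: anchor to the absolute start and end of the string.
strict_format = ~r/\A[a-z0-9]+\z/
IO.inspect(Regex.match?(strict_format, "jake"))     # => true
IO.inspect(Regex.match?(strict_format, "jake\n"))   # => false
```

An uppercase username such as "JAKE" only passes because it is converted to lowercase before the command is validated, as described next.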
We need to convert the username to lowercase during registration in the Accounts context register_user/1 function. Let’s take the opportunity to make a small refactoring by replacing the private assign/3 helper with an assign_uuid/2 function in the RegisterUser module. At the same time we will include a new downcase_username/1 function that does as described. These functions are chained together using the pipe operator after constructing the RegisterUser command struct from the user supplied attributes.
defmodule Conduit.Accounts do
@doc """
Register a new user.
"""
def register_user(attrs \\ %{}) do
uuid = UUID.uuid4()
register_user =
attrs
|> RegisterUser.new()
|> RegisterUser.assign_uuid(uuid)
|> RegisterUser.downcase_username()
with :ok <- Router.dispatch(register_user, consistency: :strong) do
get(User, uuid)
else
reply -> reply
end
end
end
The new functions are added to the RegisterUser command:
defmodule Conduit.Accounts.Commands.RegisterUser do
alias Conduit.Accounts.Commands.RegisterUser
@doc """
Assign a unique identity for the user
"""
def assign_uuid(%RegisterUser{} = register_user, uuid) do
%RegisterUser{register_user | user_uuid: uuid}
end
@doc """
Convert username to lowercase characters
"""
def downcase_username(%RegisterUser{username: username} = register_user) do
%RegisterUser{register_user | username: String.downcase(username)}
end
end
Running the integration test suite confirms our changes are good.
Validating a user’s email address
We can now apply the same strategy to email address validation. The rules we need to enforce are that an email address:
- Must be unique.
- Must be lowercase.
- Must be in the desired format: contain an @ character.
The implementation will follow a similar approach to how we validated usernames.
First, we write failing tests to cover the scenarios above:
@tag :integration
test "should fail when email address already taken and return error" do
assert {:ok, %User{}} = Accounts.register_user(build(:user, username: "jake"))
assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "jake2"))
assert errors == %{email: ["has already been taken"]}
end
@tag :integration
test "should fail when registering identical email addresses at same time and return error" do
1..2
|> Enum.map(fn x -> Task.async(fn -> Accounts.register_user(build(:user, username: "user#{x}")) end) end)
|> Enum.map(&Task.await/1)
end
@tag :integration
test "should fail when email address format is invalid and return error" do
assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, email: "invalidemail"))
assert errors == %{email: ["is invalid"]}
end
@tag :integration
test "should convert email address to lowercase" do
assert {:ok, %User{} = user} = Accounts.register_user(build(:user, email: "JAKE@JAKE.JAKE"))
assert user.email == "jake@jake.jake"
end
Second, extend email validation in the command:
defmodule Conduit.Accounts.Commands.RegisterUser do
validates :email,
presence: [message: "can't be empty"],
format: [with: ~r/\S+@\S+\.\S+/, allow_nil: true, allow_blank: true, message: "is invalid"],
string: true,
unique_email: true
end
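A quick check of the email pattern shows what it accepts. It is deliberately loose, and because it is not anchored it matches anywhere inside the string; the last input below is a made-up example of that behaviour.

```elixir
email_format = ~r/\S+@\S+\.\S+/

IO.inspect(Regex.match?(email_format, "jake@jake.jake"))           # => true
IO.inspect(Regex.match?(email_format, "invalidemail"))             # => false
IO.inspect(Regex.match?(email_format, "jake@jake"))                # => false

# Unanchored: surrounding text does not prevent a match.
IO.inspect(Regex.match?(email_format, "contact: jake@jake.jake"))  # => true
```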
Third, we create the new unique email validator:
# lib/conduit/accounts/validators/unique_email.ex
defmodule Conduit.Accounts.Validators.UniqueEmail do
use Vex.Validator
alias Conduit.Accounts
def validate(value, _options) do
Vex.Validators.By.validate(value, [
function: fn value -> !email_registered?(value) end,
message: "has already been taken"
])
end
defp email_registered?(email) do
case Accounts.user_by_email(email) do
nil -> false
_ -> true
end
end
end
This also requires a new public user_by_email/1 function in the accounts context to retrieve a user by their email address:
defmodule Conduit.Accounts do
@doc """
Get an existing user by their email address, or return `nil` if not registered
"""
def user_by_email(email) when is_binary(email) do
email
|> String.downcase()
|> UserByEmail.new()
|> Repo.one()
end
end
The UserByEmail query is a module that constructs a standard Ecto query:
# lib/conduit/accounts/queries/user_by_email.ex
defmodule Conduit.Accounts.Queries.UserByEmail do
import Ecto.Query
alias Conduit.Accounts.Projections.User
def new(email) do
from u in User,
where: u.email == ^email
end
end
Fourth, we extend the UniqueFields protocol implementation for the register user command to include email address:
defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
def unique(_command), do: [
{:email, "has already been taken"},
{:username, "has already been taken"},
]
end
Last we include the RegisterUser.downcase_email/1 function in the register user pipeline:
defmodule Conduit.Accounts do
@doc """
Register a new user.
"""
def register_user(attrs \\ %{}) do
uuid = UUID.uuid4()
register_user =
attrs
|> RegisterUser.new()
|> RegisterUser.assign_uuid(uuid)
|> RegisterUser.downcase_username()
|> RegisterUser.downcase_email()
with :ok <- Router.dispatch(register_user, consistency: :strong) do
get(User, uuid)
else
reply -> reply
end
end
end
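The downcase_email/1 function itself is not listed in this section. A minimal sketch of what it might look like follows, mirroring downcase_username/1; RegisterUserCommandSketch is a hypothetical stand-in for the command struct, and the second clause, which leaves a missing email untouched so that validation can report it, is an assumption rather than the book's implementation.

```elixir
defmodule RegisterUserCommandSketch do
  # Hypothetical stand-in for the RegisterUser command struct.
  defstruct [:user_uuid, :username, :email]

  @doc """
  Convert email address to lowercase characters
  """
  def downcase_email(%__MODULE__{email: email} = register_user) when is_binary(email) do
    %__MODULE__{register_user | email: String.downcase(email)}
  end

  # Assumed behaviour: pass a missing email through unchanged.
  def downcase_email(%__MODULE__{} = register_user), do: register_user
end

command = %RegisterUserCommandSketch{username: "jake", email: "JAKE@JAKE.JAKE"}
IO.inspect(RegisterUserCommandSketch.downcase_email(command).email)
# => "jake@jake.jake"
```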
That completes the email address validation. We run the integration test suite again to confirm that the new tests pass.
Hashing the user’s password
We don’t want to store a user’s password anywhere in our application. Instead we’ll use a one-way hashing function and store the password hash. To authenticate a user during login we hash the password they provide, using the same algorithm and salt, and compare the result with the stored password hash, not the actual password.
For Conduit we’ll use the bcrypt password hashing function as described in how to safely store a password using bcrypt. The Comeonin library provides an implementation of the bcrypt hashing function in Elixir.
Password hashing (bcrypt, pbkdf2_sha512 and one-time passwords) library for Elixir.
This library is intended to make it very straightforward for developers to check users’ passwords in as secure a manner as possible.
– Comeonin
Add comeonin and bcrypt_elixir to dependencies in mix.exs:
defp deps do
[
# ...
{:bcrypt_elixir, "~> 1.0"},
{:comeonin, "~> 4.0"},
]
end
Fetch mix dependencies and compile:
$ mix do deps.get, deps.compile
For our test environment only we will reduce the number of bcrypt rounds so it doesn’t slow down our test suite. With comeonin 4.x the hashing itself is provided by bcrypt_elixir, which reads its own log_rounds setting, so in config/test.exs we configure it as follows:
config :bcrypt_elixir, :log_rounds, 4
We’ll create a Conduit.Auth module to wrap the Comeonin library’s bcrypt hashing functions:
# lib/conduit/accounts/auth.ex
defmodule Conduit.Auth do
@moduledoc """
Authentication using the bcrypt password hashing function.
"""
alias Comeonin.Bcrypt
def hash_password(password), do: Bcrypt.hashpwsalt(password)
def validate_password(password, hash), do: Bcrypt.checkpw(password, hash)
end
Then create an integration test to verify the password is being hashed and stored in the user read model. For the test assertion we use the Auth.validate_password/2 function, shown above, which hashes the provided password, jakejake, and compares with the already hashed password saved for the user, such as $2b$04$W7A/lWysNVUqeYg8vjKCXeBniHoks4jmRziKDmACO.fvqo3wdqsea. Remember that we never store the user’s password, only a one-way hash.
@tag :integration
test "should hash password" do
assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
assert Auth.validate_password("jakejake", user.hashed_password)
end
Next we include a password field in the register user command struct, to contain the user provided password in plain text. We add a hash_password/1 function that hashes the password, stores the hash value as hashed_password, and clears the original plain text password. This prevents the user’s password from being exposed by any command auditing.
defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    user_uuid: "",
    username: "",
    email: "",
    password: "",
    hashed_password: "",
  ]

  # Aliases required by the %RegisterUser{} and Auth references below
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Auth

  @doc """
  Hash the password, clear the original password
  """
  def hash_password(%RegisterUser{password: password} = register_user) do
    %RegisterUser{register_user |
      password: nil,
      hashed_password: Auth.hash_password(password),
    }
  end
end
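To see the effect of this function in isolation, here is a minimal, self-contained sketch. It is not the chapter’s module: Sketch.RegisterUser is a hypothetical cut-down struct, and the hasher is passed in as a function standing in for Auth.hash_password/1, so the example runs without Comeonin.

```elixir
defmodule Sketch.RegisterUser do
  defstruct username: "", password: "", hashed_password: ""

  # `hasher` is a stand-in for Auth.hash_password/1 (an assumption made
  # so this sketch has no external dependencies).
  def hash_password(%__MODULE__{password: password} = command, hasher) do
    %__MODULE__{command | password: nil, hashed_password: hasher.(password)}
  end
end

# A fake, reversible "hash" used only to make the output predictable.
fake_hasher = fn password -> "hashed:" <> String.reverse(password) end

command = %Sketch.RegisterUser{username: "jake", password: "jakejake"}
hashed = Sketch.RegisterUser.hash_password(command, fake_hasher)

IO.inspect(hashed.password)         # nil
IO.inspect(hashed.hashed_password)  # "hashed:ekajekaj"
```

After the call, the struct no longer carries the plain text password, so anything that records the command from that point on sees only the hash.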
The final change is to include this function in the register user command dispatch chain:
defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()
      |> RegisterUser.hash_password()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end
We’ve now successfully hashed the user’s password during registration, helping to protect our users’ security should our deployed environment be compromised and the database accessed. By default, the Comeonin library generates a different random 16-byte salt for each hashed password. This is another layer of protection against dictionary and rainbow table attacks on the hashed passwords.
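The effect of a per-password salt can be illustrated without bcrypt. The sketch below is deliberately not what Comeonin does internally: it substitutes a single salted SHA-256 digest, which is far too fast for real password storage, using Erlang’s :crypto module. Its only purpose is to show that hashing the same password twice yields different stored values. SaltDemo and its functions are hypothetical names.

```elixir
defmodule SaltDemo do
  # Simplified stand-in for a salted password hash. NOT bcrypt and not
  # suitable for production use.
  def hash(password, salt \\ :crypto.strong_rand_bytes(16)) do
    {salt, :crypto.hash(:sha256, salt <> password)}
  end

  # Re-hash the candidate password with the stored salt and compare.
  def valid?(password, {salt, digest}) do
    :crypto.hash(:sha256, salt <> password) == digest
  end
end

first = SaltDemo.hash("jakejake")
second = SaltDemo.hash("jakejake")

# Two users with the same password end up with different stored values
# (equal only if the two random salts happened to collide).
IO.inspect(first == second)
IO.inspect(SaltDemo.valid?("jakejake", first))
```

Because each stored value depends on its own salt, an attacker cannot precompute one table of hashes and apply it to every user.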
Completing user registration
With user registration done, at least from the accounts context, we return to our acceptance criteria defined in the UserControllerTest integration test. To specify the initial requirements and direct our development efforts we started out by writing end-to-end tests to ensure that the /api/users registration endpoint adheres to the requirements of the JSON API.
On successful registration the following response should be returned:
{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}
For now we will skip the authentication token; it will be addressed in the next chapter.
The integration test for successful user registration asserts against the JSON returned from a POST request to /api/users in the UserControllerTest module:
@tag :web
test "should create and return user when data is valid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user)
  json = json_response(conn, 201)["user"]

  assert json == %{
    "bio" => nil,
    "email" => "jake@jake.jake",
    "image" => nil,
    "username" => "jake",
  }
end
Running the test still results in a failure, so there’s more work for us to do. We need to modify the user view and select a subset of the fields from our user projection to be returned as JSON data:
# lib/conduit_web/views/user_view.ex
defmodule ConduitWeb.UserView do
  use ConduitWeb, :view
  alias ConduitWeb.UserView

  def render("index.json", %{users: users}) do
    %{users: render_many(users, UserView, "user.json")}
  end

  def render("show.json", %{user: user}) do
    %{user: render_one(user, UserView, "user.json")}
  end

  def render("user.json", %{user: user}) do
    %{
      username: user.username,
      email: user.email,
      bio: user.bio,
      image: user.image,
    }
  end
end
As per the API spec we only return the username, email, bio, and image fields.
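The same whitelisting idea can be sketched outside Phoenix. The example below is an illustration rather than the chapter’s view code: Sketch.UserJson is a hypothetical module, the sample map stands in for the user projection, and Map.take/2 is used as an equivalent way to keep only the public fields.

```elixir
defmodule Sketch.UserJson do
  # Only these fields may appear in the API response.
  @public_fields [:username, :email, :bio, :image]

  def render(user), do: Map.take(user, @public_fields)
end

# Hypothetical projection data, including fields that must not leak.
user = %{
  uuid: "example-uuid",
  username: "jake",
  email: "jake@jake.jake",
  hashed_password: "example-hash",
  bio: nil,
  image: nil
}

IO.inspect(Sketch.UserJson.render(user))
```

Listing the permitted fields explicitly means that a field added to the projection later, such as hashed_password here, is never exposed by accident.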
Next we need to handle the case where validation errors are returned during command dispatch. The request should fail with a 422 HTTP status code, with the response body in the following format:
{
  "errors": {
    "username": [
      "can't be empty"
    ]
  }
}
This scenario is covered by the following test:
@tag :web
test "should not create user and render errors when data is invalid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user, username: "")

  assert json_response(conn, 422)["errors"] == %{
    "username" => [
      "can't be empty",
    ]
  }
end
To achieve this we will use a feature introduced in Phoenix 1.3: the action_fallback plug, which gives controllers generic error handling. With the plug registered, a controller action only needs to handle the successful case; any other value it returns is passed on to the fallback controller. Take a look at our existing user controller, where we only pattern match on the {:ok, user} successful outcome:
# lib/conduit_web/controllers/user_controller.ex
defmodule ConduitWeb.UserController do
  use ConduitWeb, :controller

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"user" => user_params}) do
    with {:ok, %User{} = user} <- Accounts.register_user(user_params) do
      conn
      |> put_status(:created)
      |> render("show.json", user: user)
    end
  end
end
Any errors that aren’t handled within your controller can be dealt with by the configured fallback controller. We pattern match on the {:error, :validation_failure, errors} tagged error tuple returned when command dispatch fails due to a validation failure. The errors are rendered using a new validation view module and returned with an HTTP 422 “Unprocessable Entity” status code:
# lib/conduit_web/controllers/fallback_controller.ex
defmodule ConduitWeb.FallbackController do
  use ConduitWeb, :controller

  def call(conn, {:error, :validation_failure, errors}) do
    conn
    |> put_status(:unprocessable_entity)
    |> render(ConduitWeb.ValidationView, "error.json", errors: errors)
  end

  def call(conn, {:error, :not_found}) do
    conn
    |> put_status(:not_found)
    |> render(ConduitWeb.ErrorView, :"404")
  end
end
The validation view returns a map containing the errors that is rendered as JSON:
# lib/conduit_web/views/validation_view.ex
defmodule ConduitWeb.ValidationView do
  use ConduitWeb, :view

  def render("error.json", %{errors: errors}) do
    %{errors: errors}
  end
end
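Taken together, the controller, fallback controller, and validation view behave roughly like the plain-Elixir sketch below. It is a simplified model under stated assumptions, not Phoenix itself: the Sketch modules are hypothetical, responses are represented as {status, body} tuples instead of a Plug.Conn, and dispatch/2 imitates what action_fallback arranges for us.

```elixir
defmodule Sketch.Fallback do
  # Plays the role of FallbackController.call/2 plus ValidationView:
  # turn a tagged error tuple into a {status, body} response.
  def call({:error, :validation_failure, errors}), do: {422, %{errors: errors}}
end

defmodule Sketch.UserController do
  # Only the success case is matched; any other value falls out of
  # `with` unchanged and becomes the action's return value.
  def create(params, register_user) do
    with {:ok, user} <- register_user.(params) do
      {201, %{user: user}}
    end
  end

  # Imitates action_fallback: a return value that is not a response
  # is handed to the fallback module.
  def dispatch(params, register_user) do
    case create(params, register_user) do
      {status, body} when is_integer(status) -> {status, body}
      other -> Sketch.Fallback.call(other)
    end
  end
end

# Hypothetical stand-in for Accounts.register_user/1.
register_user = fn
  %{"username" => ""} -> {:error, :validation_failure, %{username: ["can't be empty"]}}
  %{"username" => name} -> {:ok, %{username: name}}
end

IO.inspect(Sketch.UserController.dispatch(%{"username" => "jake"}, register_user))
# {201, %{user: %{username: "jake"}}}
IO.inspect(Sketch.UserController.dispatch(%{"username" => ""}, register_user))
# {422, %{errors: %{username: ["can't be empty"]}}}
```

The controller action stays focused on the happy path, while every error shape is translated into an HTTP response in one place.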
We can run the integration tests tagged with :web after making these changes, and the good news is that they all pass:
$ mix test --only web
Including tags: [:web]
Excluding tags: [:test, :pending]
...
Finished in 2.3 seconds
18 tests, 0 failures, 15 skipped
Having completed user registration, we now move on to authentication in the next chapter.