Building Conduit
Building Conduit
Ben Smith
Buy on Leanpub

Table of Contents

Preface

Welcome to Building Conduit.

In this book you will discover how to implement the Command Query Responsibility Segregation and event sourcing (CQRS/ES) pattern in an Elixir application.

This book will take you through the design and build, from scratch, of an exemplary Medium.com clone. The full source code is available to view and clone from GitHub. As each feature is developed, a link to the corresponding Git commit will be provided so you can browse the source code at each stage of development.

The application will be built as if it were a real world project. Including the specification of integration and unit tests to verify the functionality under development.

By the end of this book you should have a solid grasp of how to apply the CQRS/ES pattern to your own Elixir applications.

You will learn how to:

  • Follow test-driven development to build an HTTP API exposing and consuming JSON data.
  • Validate input data using command validation.
  • Create a functional, event sourced domain model.
  • Define a read model and populate it by projecting domain events.
  • Authenticate a user using a JSON Web Token (JWT).

Introduction

Who is Building Conduit for?

This book is written for anyone who has an interest in CQRS/ES and Elixir.

It assumes the reader will already be familiar with the broad concepts of CQRS/ES. You will be introduced to the building blocks that comprise an application built following this pattern, and shown how to implement them in Elixir.

The reader should be comfortable reading Elixir syntax and understand the basics of its actor concurrency model, implemented as processes and message passing.

What does it cover?

You will learn an approach to implementing the CQRS/ES pattern in a real world Elixir application. You will build a Medium.com clone, called Conduit, using the Phoenix web framework. Conduit is a real world blogging platform allowing users to publish articles, follow authors, and browse and read articles.

The inspiration for this example web application comes from the RealWorld project:

See how the exact same Medium.com clone (called Conduit) is built using any of our supported frontends and backends. Yes, you can mix and match them, because they all adhere to the same API spec.

While most “todo” demos provide an excellent cursory glance at a framework’s capabilities, they typically don’t convey the knowledge & perspective required to actually build real applications with it.

RealWorld solves this by allowing you to choose any frontend (React, Angular 2, & more) and any backend (Node, Django, & more) and see how they power a real world, beautifully designed fullstack app called “Conduit”.

By building a backend in Elixir and Phoenix that adheres to the RealWorld API specs, you can choose to pair it with any of the available frontends. Some of the most popular current implementations are:

You can view a live demo of Conduit that’s powered by React and Redux with a Node.js backend, to get a feel for what we’ll be building.

Many thanks to Eric Simons for pioneering the idea and founding the RealWorld project.

Before we start building Conduit, let’s briefly cover some of the concepts related to command query responsibility segregation and event sourcing.

What is CQRS?

At its simplest, CQRS is the separation of commands from queries.

  • Commands are used to mutate state in a write model.
  • Queries are used to retrieve a value from a read model.

In a typical layered architecture you have a single model to service writes and reads, whereas in a CQRS application the read and write models are different. They may also be separated physically by using a different database or storage mechanism. CQRS is often combined with event sourcing where there’s an event store for persisting domain events (write model) and at least one other data store for the read model.

CQRS overview
CQRS overview

Commands

Commands are used to instruct an application to do something, they are named in the imperative:

  • Register account
  • Transfer funds
  • Mark fraudulent activity

Commands have one, and only one, receiver: the code that fulfils the command request.

Domain events

Domain events indicate something of importance has occurred within a domain model. They are named in the past tense:

  • Account registered
  • Funds transferred
  • Fraudulent activity detected

Domain events describe your system activity over time using a rich, domain-specific language. They are an immutable source of truth for the system. Unlike commands which are restricted to a single handler, domain events may be consumed by multiple subscribers - or potentially no interested subscribers.

Often commands and events come in pairs: a successful register account command results in an account registered event. It’s also possible that a command can be successfully executed and result in many or no domain events.

Queries

Domain events from the write model are used to build and update a read model. I refer to this process as projecting events into a read model projection.

The read model is optimised for querying therefore the data is often stored denormalized to support faster querying performance. You can use whatever technology is most appropriate to support the querying your application demands, and take advantage of multiple different types of storage as appropriate:

  • Relational database.
  • In-memory store.
  • Disk-based file store.
  • NoSQL database.
  • Full-text search index.

What is event sourcing?

Any state changes within your domain are driven by domain events. Therefore your entire application’s state changes are modelled as a stream of domain events:

Bank account event stream
Bank account event stream

An aggregate’s state is built by applying its domain events to some initial empty state. State is further mutated by applying a created domain to the current state:

f(state, event) => state

Domain events are persisted in order – as a logical stream – for each aggregate. The event stream is the canonical source of truth, therefore it is a perfect audit log.

All other state in the system may be rebuilt from these events. Read models are projections of the event stream. You can rebuild the read model by replaying every event from the beginning of time.

What are the costs of using CQRS?

Domain events provide a history of your poor design decisions and they are immutable.

It’s an alternative, and less common, approach to building applications than basic CRUD1. Modelling your application using domain events demands a rich understanding of the domain. It can be more complex to deal with the eventual consistency between the write model and the read model.

Recipe for building a CQRS/ES application in Elixir

  1. A domain model containing aggregates, commands, and events.
  2. Hosting of an aggregate root instance and a way to send it commands.
  3. An event store to persist the created domain events.
  4. A Read model store for querying.
  5. Event handlers to build and update the read model.
  6. An API to query the read model data and to dispatch commands to the write model.

An aggregate

An aggregate defines a consistency boundary for transactions and concurrency. Aggregates should also be viewed from the perspective of being a “conceptual whole”. They are used to enforce invariants in a domain model and to guard against business rule violations.

This concept fits naturally within Elixir’s actor concurrency model. An Elixir GenServer enforces serialised concurrent access and processes communicate by sending messages (commands and events).

An event sourced aggregate

Must adhere to these rules:

  1. Public API functions must accept a command and return any resultant domain events, or an error.
  2. Its internal state may only be modified by applying a domain event to its current state.
  3. Its internal state can be rebuilt from an initial empty state by replaying all domain events in the order they were raised.

Here’s an example event sourced aggregate in Elixir:

defmodule ExampleAggregate do
  # Aggregate's state
  defstruct [:uuid, :name]

  # Public command API
  def create(%ExampleAggregate{}, uuid, name) do
    %CreatedEvent{
      uuid: uuid,
      name: name,
    }
  end

  # State mutator
  def apply(%ExampleAggregate{} = aggregate, %CreatedEvent{uuid: uuid, name: name}) do
    %ExampleAggregate{aggregate | uuid: uuid, name: name}
  end
end

It is preferable to implement aggregates using pure functions2. Why might this be a good rule to follow? Because a pure function is highly testable: you will focus on behaviour rather than state.

By using pure functions in your domain model, you also decouple your domain from the framework’s domain. Allowing you to build your application separately first, and layer the external interface on top. The external interface in our application will be the RESTful API powered by Phoenix.

Unit testing an aggregate

An aggregate function can be tested by executing a command and verifying the expected events are returned.

The following example demonstrates a BankAccount aggregate being tested for opening an account:

defmodule BankAccountTest do
  use ExUnit.Case, async: true

  alias BankAccount.Commands.OpenAccount
  alias BankAccount.Events.BankAccountOpened

  describe "opening an account with a valid initial balance"
    test "should be opened" do
      account = %BankAccount{}
      open_account = %OpenAccount{
        account_number: "ACC123",
        initial_balance: 100,
      }

      event = BankAccount.open_account(account, open_account)

      assert event == %BankAccountOpened{
        account_number: "ACC123",
        initial_balance: 100,
      }
    end
  end
end

Conduit

Conduit is a social blogging site: it is a Medium.com clone.

Welcome to Conduit
Welcome to Conduit

You can view a live demo at: demo.realworld.io

General functionality

As a blogging platform, Conduit’s functionality is based around authors publishing articles.

  • Authenticate users via JWT3.
  • Register, view, and update users.
  • Publish, edit, view, and delete articles.
  • Create, view, and delete comments on articles.
  • Display paginated lists of articles.
  • Favourite articles.
  • Follow other users.

API specs

Conduit uses a custom REST API for all requests, including authentication.

We will be implementing a backend that must adhere to the Conduit API specs.

HTTP verb URL Action  
POST /api/users/login Login a user  
POST /api/users Register a user  
GET /api/user Get current user  
PUT /api/user Update current user  
GET /api/profiles/:username Get profile  
POST /api/profiles/:username/follow Follow user  
DELETE /api/profiles/:username/follow Unfollow user  
GET /api/articles List articles  
GET /api/articles/feed Feed articles  
GET /api/articles/:slug Get an article  
POST /api/articles Publish an article  
PUT /api/articles/:slug Update an article  
DELETE /api/articles/:slug Remove an article  
POST /api/articles/:slug/comments Comment on an article  
GET /api/articles/:slug/comments Get comments on an article  
DELETE /api/articles/:slug/comments/:id Remove an comment  
POST /api/articles/:slug/favorite Favorite an article  
DELETE /api/articles/:slug/favorite Unfavorite an article  
GET /api/tags Get tags  

The full API specs are detailed in Appendix I.

Contexts

Phoenix 1.3 introduces the concept of contexts. These are somewhat inspired by bounded contexts in domain-driven design. They provide a way of defining clear boundaries between parts of your application. A context defines a public API that should be consumed by the rest of the application.

Bounded Context is a central pattern in Domain-Driven Design. It is the focus of DDD’s strategic design section which is all about dealing with large models and teams. DDD deals with large models by dividing them into different Bounded Contexts and being explicit about their interrelationships.

Martin Fowler

Bounded context - by Martin Fowler
Bounded context - by Martin Fowler

Contexts in Phoenix

When using the included phx.gen.* generators you must provide an additional context argument for each resource you create. As an example, when generating a JSON resource:

$ mix phx.gen.json Accounts User users name:string age:integer

The first argument is the context module followed by the schema module and its plural name (used as the schema table name). The context is an Elixir module that serves as an API boundary for the given resource. A context often holds many related resources. It is a module, with some implementation modules behind it, that exposes a public interface the rest of your application can consume.

This is powerful, because now you can define clear boundaries for your application domains. You’re now implementing your application, containing your business logic, separate from the Phoenix web interface. The web interface is merely one of the possible consumers of the API exposed by your application using a context module.4

The official Phoenix documentation has a guide detailing Contexts in further detail.5

Contexts in Conduit

Given the features that we plan to implement in Conduit specified in the previous chapter, we can define the following contexts to provide a boundary for each part of our application.

Accounts Register users, find user by username.
Auth Authenticate users.
Blog Publish articles, browse a paginated list of articles, comment on articles, favourite articles.

Why would we separate the Conduit functionality into three contexts? To keep the responsibilities cohesive within each context. For example the responsibility of our Accounts context is to manage users and their credentials, not handle publishing articles. Therefore we have a blog context, separate from accounts, to publish and list articles.

Contexts have their own folder within lib/conduit which immediately shows at a high level what the Conduit app does and allows easy navigation to help locate modules and files related to a specific area of functionality.

  • lib/conduit/accounts
  • lib/conduit/auth
  • lib/conduit/blog

With this separation of concerns enforced at the directory structure, when I need to add a feature related to blogging, or fix a bug for articles, I can immediately focus on the lib/conduit/blog folder and its contained modules. This reduces the cognitive load when working with the code.

These contexts would provide public API functions such as:

Accounts
  • Accounts.register_user/1
Auth
  • Auth.authenticate/2
Blog
  • Blog.publish_article/1
  • Blog.list_articles/1

Getting started

Let’s start building our Conduit web application. We’ll be using the latest version of the Phoenix Web framework, currently 1.3.

Before proceeding you will need to have Elixir v1.5 or later installed. Please follow the official installation guide to get Elixir running on your operating system. There are instructions for Windows, Mac OS X, Linux, Docker, and Raspberry Pi.

Installing Phoenix

Install the latest version of Phoenix using the mix command:

$ mix archive.install https://github.com/phoenixframework/archives/raw/master/phx_new.ez

Generating a Phoenix project

Once installed, you use the mix phx.new command to create a new Phoenix 1.3 project:

$ mix phx.new conduit --module Conduit --app conduit --no-brunch --no-html

A project in the conduit directory will be created.

The application name and module name have been specified using the --app and --module flags respectively. The Conduit frontend will be provided by one of the existing frameworks so we omit Phoenix’s HTML and static asset support, provided by Brunch, by appending --no-brunch --no-html to the generator command.

We will temporarily comment out the Ecto repo, Conduit.Repo, in the main Conduit application, to allow the server to be started without a database. We can also take the opportunity to remove Phoenix’s pub/sub dependency and channels/sockets as they will not be used for our RESTful API.

Starting the Phoenix server

Fetch mix dependencies and compile:

$ mix do deps.get, compile

Run the Phoenix server:

$ mix phx.server

Visit http://localhost:4000/ in a browser to check the server is running. An error is shown as no routes have been defined yet.

[info] GET /
[debug] ** (Phoenix.Router.NoRouteError) no route found for GET / (Conduit.Web.Router)
    (conduit) lib/conduit_web/router.ex:1: Conduit.Web.Router.__match_route__/4
    (conduit) lib/phoenix/router.ex:303: Conduit.Web.Router.call/2
    (conduit) lib/conduit_web/endpoint.ex:1: Conduit.Web.Endpoint.plug_builder_call/2
    (conduit) lib/plug/debugger.ex:123: Conduit.Web.Endpoint."call (overridable 3)"/2
    (conduit) lib/conduit_web/endpoint.ex:1: Conduit.Web.Endpoint.call/2
    (plug) lib/plug/adapters/cowboy/handler.ex:15: Plug.Adapters.Cowboy.Handler.upgrade/4
    (cowboy) /Users/ben/src/conduit/deps/cowboy/src/cowboy_protocol.erl:442: :cowboy_protocol.execute/4

Commanded facilitates CQRS/ES in Elixir

We will use Commanded6 to build our Elixir application following the CQRS/ES pattern. Commanded is an open source library that contains the building blocks required to implement CQRS/ES in Elixir.

It provides support for:

  • Command registration and dispatch.
  • Hosting and delegation to aggregates.
  • Event handling.
  • Long running process managers.

You can use Commanded with one of the following event stores for persistence:

Your choice of event store has no affect on how you build your application.

For Conduit we will use the PostgreSQL based Elixir EventStore as we will also be using PostgreSQL for our read model store and the Ecto database query library.

Write and read model stores

Applications applying the CQRS pattern have a logical separation between the write and read models. You can choose to make these physically separated by using an alternative database, schema, or storage mechanism for each.

In Conduit, we will be using event sourcing to persist domain events created by our write model. These events are the canonical source of truth for our application, they are used by both the aggregates to rebuild their state and are projected into the read model store for querying. Since the read model is a projection built from all domain events in the event store, we can rebuild it from scratch at any time. To rebuild a read store, the database is recreated and then populated by projecting all of the domain events.

We will use two databases for Conduit: one for the event store; another for the read model.

The database naming convention is to suffix the storage type (event store or read store) and environment name to the application name (conduit):

Environment Event store database Read store database
dev conduit_eventstore_dev conduit_readstore_dev
test conduit_eventstore_test conduit_readstore_test
prod conduit_eventstore_prod conduit_readstore_prod

Installing and configuring Commanded

We’ll be using the following open source libraries, published to Hex, to help build Conduit:

  • commanded - used to build Elixir applications following the CQRS/ES pattern.
  • eventstore - an Elixir event store using PostgreSQL as the underlying storage engine.
  • commanded_eventstore_adapter - adapter to use EventStore with Commanded.

The Commanded README details the steps required to install and configure the library.

  1. Add commanded and commanded_eventstore_adapter to the list of dependencies in mix.exs:
     defp deps do
       [
         {:commanded, "~> 0.15"},
         {:commanded_eventstore_adapter, "~> 0.3"},
         # ...
       ]
     end
    
  2. Include :eventstore in the list of extra applications in mix.exs:
     def application do
       [
         extra_applications: [
           :logger,
           :eventstore,
         ],
         # ...
       ]
     end
    
  3. Configure Commanded to use the EventStore adapter in the mix config file (config/config.exs):
     config :commanded,
       event_store_adapter: Commanded.EventStore.Adapters.EventStore
    
  4. Configure the event store database in each environment’s mix config file (e.g. config/dev.exs):
     # Configure the event store database
     config :eventstore, EventStore.Storage,
       serializer: Commanded.Serialization.JsonSerializer,
       username: "postgres",
       password: "postgres",
       database: "conduit_eventstore_dev",
       hostname: "localhost",
       pool_size: 10
    
  5. Fetch and compile the dependencies:
     $ mix do deps.get, deps.compile
    
  6. Create the event store database and tables using the mix task:
     $ mix do event_store.create, event_store.init
    

Configuring the read model store

Ecto will be used for building and querying the read model. The Phoenix generator will have already included the phoenix_ecto dependency, which includes ecto, and generated config settings for each environment.

  1. Configure the Conduit.Repo Ecto repository database in each environment’s mix config file (e.g. config/dev.exs):
     # Configure the read store database
     config :conduit, Conduit.Repo,
       adapter: Ecto.Adapters.Postgres,
       username: "postgres",
       password: "postgres",
       database: "conduit_readstore_dev",
       hostname: "localhost",
       pool_size: 10
    
  2. Create the read store database:
     $ mix ecto.create
    

There are now two databases in our development environment:

  1. Read model store, conduit_readstore_dev, containing only the Ecto schema migrations table.
  2. An event store, conduit_eventstore_dev, containing the default event store tables.

Accounts

The Conduit blogging platform requires authors to register an account before they can publish articles. We shall begin our first feature by implementing user registration in the accounts context.

Register a user

The API spec for registration is as follows:

HTTP verb URL Required fields
POST /api/users email, username, password

Example request body:

{
  "user":{
    "username": "jake",
    "email": "jake@jake.jake",
    "password": "jakejake"
  }
}

Example response body:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}

The request should fail with a 422 HTTP status code error should any of the required fields be invalid. In this case the response body would be in the following format:

{
  "errors": {
    "body": [
      "can't be empty"
    ]
  }
}

We must ensure that usernames are unique and cannot be registered more than once.

Building our first context

Phoenix includes a set of generators to help scaffold your application:

Command Action
mix phx.gen.channel Generates a Phoenix channel
mix phx.gen.context Generates a context with functions around an Ecto schema
mix phx.gen.embedded Generates an embedded Ecto schema file
mix phx.gen.html Generates controller, views, and context for an HTML resource
mix phx.gen.json Generates controller, views, and context for a JSON resource
mix phx.gen.presence Generates a Presence tracker
mix phx.gen.schema Generates an Ecto schema and migration file
mix phx.gen.secret Generates a secret

Since we’re building a REST API, we can use the phx.gen.json generator to create our first context, resource, controller, and JSON view. As we already know the fields relating to our users we can include them, with their type, in the generator command:

$ mix phx.gen.json Accounts User users username:string email:string hashed_password:string bio:string imag\
e:string --table accounts_users

Overall, this generator will add the following files to lib/conduit:

  • Context module in lib/conduit/accounts/accounts.ex, serving as the API boundary.
  • Ecto schema in lib/conduit/accounts/user.ex, and a database migration to create the accounts_users table.
  • View in lib/conduit_web/views/user_view.ex.
  • Controller in lib/conduit_web/controllers/user_controller.ex.
  • Unit and integration tests in test/conduit/accounts.

Remember that the User module we’re creating here is not our domain model. It will be a read model projection, populated by domain events published from an aggregate.

The generator prompts us to add the resource to the :api scope in our Phoenix router module. For now, we will configure only the :create controller action to support registering a user:

# lib/conduit_web/router.ex
defmodule ConduitWeb.Router do
  use ConduitWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
  end

  scope "/api", ConduitWeb do
    pipe_through :api

    post "/users", UserController, :create
  end
end

Writing our first integration test

Let’s follow Behaviour Driven Development (BDD), thinking “from the outside in”, and start by writing a failing integration test for user registration. We will include tests that cover the happy path of successfully creating a user, and for the two failure scenarios mentioned above: missing required fields and duplicate username registration.

Factories to construct test data

We will use factory functions to generate realistic data for our tests. ExMachina is an Elixir library that makes it easy to create test data and associations.

In mix.exs, add :ex_machina as a test environment dependency:

defp deps do
  [
    {:ex_machina, "~> 2.0", only: :test},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

We must ensure the ExMachina application is started in the test helper, test/test_helper.exs, before ExUnit:

{:ok, _} = Application.ensure_all_started(:ex_machina)

Now we create our factory module in test/support/factory.ex:

# test/support/factory.ex
defmodule Conduit.Factory do
  use ExMachina

  def user_factory do
    %{
      email: "jake@jake.jake",
      username: "jake",
      hashed_password: "jakejake",
      bio: "I like to skateboard",
      image: "https://i.stack.imgur.com/xHWG8.jpg",
    }
  end
end

In our test module, we must import Conduit.Factory to access our user factory. Then we have access to build/1 and build/2 functions to construct params for an example user to register:

  • build(:user)
  • build(:user, username: "ben")
User registration integration test

In our integration test we want to verify successful user registration and check any failure includes a useful error message to help the user identify the problem.

  1. To test success, we assert that the returned HTTP status code is 201 and JSON response matches the API:
     test "should create and return user when data is valid", %{conn: conn} do
       conn = post conn, user_path(conn, :create), user: build(:user)
       json = json_response(conn, 201)["user"]
    
       assert json == build(:user, bio: nil, image: nil)
     end
    
  2. To test a validation failure, we assert the response is 422 and errors are included:
     test "should not create user and render errors when data is invalid", %{conn: conn} do
       conn = post conn, user_path(conn, :create), user: build(:user, username: nil)
       assert json_response(conn, 422)["errors"] == [
         username: [
           "can't be empty",
         ]
       ]
     end
    

The full integration test is given below:

# test/conduit_web/controllers/user_controller_test.exs
defmodule ConduitWeb.UserControllerTest do
  use ConduitWeb.ConnCase

  import Conduit.Factory

  alias Conduit.Accounts

  def fixture(:user, attrs \\ []) do
    build(:user, attrs) |> Accounts.create_user()
  end

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "register user" do
    @tag :web
    test "should create and return user when data is valid", %{conn: conn} do
      conn = post conn, user_path(conn, :create), user: build(:user)
      json = json_response(conn, 201)["user"]

      assert json == %{
        "bio" => nil,
        "email" => "jake@jake.jake",
        "image" => nil,
        "username" => "jake",
      }
    end

    @tag :web
    test "should not create user and render errors when data is invalid", %{conn: conn} do
      conn = post conn, user_path(conn, :create), user: build(:user, username: "")
      assert json_response(conn, 422)["errors"] == %{
        "username" => [
          "can't be empty",
        ]
      }
    end

    @tag :web
    test "should not create user and render errors when username has been taken", %{conn: conn} do
      # register a user
      {:ok, _user} = fixture(:user)

      # attempt to register the same username
      conn = post conn, user_path(conn, :create), user: build(:user, email: "jake2@jake.jake")
      assert json_response(conn, 422)["errors"] == %{
        "username" => [
          "has already been taken",
        ]
      }
    end
  end
end

Before running these tests, we must create the event store and read store databases for the test environment:

$ MIX_ENV=test mix do event_store.create, event_store.init
$ MIX_ENV=test mix ecto.create

Then we can execute our registration integration test using the mix test command:

$ mix test test/conduit_web/controllers/user_controller_test.exs

The execution result of running these tests will be:

Finished in 0.2 seconds
3 tests, 3 failures

Great, we have three failing tests. We now have the acceptance criteria of user registration codified in our tests. When these tests pass, the feature will be done. These tests will also help to prevent regressions caused by any changes we, or anyone else, may make in the future.

Let’s move forward by building the domain model to support user registration.

Application structure

The default directory structure used by the Phoenix generator creates a folder per context, and places them inside a folder named after the application within the lib directory.

We currently have our accounts context, along with the Phoenix web folder:

  • lib/conduit/accounts
  • lib/conduit_web

One benefit of this approach is that when a context becomes too large, it can be extracted into its own project. Using an umbrella project allows these separate Elixir applications to be used together, via internal mix references.

When our application grows too large to be comfortably hosted by a single, monolithic service we can migrate the individual apps into their own microservices. This is why it’s important to focus on separation of concerns, using contexts, from the outset. It allows us to split the application apart as production usage and requirements dictate, and more importantly it supports rewriting and deletion of code. Keeping each context highly cohesive, but loosely coupled, provides these benefits.

Within each context in our CQRS application we will create modules for the common building blocks: aggregates, commands, events, read model projections and projectors, and queries. I prefer to create a separate folder for each of these. It provides further segregation within a single context and allows you to easily locate any module by its type and name.

The folder structure for our first accounts context will be:

  • lib/conduit/accounts/aggregates
  • lib/conduit/accounts/commands
  • lib/conduit/accounts/events
  • lib/conduit/accounts/projections
  • lib/conduit/accounts/projectors
  • lib/conduit/accounts/queries
  • lib/conduit/accounts/validators

Inside the lib/conduit/accounts folder we will place the context module and a supervisor module:

  • lib/conduit/accounts/accounts.ex
  • lib/conduit/accounts/supervisor.ex

These are the public facing parts of the account context, and provide the API into its available behaviour. The supervisor is responsible for supervising the workers contained within, such as the event handlers and projectors.

Alternate structure

Instead of grouping modules by their type within a context, you may chose to group by their aggregate functionality. You can follow the convention used by Phoenix where filename and module names are suffixed by their type (e.g. user_aggregate.ex).

In this example I’ve illustrated the file structure for modules related to the User aggregate in the Accounts context, including the commands, events, projection, validators, and queries:

  • lib/conduit/accounts/user/user_aggregate.ex
  • lib/conduit/accounts/user/register_user.ex
  • lib/conduit/accounts/user/user_registered.ex
  • lib/conduit/accounts/user/user_projection.ex
  • lib/conduit/accounts/user/user_projector.ex
  • lib/conduit/accounts/user/user_by_email_query.ex
  • lib/conduit/accounts/user/unique_username_validator.ex

You can use either approach, or another application structure entirely, but it’s important that you choose a convention and adhere to it within your application.

Building our first aggregate

As we’re dealing with registering users, our first aggregate will be the user.

One decision we must take when designing an aggregate is how to identify an instance. The simplest approach is to use a UUID7. This may be generated by the client or the server, and is used to guarantee a unique identity per aggregate instance. All commands must include this identity to allow locating the target aggregate instance.

For Conduit users, we have a restriction that their username must be unique. So we could use the username to identify the aggregate and enforce this business rule. Domain events persisted for each user would be appended to a stream based upon their individual username. Populating the user aggregate would retrieve their events from the stream based on their username. Attempting to register an already taken username would fail since the aggregate exists and its state will be non-empty. However, one downside to this approach is that it would prevent us from allowing a user to amend their username at some point in the future. Remember that domain events are immutable once appended to the event store, so you cannot amend them, or move them to another stream. Instead you would need to create a new aggregate instance, using the new username, initialise its state from the existing aggregate, and mark the old aggregate instance as obsoleted.

We’ll use an assigned unique identifier for each user. The uuid package provides a UUID generator and utilities for Elixir. With this library we can assign a unique identity to a user using UUID.uuid4().

We’ll add the UUID package to our mix dependencies:

defp deps do
  [
    {:uuid, "~> 1.1"},
    # ...
  ]
end

To enforce the username uniqueness, we will validate the command before execution to ensure the username has not already been taken.

The User aggregate module, created in lib/conduit/accounts/aggregates/user.ex, defines the relevant fields as a struct, and exposes two public functions:

  1. execute/2 that accepts the empty user struct, %User{}, and the register user command, %RegisterUser{}, returning the user registered domain event.
  2. apply/2 that takes the user struct and the resultant user registered event %UserRegistered{} and mutates the aggregate state.
# lib/conduit/accounts/aggregates/user.ex
defmodule Conduit.Accounts.Aggregates.User do
  defstruct [
    :uuid,
    :username,
    :email,
    :hashed_password,
  ]

  alias Conduit.Accounts.Aggregates.User

  @doc """
  Register a new user
  """
  def execute(%User{uuid: nil}, %RegisterUser{} = register) do
    %UserRegistered{
      user_uuid: register.user_uuid,
      username: register.username,
      email: register.email,
      hashed_password: register.hashed_password,
    }
  end

  # state mutators

  def apply(%User{} = user, %UserRegistered{} = registered) do
    %User{user |
      uuid: registered.user_uuid,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
    }
  end
end

This approach to building aggregates will be followed for all new commands and events. The execute/2 function takes the command and returns zero, one, or more domain events. While the apply/2 function mutates the aggregate state by applying a single event.

The execute/2 function to register the user uses pattern matching to ensure the uuid field is nil. This ensures a user aggregate for a given identity can only be created once.

Why did I name the command function execute/2 and not register_user/2? This is to allow commands to be dispatched directly to the aggregate, without needing an intermediate command handler. This means less code to write. You can choose to have descriptive command function, but you must also write a command handler module to route each command to the function on the aggregate module. It’s also possible to have the command handler module implement the domain logic by returning any domain events itself, if you prefer.

Building our first command

We need to create a command to register a user. A command is a standard Elixir module using the defstruct keyword to define its fields. A struct is a map with an extra field indicating its type and allows developers to provide default values for keys.

defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :password,
    :hashed_password,
  ]
end

The register user command can be constructed using familiar Elixir syntax:

alias Conduit.Accounts.Commands.RegisterUser

%RegisterUser{
  user_uuid: UUID.uuid4(),
  username: "jake",
  email: "jake@jake.jake",
  password: "jakejake",
}

When building a struct, Elixir will automatically guarantee all keys belongs to the struct. This helps prevent accidental typos:

iex(1)> %RegisterUser{         
...(1)>   user: "jake",
...(1)>   email: "jake@jake.jake",
...(1)>   password: "jakejake",
...(1)> }
** (KeyError) key :user not found in: %RegisterUser{email: "jake@jake.jake", hashed_password: nil, passwor\
d: "jakejake", username: nil, user_uuid: nil}
Constructing commands from external data

Commands will usually be populated from external data. In Conduit, this will be JSON data sent to our Phoenix web server. Phoenix will parse JSON data into an Elixir map with key based strings. We therefore need a way to construct commands from these key/value maps.

ExConstructor is an Elixir library that makes it easy to instantiate structs from external data, such as that emitted by a JSON parser.

Add use ExConstructor after a defstruct statement to inject a constructor function into the module.

ExConstructor

We’ll add this library to our mix dependencies:

defp deps do
  [
    {:exconstructor, "~> 1.1"},
    # ...
  ]
end

Then we add use ExConstructor to our command:

# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :password,
    :hashed_password,
  ]

  use ExConstructor
end

This allows us to create the command struct from a plain map, such as that provided by the params argument in a Phoenix controller function using the new/1 function:

iex(1)> alias Conduit.Accounts.Commands.RegisterUser
iex(2)> RegisterUser.new(%{"username" => "jake", "email" => "jake@jake.jake", "password" => "jakejake"})
%Conduit.Accounts.Commands.RegisterUser{email: "jake@jake.jake",
 hashed_password: nil, password: "jakejake", user_uuid: nil, username: "jake"}

Building our first domain event

Domain events must be named in the past tense, so for user registration an appropriate event name is UserRegistered. Again we’ll use plain Elixir modules and structs to define our domain event:

# lib/conduit/accounts/events/user_registered.ex
defmodule Conduit.Accounts.Events.UserRegistered do
  @derive [Poison.Encoder]
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]
end

Note we derive the Poison.Encoder protocol in the domain event module. This is because Commanded uses the poison pure Elixir JSON library to serialize events in the database. For maximum performance, you should @derive [Poison.Encoder] for any struct used for encoding.

Writing our first unit test

With our User aggregate, register user command, and user registered event modules defined we can author the first unit test. We’ll use the test to verify the expected domain event is returned when executing the command, and that the fields are being correctly populated.

ExUnit provides an ExUnit.CaseTemplate module that allows a developer to define a test case template to be used throughout their tests. This is useful when there are a set of functions that should be shared between tests, or a shared set of setup callbacks.

We can create a case template for aggregate unit tests that provides a reusable way to execute commands against an aggregate and verify the resultant domain events. In the following Conduit.AggregateCase case template an Elixir macro is used to allow each unit test to specify the aggregate module acting as the test subject:

# test/support/aggregate_case.ex
defmodule Conduit.AggregateCase do
  @moduledoc """
  This module defines the test case to be used by aggregate tests.
  """

  use ExUnit.CaseTemplate

  using [aggregate: aggregate] do
    quote bind_quoted: [aggregate: aggregate] do
      @aggregate_module aggregate

      import Conduit.Factory

      # assert that the expected events are returned when the given commands
      # have been executed
      defp assert_events(commands, expected_events) do
        assert execute(List.wrap(commands)) == expected_events
      end

      # execute one or more commands against the aggregate
      defp execute(commands) do
        {_, events} = Enum.reduce(commands, {%@aggregate_module{}, []}, fn (command, {aggregate, _}) ->
          events = @aggregate_module.execute(aggregate, command)

          {evolve(aggregate, events), events}
        end)

        List.wrap(events)
      end

      # apply the given events to the aggregate state
      defp evolve(aggregate, events) do
        events
        |> List.wrap()
        |> Enum.reduce(aggregate, &@aggregate_module.apply(&2, &1))
      end
    end
  end
end

The unit test asserts that the register user command returns a user registered event:

# test/conduit/accounts/aggregates/user_test.exs
defmodule Conduit.Accounts.Aggregates.UserTest do
  use Conduit.AggregateCase, aggregate: Conduit.Accounts.Aggregates.User

  alias Conduit.Accounts.Events.UserRegistered

  describe "register user" do
    @tag :unit
    test "should succeed when valid" do
      user_uuid = UUID.uuid4()

      assert_events build(:register_user, user_uuid: user_uuid), [
        %UserRegistered{
          user_uuid: user_uuid,
          email: "jake@jake.jake",
          username: "jake",
          hashed_password: "jakejake",
        }
      ]
    end
  end
end

To facilitate test-driven development I use the mix test.watch command provided by the mix_test_watch package. It will automatically run tests whenever files change.

In mix.exs, add the package as a dev environment dependency:

defp deps do
  [
    {:mix_test_watch, "~> 0.5", only: :dev, runtime: false},
    # ...
  ]
end

Fetch and compile the mix dependencies:

$ mix do deps.get, deps.compile

We can now execute our tagged unit test as a one off test run:

$ mix test --only unit

… or automatically whenever a file is saved:

$ mix test.watch --only unit

Command dispatch and routing

We’ve implemented registration for the user aggregate. Now we need to expose this behaviour through the public API, the Conduit.Accounts context module. We will create a register_user/1 function that takes an Elixir map containing the user attributes, then construct and dispatch a register user command.

To dispatch a command to its intended aggregate we must create a router module that implements the Commanded.Commands.Router behaviour:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

Once configured, the router allows us to dispatch a command:

alias Conduit.Router
alias Conduit.Accounts.Commands.RegisterUser

register_user = %RegisterUser{
  user_uuid: UUID.uuid4(),
  email: "jake@jake.jake",
  username: "jake",
  hashed_password: "hashedpw",
}

:ok = Router.dispatch(register_user)

We can pattern match on the return value to ensure that the command succeeded, or handle any failures.

The register_user/1 function in the accounts context assigns a unique identity to the user, constructs the register user command, and dispatches it to the configured aggregate:

# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
  @moduledoc """
  The boundary for the Accounts system.
  """

  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Router

  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    attrs
    |> assign_uuid(:user_uuid)
    |> RegisterUser.new()
    |> Router.dispatch()
  end

  # generate a unique identity
  defp assign_uuid(attrs, identity), do: Map.put(attrs, identity, UUID.uuid4())
end

To verify the expected behaviour of the register user function we turn to the accounts test in test/conduit/accounts/accounts_test.exs:

defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  describe "register user" do
    @tag :integration
    test "should succeed with valid data" do
      assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

      assert user.bio == "some bio"
      assert user.email == "some email"
      assert user.hashed_password == "some hashed_password"
      assert user.image == "some image"
      assert user.username == "some username"
    end
  end
end

Again the test is tagged, using @tag integration, to indicate it is an integration test and will likely be slower due to accessing the database. Running the test results in a failure:

$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test]

1) test register user should succeed with valid data (Conduit.AccountsTest)
   test/conduit/accounts/accounts_test.exs:9
   match (=) failed
   code:  assert {:ok, %User{} = user} = Accounts.register_user(build(:user))
   right: :ok
   stacktrace:
     test/conduit/accounts/accounts_test.exs:10: (test)

Finished in 0.2 seconds
8 tests, 1 failure, 7 skipped

A failing test is helpful feedback: it guides us as to what we need to build next. In this case, we need to populate our read model and return the newly registered user.

Writing our first read model projection

A projection is a read-only view of some application state, built up from the published domain events.

We’ll be using Ecto to persist our read model to a PostgreSQL database. A projection is built by a projector module defined as an event handler: it receives each published domain event and updates the read model. So a projection is read model state that is projected from domain events by a projector.

In Commanded, an event handler is an Elixir module that implements the Commanded.Event.Handler behaviour. Each event handler is given a unique name and should be included in the application supervision tree by being registered as a child of a supervisor. An event handler must implement the handle/2 callback function which receives the domain event and its associated metadata. The function must return :ok to indicate successful processing of the event. You can use pattern matching to define multiple handle/2 functions, one per domain event you want to process.

Here’s an example event handler using the Commanded.Event.Handler macro:

defmodule ExampleEventHandler do
  use Commanded.Event.Handler, name: "ExampleEventHandler"

  def handle(%AnEvent{..}, _metadata) do
    # Process domain event and return `:ok` on success
    :ok
  end
end
Commanded Ecto projections

The commanded_ecto_projections Elixir library helps you to build read model projections using the Ecto database library. Commanded Ecto projections provides a macro for defining a read model projection inside a module which are is defined as a Commanded event handler. The project macro provides a convenient DSL8 for defining projections. It uses pattern matching to specify the domain event being projected, and provides access to an Ecto.Multi data structure for grouping multiple Repo operations. Ecto.Multi is used to insert, update, and delete data, and these will be executed within a single database transaction.

The Phoenix generator has already included the Ecto package as a dependency and created an Ecto repo for us, Conduit.Repo in lib/repo.ex. We configured the database connection details for the repo in the configuring the read model store section of the getting started chapter.

We’ll add the Commanded Ecto projections package to our dependencies in mix.exs:

defp deps do
  [
    {:commanded_ecto_projections, "~> 0.6"},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

We need to configure the commanded_ecto_projections library with the Ecto repo used by our application in config/config.exs:

config :commanded_ecto_projections, repo: Conduit.Repo

Then we generate an Ecto migration to create a projection_versions table:

$ mix ecto.gen.migration create_projection_versions

This table is used to track which events each projector has seen, to ignore events already seen should they be resent as the event store guarantees at-least-once delivery of events. It’s possible an event may be handled more than once if the event store doesn’t receive the acknowledgement of successful processing.

We need to modify the generated migration, in priv/repo/migrations, to create the projection_versions table as detailed in the Commanded projections project README:

# priv/repo/migrations/20170610130729_create_projection_versions.exs
defmodule Conduit.Repo.Migrations.CreateProjectionVersions do
  use Ecto.Migration

  def change do
    create table(:projection_versions, primary_key: false) do
      add :projection_name, :text, primary_key: true
      add :last_seen_event_number, :bigint

      timestamps()
    end
  end
end
Creating a user projection

Now we need to create our User schema module, a database migration to create the accounts_users table, and a corresponding projector module.

When we ran the Phoenix resource generator to create the accounts context, we also asked it to create a user schema and specified its fields. It generated a database migration for us in priv/repo/migrations. By default Phoenix schemas use auto-incrementing integer fields as the table primary key. As we’re using UUIDs to identify our user aggregate we need to amend the schema and migration to use the uuid data type.

We’ll add two unique indexes to the accounts_users table, on username and email, to support efficient querying on those fields.

# priv/repo/migrations/20170607124956_create_accounts_user.exs
defmodule Conduit.Repo.Migrations.CreateConduit.Accounts.User do
  use Ecto.Migration

  def change do
    create table(:accounts_users, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :username, :string
      add :email, :string
      add :hashed_password, :string
      add :bio, :string
      add :image, :string

      timestamps()
    end

    create unique_index(:accounts_users, [:username])
    create unique_index(:accounts_users, [:email])
  end
end

The user projection schema is modified to use Ecto’s binary_id as the primary key data type:

# lib/conduit/accounts/projections/user.ex
defmodule Conduit.Accounts.Projections.User do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "accounts_users" do
    field :username, :string, unique: true
    field :email, :string, unique: true
    field :hashed_password, :string
    field :bio, :string
    field :image, :string

    timestamps()
  end
end

Then we migrate our database:

$ mix ecto.migrate

Finally, we create a projector module to insert a user projection each time a user is registered. This uses the project macro, provided by the Commanded Ecto projections library, to match each user registered domain event and insert a new User projection into the database.

# lib/conduit/accounts/projectors/user.ex
defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto, name: "Accounts.Projectors.User"

  alias Conduit.Accounts.Events.UserRegistered
  alias Conduit.Accounts.Projections.User

  project %UserRegistered{} = registered do
    Ecto.Multi.insert(multi, :user, %User{
      uuid: registered.user_uuid,
      username: registered.username,
      email: registered.email,
      hashed_password: registered.hashed_password,
      bio: nil,
      image: nil,
    })
  end
end

The project macro exposes an Ecto.Multi struct, as multi, that we can use to chain together many database operations. They are executed within a single database transaction to ensure all changes succeed, or fail, together.

Include projector in supervision tree

To start and register the projector module as an event handler we need to include it within our application’s supervision tree. We will create a supervisor module per context responsible for handling its processes. The following supervisor, created in lib/conduit/accounts/supervisor.ex, will start the user projector:

# lib/conduit/accounts/supervisor.ex
defmodule Conduit.Accounts.Supervisor do
  use Supervisor

  alias Conduit.Accounts
  
  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    Supervisor.init([
      Accounts.Projectors.User
    ], strategy: :one_for_one)
  end
end

In lib/application.ex, we add the Conduit.Accounts.Supervisor supervisor module to the top level application supervisor:

# lib/conduit/application.ex
defmodule Conduit.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      # Start the Ecto repository
      supervisor(Conduit.Repo, []),

      # Start the endpoint when the application starts
      supervisor(ConduitWeb.Endpoint, []),

      # Accounts supervisor
      supervisor(Conduit.Accounts.Supervisor, []),
    ]

    opts = [strategy: :one_for_one, name: Conduit.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  def config_change(changed, _new, removed) do
    ConduitWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end
Reset storage between test execution

To ensure test independence we must clear the event store and read store test databases between each test execution. We already have a Conduit.DataCase module, generated by Phoenix, to use for integration tests accessing the database. This can be extended to clear both databases; so we add reset_eventstore/0 and reset_readstore/0 functions to do just that.

For the event store, we take advantage of the EventStore.Storage.Initializer.reset!/1 function to reset the database structure, removing any events, streams, and clearing all subscriptions.

The read model database is manually reset by executing a truncate table SQL statement specifying each projection table to clear. We must remember to add any additional tables to this statement as we build our application to also reset them during test execution.

defmodule Conduit.DataCase do
  use ExUnit.CaseTemplate

  using do
    quote do
      alias Conduit.Repo

      import Ecto
      import Ecto.Changeset
      import Ecto.Query
      import Conduit.Factory
      import Conduit.DataCase
    end
  end

  setup _tags do
    Application.stop(:conduit)
    Application.stop(:commanded)
    Application.stop(:eventstore)

    reset_eventstore()
    reset_readstore()

    Application.ensure_all_started(:conduit)

    :ok
  end

  defp reset_eventstore do
    {:ok, conn} =
      EventStore.configuration()
      |> EventStore.Config.parse()
      |> Postgrex.start_link()

    EventStore.Storage.Initializer.reset!(conn)
  end

  defp reset_readstore do
    readstore_config = Application.get_env(:conduit, Conduit.Repo)

    {:ok, conn} = Postgrex.start_link(readstore_config)

    Postgrex.query!(conn, truncate_readstore_tables(), [])
  end

  defp truncate_readstore_tables do
"""
TRUNCATE TABLE
  accounts_users,
  projection_versions
RESTART IDENTITY;
"""
  end
end
Returning to the accounts integration test

We have now done almost enough to make our register user test in the accounts context pass. The remaining change is to return the User projection from the register_user/1 function.

In this scenario, we could attempt to return a %User{} struct populated from the parameters passed to the register_user/1 function. The concern with this approach is the additional duplicate code we must write, and the potential for it getting out of sync during future changes. Instead we’ll take advantage of Commanded’s support for strongly consistent command dispatch.

The Commanded consistency model is opt-in, with the default consistency being :eventual. We need to define our user projector as strongly consistent:

defmodule Conduit.Accounts.Projectors.User do
  use Commanded.Projections.Ecto,
    name: "Accounts.Projectors.User",
    consistency: :strong

  # .. projection code omitted
end

Returning to the accounts context, we will update the register_user/1 function to dispatch the command using consistency: :strong:

# lib/conduit/accounts/accounts.ex
defmodule Conduit.Accounts do
  @moduledoc """
  The boundary for the Accounts system.
  """

  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Accounts.Projections.User
  alias Conduit.Repo
  alias Conduit.Router

  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> assign(:user_uuid, uuid)
      |> RegisterUser.new()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end

  defp get(schema, uuid) do
    case Repo.get(schema, uuid) do
      nil -> {:error, :not_found}
      projection -> {:ok, projection}
    end
  end

  defp assign(attrs, key, value), do: Map.put(attrs, key, value)
end

Now when we receive an :ok reply from command dispatch we can be assured that the user projection has been updated with our newly registered user. Allowing us to query the projection and return the populated %User{}. Let’s run the accounts test to check our changes:

$ mix test test/conduit/accounts/accounts_test.exs

Success, we have a passing test.

We still have one other failing test, but that’s useful as it directs us towards what needs to be worked on next: adding command validation.

Command validation

We want to build our application to ensure that most commands are successfully handled by an aggregate. The first way to achieve this is to limit which commands can be dispatched by only allowing acceptable commands to be shown to the end user in the UI. The second level of protection before a command reaches an aggregate is command validation; all commands should be validated before being passed to the aggregate.

There are three different levels of command validation that apply to an application:

  1. Command property validation: mandatory fields, data format, min/max ranges.
  2. Domain validation rules: prevent duplicate usernames, application state based validation logic.
  3. Business invariants: protection against invalid state changes.
Command property validation

Superficial command field validation is the simplest to apply. You add rules to each command property specifying its data type, whether it’s mandatory or optional, and apply basic range checking (e.g. date must be in the future). You can even apply cross field validation (e.g. start date must be before end date). These rules apply to the command itself, requiring no external information.

Property validation helps guard against common errors, such as the user forgetting to fill out a mandatory field, by applying the rules before accepting the command and rejecting upon validation failure. These rules can be applied at the user interface to help assist the user.

Domain validation rules

In our user registration feature we have a rule that usernames must be unique. To enforce this rule we must check that a given username does not yet exist when executing the register user command. This information will need to be read from a read model. We cannot enforce this rule within our user aggregate because each aggregate instance runs completely independent from any other. It’s not possible to have a command that affects, or uses, multiple aggregates since an aggregate is itself the transaction boundary.

You could decide that this invariant was important enough to warrant protection by using an aggregate whose purpose is to record and assign unique usernames. Its job would be to enforce uniqueness by acting as a gatekeeper to the user registration. A command, such as reserve username, could be used to claim the username. The aggregate would publish a domain event with the newly assigned username on success, allowing an event handler to then register the user with the guaranteed unique username.

In Elixir a GenServer process can be successfully used to enforce uniqueness as concurrent requests to a process are handled serially. The process would allow a username to be claimed or reserved during command dispatch, preventing any later request from using the same username. The only caveat to this approach is that the GenServer’s in-memory state must be persisted to storage so that it can be restarted with the list of already taken usernames.

Business invariants

An aggregate root must protect itself against commands that would cause an invariant to be broken. This includes attempting to execute a command that is invalid for the aggregate’s current state. An example would be attempting to rename an article that has been deleted. In this scenario the aggregate would return an {:error, reason} tagged tuple from the command execute/2 function.

For certain business operations you might decide to return a domain event indicating an error, rather than preventing the command execution. An example would be attempting to withdraw money from a bank account where the amount requested is larger than the account balance. Retail banks earn interest or fees when an account goes overdrawn, so rather than reject the withdraw money command, a bank account aggregate might instead allow the withdrawal and also return an account overdrawn domain event.

Applying command property validation

For command field validation we will be using Vex.

An extensible data validation library for Elixir.

Can be used to check different data types for compliance with criteria.

Ships with built-in validators to check for attribute presence, absence, inclusion, exclusion, format, length, acceptance, and by a custom function. You can easily define new validators and override existing ones.

Vex

We’ll add the vex package to our dependencies in mix.exs:

defp deps do
  [
    {:vex, "~> 0.6"},
    # ...
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

Then we add validation rules for each of the fields in the command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]

  use ExConstructor
  use Vex.Struct

  validates :user_uuid, uuid: true
  validates :username, presence: true, string: true
  validates :email, presence: true, string: true
  validates :hashed_password, presence: true, string: true
end

For the uuid field we will use a custom validator that attempts to parse the given string as a UUID:

# lib/conduit/support/validators/uuid.ex
defmodule Conduit.Support.Validators.Uuid do
  use Vex.Validator

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [function: &valid_uuid?/1, allow_nil: false, allow_blank: false])
  end

  defp valid_uuid?(uuid) do
    case UUID.info(uuid) do
      {:ok, _} -> true
      {:error, _} -> false
    end
  end
end

To validate string fields, such as username and email, we will use another custom validator:

# lib/conduit/support/validators/string.ex
defmodule Conduit.Support.Validators.String do
  use Vex.Validator

  def validate(nil, _options), do: :ok
  def validate("", _options), do: :ok
  def validate(value, _options) do
    Vex.Validators.By.validate(value, [function: &String.valid?/1])
  end
end

Then register these validators in config/config.exs:

config :vex,
  sources: [
    Conduit.Support.Validators,
    Vex.Validators
  ]

Once registered, we can verify a validator is configured using iex -S mix:

iex(1)> Vex.validator(:uuid)
Conduit.Support.Validators.Uuid

With the validation rules in place, we can validate a register user command as follows:

iex(1)> alias Conduit.Accounts.Commands.RegisterUser
Conduit.Accounts.Commands.RegisterUser
iex(2)> register_user = %RegisterUser{}
%Conduit.Accounts.Commands.RegisterUser{email: nil, hashed_password: nil,
 username: nil, user_uuid: nil}
iex(3)> Vex.valid?(register_user)
 false
iex(3)> Vex.results(register_user)
[{:error, :email, :presence, "must be present"}, {:ok, :email, :string},
 {:error, :hashed_password, :presence, "must be present"},
 {:ok, :hashed_password, :string},
 {:error, :username, :presence, "must be present"}, {:ok, :username, :string},
 {:error, :user_uuid, :uuid, "must be valid"}]

Validating dispatched commands

We’ve defined our command validation rules, now we need to apply them during command dispatch.

Commanded provides middleware as an the extension point to include cross-cutting concerns into command dispatch. This can be used to add in command validation, authorization, logging, and other behaviour that you want to be called for every command the router dispatches. You define your own middleware modules and register them in your command router. They are executed before, and after success or failure, of every dispatched command.

We will write a middleware module that implements the Commanded.Middleware behaviour. It uses the Vex.valid?/1 and Vex.errors/1 functions to validate commands before dispatch:

# lib/conduit/support/middleware/validate.ex
defmodule Conduit.Support.Middleware.Validate do
  @behaviour Commanded.Middleware

  alias Commanded.Middleware.Pipeline
  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case Vex.valid?(command) do
      true -> pipeline
      false -> failed_validation(pipeline)
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp failed_validation(%Pipeline{command: command} = pipeline) do
    errors = command |> Vex.errors() |> merge_errors()

    pipeline
    |> respond({:error, :validation_failure, errors})
    |> halt
  end

  defp merge_errors(errors) do
    errors
    |> Enum.group_by(
      fn {_error, field, _type, _message} -> field end,
      fn {_error, _field, _type, message} -> message end)
    |> Map.new()
  end
end

This middleware will return an {error, :validation_failure, errors} tagged tuple should a command fail validation. The errors will contain the collection of validation failures, per field, that can be returned and shown to the client.

The validation middleware module is registered in the router:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Support.Middleware.Validate

  middleware Validate

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

Testing user registration validation

Returning to our accounts test module, which includes our failing test:

# test/conduit/accounts/accounts_test.exs
defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  describe "register user" do
    @tag :integration
    test "should succeed with valid data" do
      assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

      assert user.username == "jake"
      assert user.email == "jake@jake.jake"
      assert user.hashed_password == "jakejake"
      assert user.bio == nil
      assert user.image == nil
    end

    @tag :integration
    test "should fail with invalid data and return error" do
      assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: ""))

      assert errors == %{username: ["can't be empty"]}
    end
  end
end

We can run the test again to check whether it passes:

$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
.
  1) test register user should fail with invalid data and return error (Conduit.AccountsTest)
     test/conduit/accounts/accounts_test.exs:21
     Assertion with == failed
     code:  errors == [username: ["can't be empty"]]
     left:  [username: ["must be present"]]
     right: [username: ["can't be empty"]]
     stacktrace:
       test/conduit/accounts/accounts_test.exs:24: (test)

Finished in 0.4 seconds
2 tests, 1 failure

It’s still failing, but only because the validation error message we’re expecting, “can’t be empty”, differs from the default validation error message provided by Vex, “must be present”.

We can provide our own message when registering the validation rules in the command:

# lib/conduit/accounts/commands/register_user.ex
defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    :user_uuid,
    :username,
    :email,
    :hashed_password,
  ]

  use ExConstructor
  use Vex.Struct

  validates :user_uuid, uuid: true
  validates :username, presence: [message: "can't be empty"], string: true
  validates :email, presence: [message: "can't be empty"], string: true
  validates :hashed_password, presence: [message: "can't be empty"], string: true
end

Run the test again to see it succeed:

$ mix test test/conduit/accounts/accounts_test.exs
Compiling 3 files (.ex)
Excluding tags: [:pending]
..
Finished in 0.4 seconds
2 tests, 0 failures

We now have complete end-to-end user registration including command dispatch and validation, aggregate construction, domain event publishing, and read model projection. That covers the entire flow of a CQRS application from an initial command dispatch resulting in an updated read model available to query.

Enforce unique usernames

We’ve implemented basic command field validation using Vex. Now we need to move on to the second level validation: domain validation rules. Enforcing unique usernames when registering a new user will be the first that we’ll implement.

Typically domain validation will use a read model to query for application state. In our case we already have a user projection that contains the username. We even specified a unique index on the username column in the database migration:

create unique_index(:users, [:username])

The index will ensure that querying on this column is performant.

Let’s write an integration test to assert that registering the same username will fail with a useful error message:

@tag :integration
test "should fail when username already taken and return error" do
  assert {:ok, %User{}} = Accounts.register_user(build(:user))
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user))

  assert errors == %{username: ["has already been taken"]}
end

Running this test will fail, so we need to implement the unique username validation rule. First we build a read model query to lookup the user projection by username:

# lib/conduit/accounts/queries/user_by_username.ex
defmodule Conduit.Accounts.Queries.UserByUsername do
  import Ecto.Query

  alias Conduit.Accounts.Projections.User

  def new(username) do
    from u in User,
    where: u.username == ^username
  end
end

This can be executed by passing the query to our Ecto repository: UserByUsername.new("jake") |> Conduit.Repo.one()

We use this query to expose a new public function in the accounts context: user_by_username/1:

# lib/conduit/accounts/accounts.ex (diff)
defmodule Conduit.Accounts do
  alias Conduit.Accounts.Queries.UserByUsername

  @doc """
  Get an existing user by their username, or return `nil` if not registered
  """
  def user_by_username(username) do
    username
    |> String.downcase()
    |> UserByUsername.new()
    |> Repo.one()
  end
end

Then we can check if a username already exists in the new unique username validator, added to the accounts context in lib/conduit/accounts/validators:

# lib/conduit/accounts/validators/unique_username.ex
defmodule Conduit.Accounts.Validators.UniqueUsername do
  use Vex.Validator

  alias Conduit.Accounts

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [
      function: fn value -> !username_registered?(value) end,
      message: "has already been taken"
    ])
  end

  defp username_registered?(username) do
    case Accounts.user_by_username(username) do
      nil -> false
      _ -> true
    end
  end
end

Add the accounts validators to the vex config in config/config.exs:

config :vex,
  sources: [
    Conduit.Accounts.Validators,
    Conduit.Support.Validators,
    Vex.Validators
  ]

Then we can register the new validator against the username property:

# lib/conduit/accounts/commands/register_user.ex (diff)
defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :username, presence: [message: "can't be empty"], string: true, unique_username: true
end

Run the accounts integration test and we now have three passing tests:

$ mix test test/conduit/accounts/accounts_test.exs
Excluding tags: [:pending]
...
Finished in 1.3 seconds
3 tests, 0 failures
Concurrent registration

We’ve included command validation to ensure unique usernames, and have tested this when registering one user after another. However, there’s a problem: attempting to register two users with the same username concurrently. The unique username validation will pass for both commands, allowing both users with an identical username to be created. This issue exists during the small period of time between registering the user and the read model being updated.

An integration test demonstrates the problem:

defmodule Conduit.AccountsTest do
  use Conduit.DataCase

  alias Conduit.Accounts
  alias Conduit.Accounts.User

  describe "register user" do
    @tag :integration
    test "should fail when registering identical username at same time and return error" do
      1..2
      |> Enum.map(fn _ -> Task.async(fn -> Accounts.register_user(build(:user)) end) end)
      |> Enum.map(&Task.await/1)
    end
  end
end

Since the issue exists only during concurrent command handling we can use another router middleware module to enforce uniqueness. In the before_dispatch/1 callback for the register user command we can attempt to claim the username. Should that fail, it indicates that another user registration for that username is currently being processed and return a validation failure.

The middleware will use a new Unique module that provides a claim/2 function. This attempts to reserve a unique value for a given context. It returns :ok on success, or {:error, :already_taken} on failure. To make the middleware reusable for other fields we define an Elixir protocol (UniqueFields) allowing commands to specify which fields are unique.

# lib/conduit/support/middleware/uniqueness.ex
defmodule Conduit.Support.Middleware.Uniqueness do
  @behaviour Commanded.Middleware

  defprotocol UniqueFields do
    @fallback_to_any true
    @doc "Returns unique fields for the command"
    def unique(command)
  end

  defimpl UniqueFields, for: Any do
    def unique(_command), do: []
  end

  alias Conduit.Support.Unique
  alias Commanded.Middleware.Pipeline

  import Pipeline

  def before_dispatch(%Pipeline{command: command} = pipeline) do
    case ensure_uniqueness(command) do
      :ok ->
        pipeline

      {:error, errors} ->
        pipeline
        |> respond({:error, :validation_failure, errors})
        |> halt()
    end
  end

  def after_dispatch(pipeline), do: pipeline
  def after_failure(pipeline), do: pipeline

  defp ensure_uniqueness(command) do
    command
    |> UniqueFields.unique()
    |> Enum.reduce_while(:ok, fn ({unique_field, error_message}, _) ->
      value = Map.get(command, unique_field)

      case Unique.claim(unique_field,  value) do
        :ok -> {:cont, :ok}
        {:error, :already_taken} -> {:halt, {:error, Keyword.new([{unique_field, error_message}])}}
      end
    end)
  end
end

For the RegisterUser command we specify the :username field must by unique by implementing the UniqueFields protocol:

defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
  def unique(_command), do: [
    {:username, "has already been taken"},
  ]
end

The new Uniqueness middleware is registered after command validation so that it will only be applied to valid commands:

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Support.Middleware.{Uniqueness,Validate}

  middleware Validate
  middleware Uniqueness

  dispatch [RegisterUser], to: User, identity: :user_uuid
end

We’ll use a GenServer to track assigned unique values. Its state is a mapping between a context, such as :username, and the already claimed values. Attempting to claim an assigned value for a context returns an {:error, :already_taken} tagged error.

# lib/conduit/support/unique.ex
defmodule Conduit.Support.Unique do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def claim(context, value) do
    GenServer.call(__MODULE__, {:claim, context, value})
  end

  def init(state), do: {:ok, state}

  def handle_call({:claim, context, value}, _from, assignments) do
    {reply, state} = case Map.get(assignments, context) do
      nil -> {:ok, Map.put(assignments, context, MapSet.new([value]))}
      values ->
        case MapSet.member?(values, value) do
          true -> {{:error, :already_taken}, assignments}
          false -> {:ok, Map.put(assignments, context, MapSet.put(values, value))}
        end
    end

    {:reply, reply, state}
  end
end

The Conduit.Support.Unique module is included in the application supervision tree, in lib/conduit/application.ex:

defmodule Conduit.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec

    children = [
      # ...

      # Enforce unique constraints
      worker(Conduit.Support.Unique, []),
    ]

    opts = [strategy: :one_for_one, name: Conduit.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

We now have unique usernames enforced as part of the register user command dispatch pipeline. This should prevent duplicate usernames from being registered at exactly the same time. We can verify this by running the integration tests again:

$ mix test --only integration
Including tags: [:integration]
Excluding tags: [:test, :pending]
....
Finished in 1.7 seconds
11 tests, 0 failures, 7 skipped

Additional username validation

There are two further validation rules to implement on usernames during registration:

  1. Must be lowercase.
  2. Must only contain alphanumeric characters (a-z, 0-9).

We can use a regular expression9 to enforce both of these rules.

First we add two integration tests to cover these requirements:

@tag :integration
test "should fail when username format is invalid and return error" do
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "j@ke"))

  assert errors == %{username: ["is invalid"]}
end

@tag :integration
test "should convert username to lowercase" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user, username: "JAKE"))

  assert user.username == "jake"
end

Vex supports regex validation using the format validator. We add this to the username validation rules in the register user command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :username,
    presence: [message: "can't be empty"],
    format: [with: ~r/^[a-z0-9]+$/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true,
    unique_username: true
end

The allow_nil and allow_blank options are included as we already have validation to ensure the username is present. We don’t want duplicate error messages when it is not provided: “can’t be empty” and “is invalid”.

We need to convert the username to lowercase during registration in the Accounts context register_user/1 function. Let’s take the opportunity to make a small refactoring by moving the existing assign_uuid/2 function into the RegisterUser module. At the same time we will include a new downcase_username/1 function that does as described. These functions are chained together using the pipeline operator after constructing the RegisterUser command struct from the user supplied attributes.

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

The new functions are added to the RegisterUser command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  alias Conduit.Accounts.Commands.RegisterUser

  @doc """
  Assign a unique identity for the user
  """
  def assign_uuid(%RegisterUser{} = register_user, uuid) do
    %RegisterUser{register_user | user_uuid: uuid}
  end

  @doc """
  Convert username to lowercase characters
  """
  def downcase_username(%RegisterUser{username: username} = register_user) do
    %RegisterUser{register_user | username: String.downcase(username)}
  end
end

Running the integration test suite confirms our changes are good.

Validating a user’s email address

We can now apply the same strategy to email address validation. The rules we need to enforce are that an email address:

  1. Must be unique.
  2. Must be lowercase.
  3. Must be in the desired format: contain an @ character.

The implementation will follow a similar approach to how we validated usernames.

First, we write failing tests to cover the scenarios above:

@tag :integration
test "should fail when email address already taken and return error" do
  assert {:ok, %User{}} = Accounts.register_user(build(:user, username: "jake"))
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, username: "jake2"))

  assert errors == %{email: ["has already been taken"]}
end

@tag :integration
test "should fail when registering identical email addresses at same time and return error" do
  1..2
  |> Enum.map(fn x -> Task.async(fn -> Accounts.register_user(build(:user, username: "user#{x}")) end)  en\
d)
  |> Enum.map(&Task.await/1)
end

@tag :integration
test "should fail when email address format is invalid and return error" do
  assert {:error, :validation_failure, errors} = Accounts.register_user(build(:user, email: "invalidemail"\
))

  assert errors == %{email: ["is invalid"]}
end

@tag :integration
test "should convert email address to lowercase" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user, email: "JAKE@JAKE.JAKE"))

  assert user.email == "jake@jake.jake"
end

Second, extend email validation in the command:

defmodule Conduit.Accounts.Commands.RegisterUser do
  validates :email,
    presence: [message: "can't be empty"],
    format: [with: ~r/\S+@\S+\.\S+/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true,
    unique_email: true
end

Third, we create the new unique email validator:

# lib/conduit/accounts/validators/unique_email.ex
defmodule Conduit.Accounts.Validators.UniqueEmail do
  use Vex.Validator

  alias Conduit.Accounts

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [
      function: fn value -> !email_registered?(value) end,
      message: "has already been taken"
    ])
  end

  defp email_registered?(email) do
    case Accounts.user_by_email(email) do
      nil -> false
      _ -> true
    end
  end
end

This also requires a new public user_by_email/1 function in the accounts context to retrieve a user by their email address:

defmodule Conduit.Accounts do
  @doc """
  Get an existing user by their email address, or return `nil` if not registered
  """
  def user_by_email(email) when is_binary(email) do
    email
    |> String.downcase()
    |> UserByEmail.new()
    |> Repo.one()
  end
end

The UserByEmail query is a module that constructs a standard Ecto query:

# lib/conduit/accounts/queries/user_by_email.ex
defmodule Conduit.Accounts.Queries.UserByEmail do
  import Ecto.Query

  alias Conduit.Accounts.Projections.User

  def new(email) do
    from u in User,
    where: u.email == ^email
  end
end

Fourth, we extend the UniqueFields protocol implementation for the register user command to include email address:

defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Accounts.Commands.RegisterUser do
  def unique(_command), do: [
    {:email, "has already been taken"},
    {:username, "has already been taken"},
  ]
end

Last we include the RegisterUser.downcase_email/1 function in the register user pipeline:

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

That completes the email address validation: we run the integration test suite again to confirm this with passing tests.

Hashing the user’s password

We don’t want to store a user’s password anywhere in our application. Instead we’ll use a one-way hashing function and store the password hash. To authenticate a user during login we hash the password they provide, using the same algorithm, and compare it with the stored password hash: not the actual password.

For Conduit we’ll use the bcrypt10 password hashing function as described in how to safely store a password using bcrypt. The Comeonin library provides an implementation of the bcrypt hashing function in Elixir.

Password hashing (bcrypt, pbkdf2_sha512 and one-time passwords) library for Elixir.

This library is intended to make it very straightforward for developers to check users’ passwords in as secure a manner as possible.

Comeonin

Add comeonin and bcrypt_elixir to dependencies in mix.exs:

defp deps do
  [
    # ...
    {:bcrypt_elixir, "~> 1.0"},
    {:comeonin, "~> 4.0"},
  ]
end

Fetch mix dependencies and compile:

$ mix do deps.get, deps.compile

For our test environment only we will reduce the number of bcrypt rounds so it doesn’t slow down our test suite. In config/test.exs we configure comeonin as follows:

config :comeonin, :bcrypt_log_rounds, 4

We’ll create a Conduit.Auth module to wrap the Comeonin library’s bcrypt hashing functions:

# lib/conduit/accounts/auth.ex
defmodule Conduit.Auth do
  @moduledoc """
  Authentication using the bcrypt password hashing function.
  """

  alias Comeonin.Bcrypt

  def hash_password(password), do: Bcrypt.hashpwsalt(password)
  def validate_password(password, hash), do: Bcrypt.checkpw(password, hash)
end

Then create an integration test to verify the password is being hashed and stored in the user read model. For the test assertion we use the Auth.validate_password/2 function, shown above, which hashes the provided password, jakejake, and compares with the already hashed password saved for the user, such as $2b$04$W7A/lWysNVUqeYg8vjKCXeBniHoks4jmRziKDmACO.fvqo3wdqsea. Remember that we never store the user’s password, only a one-way hash.

@tag :integration
test "should hash password" do
  assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

  assert Auth.validate_password("jakejake", user.hashed_password)
end

Next we include a password field in the register user command struct, to contain the user provided password in plain text. We add a hash_password/1 function that hashes the password, stores the hash value as hashed_password, and clears the original plain text password. This prevents the user’s password from being exposed by any command auditing.

defmodule Conduit.Accounts.Commands.RegisterUser do
  defstruct [
    user_uuid: "",
    username: "",
    email: "",
    password: "",
    hashed_password: "",
  ]

  @doc """
  Hash the password, clear the original password
  """
  def hash_password(%RegisterUser{password: password} = register_user) do
    %RegisterUser{register_user |
      password: nil,
      hashed_password: Auth.hash_password(password),
    }
  end
end

The final change is to include this function in the register user command dispatch chain:

defmodule Conduit.Accounts do
  @doc """
  Register a new user.
  """
  def register_user(attrs \\ %{}) do
    uuid = UUID.uuid4()

    register_user =
      attrs
      |> RegisterUser.new()
      |> RegisterUser.assign_uuid(uuid)
      |> RegisterUser.downcase_username()
      |> RegisterUser.downcase_email()
      |> RegisterUser.hash_password()

    with :ok <- Router.dispatch(register_user, consistency: :strong) do
      get(User, uuid)
    else
      reply -> reply
    end
  end
end

We’ve now successfully hashed the user’s password during registration, helping to protect our users’ security should our deployed environment be compromised and database accessed. The Comeonin library will generate a different 16 character length salt for each hashed password by default. This is another layer of protection against hashed password dictionary and rainbow table attacks.

Completing user registration

With user registration done, at least from the accounts context, we return to our acceptance criteria defined in the UserControllerTest integration test. To specify the initial requirements and direct our development efforts we started out by writing end-to-end tests to ensure that the /api/users registration endpoint adheres to the requirements of the JSON API.

On successful registration the following response should be returned:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}

For now we will skip the authentication token, that will be addressed in the next chapter.

The integration test for successful user registration asserts against the JSON returned from a POST request to /api/users in the UserControllerTest module:

@tag :web
test "should create and return user when data is valid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user)
  json = json_response(conn, 201)["user"]

  assert json == %{
    "bio" => nil,
    "email" => "jake@jake.jake",
    "image" => nil,
    "username" => "jake",
  }
end

Running the test still results in a failure, so there’s more work for us to do. We need to modify the user view and select a subset of the fields from our user projection to be returned as JSON data:

# lib/conduit_web/views/user_view.ex
defmodule ConduitWeb.UserView do
  use ConduitWeb, :view
  alias ConduitWeb.UserView

  def render("index.json", %{users: users}) do
    %{users: render_many(users, UserView, "user.json")}
  end

  def render("show.json", %{user: user}) do
    %{user: render_one(user, UserView, "user.json")}
  end

  def render("user.json", %{user: user}) do
    %{
      username: user.username,
      email: user.email,
      bio: user.bio,
      image: user.image,
    }
  end
end

As per the API spec we only return the username, email, bio, and image fields.

Next we need to handle the case where validation errors are returned during command dispatch. The request should fail with a 422 HTTP status code and the response body would be in the following format:

{
  "errors": {
    "username": [
      "can't be empty"
    ]
  }
}

This scenario is covered by the following test:

@tag :web
test "should not create user and render errors when data is invalid", %{conn: conn} do
  conn = post conn, user_path(conn, :create), user: build(:user, username: "")
  assert json_response(conn, 422)["errors"] == %{
    "username" => [
      "can't be empty",
    ]
  }
end

To achieve this we will use a new feature in Phoenix 1.3, the action_fallback plug for controllers to support generic error handling. Including the plug inside a controller allows you to ignore errors, and only handle the successful case. Take a look at our existing user controller, where we only pattern match on the {:ok, user} successful outcome:

# lib/conduit_web/controllers/user_controller.ex
defmodule ConduitWeb.UserController do
  use ConduitWeb, :controller

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"user" => user_params}) do
    with {:ok, %User{} = user} <- Accounts.register_user(user_params) do
      conn
      |> put_status(:created)
      |> render("show.json", user: user)
    end
  end
end

Any errors that aren’t handled within your controller can be dealt with by the configured fallback controller. We pattern match on the {:error, :validation_failure, errors} tagged error tuple returned when command dispatch fails due to a validation failure. The errors are rendered using a new validation view module and returned with an HTTP 422 “Unprocessable Entity” status code:

# lib/conduit_web/controllers/fallback_controller.ex
defmodule ConduitWeb.FallbackController do
  use ConduitWeb, :controller

  def call(conn, {:error, :validation_failure, errors}) do
    conn
    |> put_status(:unprocessable_entity)
    |> render(ConduitWeb.ValidationView, "error.json", errors: errors)
  end

  def call(conn, {:error, :not_found}) do
    conn
    |> put_status(:not_found)
    |> render(ConduitWeb.ErrorView, :"404")
  end
end

The validation view returns a map containing the errors that is rendered as JSON:

# lib/conduit_web/views/validation_view.ex
defmodule ConduitWeb.ValidationView do
  use ConduitWeb, :view

  def render("error.json", %{errors: errors}) do
    %{errors: errors}
  end
end

We can run the integration tests tagged with @web after making these changes, and the good news is they all pass:

$ mix test --only web
Including tags: [:web]
Excluding tags: [:test, :pending]

...

Finished in 2.3 seconds
18 tests, 0 failures, 15 skipped

Having completed user registration, we now move on to authentication in the next chapter.

Authentication

Authenticate a user

The API spec for authentication is as follows:

HTTP verb URL Required fields
POST /api/users/login email, password

Example request body:

{
  "user":{
    "email": "jake@jake.jake",
    "password": "jakejake"
  }
}

Example response body:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": null,
    "image": null
  }
}

Example failure response body:

{
  "errors": {
    "email or password": [
      "is invalid"
    ]
  }
}

The successful login response includes a JSON Web Token (JWT). This token is included in the HTTP headers on subsequent requests to authorize the user’s actions. We’ll use Guardian to authenticate users and take advantage of its support for JWT tokens.

An authentication framework for use with Elixir applications.

Guardian

Guardian provides a number of Plug modules to include within the Phoenix request handling pipeline. We’ll make use of the following three plugs for the Conduit API:

Plug Usage
Guardian.Plug.VerifyHeader Looks for a token in the Authorization header.
  Useful for APIs.
  If one is not found, this does nothing.
Guardian.Plug.EnsureAuthenticated Looks for a previously verified token.
  If one is found, continues.
  Otherwise it will call the :unauthenticated function of your handler.
Guardian.Plug.LoadResource Looks in the sub field of the token,
  fetches the resource from the configured serializer
  and makes it available via Guardian.Plug.current_resource(conn).

In mix.exs, add the guardian package as a dependency:

defp deps do
  [
    # ...
    {:guardian, "~> 0.14"},
  ]
end

Fetch and compile the mix dependencies:

$ mix do deps.get, deps.compile

Guardian requires a secret key to be generated for our application. We can use the “secret generator” mix task provided by Phoenix to do this:

$ mix phx.gen.secret
IOjbrty1eMEBzc5aczQn0FR4Gd8P9IF1cC7tqwB7ThV/uKjS5mrResG1Y0lCzTNJ

Configure Guardian in config/config.exs, including copying the key from above into secret_key:

config :guardian, Guardian,
  allowed_algos: ["HS512"],
  verify_module: Guardian.JWT,
  issuer: "Conduit",
  ttl: {30, :days},
  allowed_drift: 2000,
  verify_issuer: true,
  secret_key: "IOjbrty1eMEBzc5aczQn0FR4Gd8P9IF1cC7tqwB7ThV/uKjS5mrResG1Y0lCzTNJ",
  serializer: Conduit.Auth.GuardianSerializer

Guardian requires you to implement a serializer, as specified in the config above, to encode and decode your resources into and out of the JWT token. The only resource we’re interested in are users. We can encode the user’s UUID into the token, and later use it to fetch the user projection from the read model.

At this point we will move the existing Conduit.Auth module into its own context. This will allows us to keep authentication concerns, such as password hashing, separate from user accounts.

The Guardian serializer module is created at lib/conduit/auth/guardian_serializer.ex:

# lib/conduit/auth/guardian_serializer.ex
defmodule Conduit.Auth.GuardianSerializer do
  @moduledoc """
  Used by Guardian to serialize a JWT token
  """

  @behaviour Guardian.Serializer

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  def for_token(%User{} = user), do: {:ok, "User:#{user.uuid}"}
  def for_token(_), do: {:error, "Unknown resource type"}

  def from_token("User:" <> uuid), do: {:ok, Accounts.user_by_uuid(uuid)}
  def from_token(_), do: {:error, "Unknown resource type"}
end

We need to add the user_by_uuid/1 function to the accounts context:

defmodule Conduit.Accounts do
  @doc """
  Get a single user by their UUID
  """
  def user_by_uuid(uuid) when is_binary(uuid) do
    Repo.get(User, uuid)
  end
end

The Conduit API specs show the authentication header is in the following format:

Authorization: Token jwt.token.here

So we need to prefix the JWT token with the word Token. To do this we configure the Phoenix web router, in lib/conduit_web/router.ex, and instruct Guardian to use Token as the realm:

defmodule ConduitWeb.Router do
  use ConduitWeb, :router

  pipeline :api do
    plug :accepts, ["json"]
    plug Guardian.Plug.VerifyHeader, realm: "Token"
    plug Guardian.Plug.LoadResource
  end

  # ... routes omitted
end

We will create a new session controller to support user login. It will authenticate the user from the provided email and password and return the user’s details as JSON:

# lib/conduit_web/controllers/session_controller.ex
defmodule ConduitWeb.SessionController do
  use ConduitWeb, :controller

  alias Conduit.Auth
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"user" => %{"email" => email, "password" => password}}) do
    case Auth.authenticate(email, password) do
      {:ok, %User{} = user} ->
        conn
        |> put_status(:created)
        |> render(ConduitWeb.UserView, "show.json", user: user)

      {:error, :unauthenticated} ->
        conn
        |> put_status(:unprocessable_entity)
        |> render(ConduitWeb.ValidationView, "error.json", errors: %{"email or password" => ["is invalid"]\
})
    end
  end
end

An error is returned with a 422 HTTP status code and a generic “is invalid” error message for the email or password on login failure. The existing user and validation views are reused for rendering the response as JSON.

The session controller uses a new public function in the auth context: authenticate/2

This function will look for an existing user by their email address, and then compare their stored hashed password with the password provided hashed using the same bcrypt hash function. An {:error, :unauthenticated} tagged tuple is returned on failure:

# lib/conduit/auth/auth.ex
defmodule Conduit.Auth do
  @moduledoc """
  Boundary for authentication.
  Uses the bcrypt password hashing function.
  """

  alias Comeonin.Bcrypt

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  def authenticate(email, password) do
    with {:ok, user} <- user_by_email(email) do
      check_password(user, password)
   else
     reply -> reply
   end
  end

  def hash_password(password), do: Bcrypt.hashpwsalt(password)
  def validate_password(password, hash), do: Bcrypt.checkpw(password, hash)

  defp user_by_email(email) do
    case Accounts.user_by_email(email) do
      nil -> {:error, :unauthenticated}
      user -> {:ok, user}
    end
  end

  defp check_password(%User{hashed_password: hashed_password} = user, password) do
    case validate_password(password, hashed_password) do
      true -> {:ok, user}
      _ -> {:error, :unauthenticated}
    end
  end
end

The POST /api/users/login action, mapped to the new session controller, is added to the router:

defmodule ConduitWeb.Router do
  use ConduitWeb, :router

  # ... pipeline omitted

  scope "/api", ConduitWeb do
    pipe_through :api

    post "/users/login", SessionController, :create
    post "/users", UserController, :create
  end
end

With the controller and routing configured we can write a web integration test to verify the functionality. In test/conduit_web/controllers/session_controller_test.exs we use the Phoenix connection test case to access helper functions for controllers.

There are three scenarios to test:

  1. Successfully authenticating an existing user and valid password.
  2. Failing to authenticate a known user when the password is incorrect.
  3. Failing to authenticate an unknown user.
# test/conduit/web/controllers/session_controller_test.exs
defmodule ConduitWeb.SessionControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "authenticate user" do
    @tag :web
    test "creates session and renders session when data is valid", %{conn: conn} do
      register_user()

      conn = post conn, session_path(conn, :create), user: %{
        email: "jake@jake.jake",
        password: "jakejake"
      }

      assert json_response(conn, 201)["user"] == %{
        "bio" => nil,
        "email" =>
        "jake@jake.jake",
        "image" => nil,
        "username" => "jake",
      }
    end

    @tag :web
    test "does not create session and renders errors when password does not match", %{conn: conn} do
      register_user()

      conn = post conn, session_path(conn, :create), user: %{
        email: "jake@jake.jake",
        password: "invalidpassword"
      }

      assert json_response(conn, 422)["errors"] == %{
        "email or password" => [
          "is invalid"
        ]
      }
    end

    @tag :web
    test "does not create session and renders errors when user not found", %{conn: conn} do
      conn = post conn, session_path(conn, :create), user: %{
        email: "doesnotexist",
        password: "jakejake"
      }

      assert json_response(conn, 422)["errors"] == %{
        "email or password" => [
          "is invalid"
        ]
      }
    end
  end

  defp register_user, do: fixture(:user)
end

Run the new web tests, mix test --only web, to confirm that our changes are good.

Generating a JWT token

User authentication is now working, but we’ve omitted a necessary part of the user data returned as JSON from the login and register user actions. In both cases our response does not include the JWT token as shown in the example response:

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake"
  }
}

We need to rectify that omission by including the token in the response. First, we’ll include the token property in the session controller test. We assert that it is not empty when successfully authenticating a user:

defmodule ConduitWeb.SessionControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "authenticate user" do
    @tag :web
    test "creates session and renders session when data is valid", %{conn: conn} do
      register_user()

      conn = post conn, session_path(conn, :create), user: %{
        email: "jake@jake.jake",
        password: "jakejake"
      }
      json = json_response(conn, 201)["user"]
      token = json["token"]

      assert json == %{
        "bio" => nil,
        "email" => "jake@jake.jake",
        "token" => token,
        "image" => nil,
        "username" => "jake",
      }
      refute token == ""
    end
  end
end

Let’s use Guardian to generate the token for us. It will use the Conduit.Auth.GuardianSerializer module we’ve already written and configured to serialize our user resource into a string for inclusion in the token.

To generate the JWT we use Guardian.encode_and_sign/2 by adding a Conduit.JWT module and wrapper function in lib/conduit_web/jwt.ex:

# lib/conduit_web/jwt.ex
defmodule ConduitWeb.JWT do
  @moduledoc """
  JSON Web Token helper functions, using Guardian
  """

  def generate_jwt(resource, type \\ :token) do
    case Guardian.encode_and_sign(resource, type) do
      {:ok, jwt, _full_claims} -> {:ok, jwt}
    end
  end
end

Since the token generation will be used in both the session and user controllers we will import the ConduitWeb.JWT module in the Phoenix controller macro, in lib/conduit_web/web.ex. This makes the generate_jwt/2 function available to use in all of our web controllers.

defmodule ConduitWeb do
  def controller do
    quote do
      use Phoenix.Controller, namespace: ConduitWeb
      import Plug.Conn
      import ConduitWeb.Router.Helpers
      import ConduitWeb.Gettext
      import ConduitWeb.JWT
    end
  end

  # ... view, router, and channel definitions omitted
end

The session controller needs to be updated to generate the JWT after authenticating the user. The JWT token is passed to the render function to make it available to the view:

# lib/conduit_web/controllers/session_controller.ex
defmodule ConduitWeb.SessionController do
  use ConduitWeb, :controller

  alias Conduit.Auth
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"user" => %{"email" => email, "password" => password}}) do
    with {:ok, %User{} = user} <- Auth.authenticate(email, password),
         {:ok, jwt} <- generate_jwt(user) do
       conn
        |> put_status(:created)
        |> render(ConduitWeb.UserView, "show.json", user: user, jwt: jwt)
    else
      {:error, :unauthenticated} ->
        conn
        |> put_status(:unprocessable_entity)
        |> render(ConduitWeb.ValidationView, "error.json", errors: %{"email or password" => ["is invalid"]\
})
    end
  end
end

The render function in the user view for a single user merges the JWT token into the user data that is rendered as JSON:

# lib/conduit_web/views/user_view.ex
defmodule ConduitWeb.UserView do
  use ConduitWeb, :view
  alias ConduitWeb.UserView

  def render("index.json", %{users: users}) do
    %{users: render_many(users, UserView, "user.json")}
  end

  def render("show.json", %{user: user, jwt: jwt}) do
    %{user: user |> render_one(UserView, "user.json") |> Map.merge(%{token: jwt})}
  end

  def render("user.json", %{user: user}) do
    %{
      username: user.username,
      email: user.email,
      bio: user.bio,
      image: user.image,
    }
  end
end

Running the web tests again, mix test --only web, confirms that the token is successfully generated and included in the response.

Getting the current user

An authenticated HTTP GET request to /api/user should return a JSON representation of the current user. Authentication is determined by the presence of a valid HTTP request header containing the JWT token: Authorization: Token jwt.token.here.

We will start by adding two new tests to the user controller to verify the following scenarios:

  1. Successful authentication, with a valid JWT token, returns the current user as JSON data.
  2. An invalid request, missing a JWT token, returns a 401 Unauthorized response.

To support a valid request we must register a user and generate a JWT token for them in the test setup. The token is included in the request headers of the test connection using the authenticated_conn/1 function:

defmodule ConduitWeb.UserControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "get current user" do
    @tag :web
    test "should return user when authenticated", %{conn: conn} do
      conn = get authenticated_conn(conn), user_path(conn, :current)
      json = json_response(conn, 200)["user"]
      token = json["token"]

      assert json == %{
        "bio" => nil,
        "email" => "jake@jake.jake",
        "token" => token,
        "image" => nil,
        "username" => "jake",
      }
      refute token == ""
    end

    @tag :web
    test "should not return user when unauthenticated", %{conn: conn} do
      conn = get conn, user_path(conn, :current)

      assert response(conn, 401) == ""
    end
  end

  def authenticated_conn(conn) do
    with {:ok, user} <- fixture(:user),
         {:ok, jwt} <- ConduitWeb.JWT.generate_jwt(user)
    do
      conn
      |> put_req_header("authorization", "Token " <> jwt)
    end
  end
end

The failing tests guide us towards our next code change, we need to register the /api/user route in the router:

defmodule ConduitWeb.Router do
  use ConduitWeb, :router

  # ... pipeline omitted

  scope "/api", ConduitWeb do
    pipe_through :api

    get "/user", UserController, :current
    post "/users/login", SessionController, :create
    post "/users", UserController, :create
  end
end

Next we add a current function to the user controller module. Before doing so we’ll take advantage of Guardian’s built in support for Phoenix controllers. Using the Guardian.Phoenix.Controller module in our controller provides easier access to the current user and their claims. The public controller functions are extended to accept two additional parameters, user and claims, as shown below.

Before: def current(conn, params) do

After: def current(conn, params, user, claims) do

We will also use two plugs provided by Guardian:

  1. Guardian.Plug.EnsureAuthenticated ensures a verified token exists.
  2. Guardian.Plug.EnsureResource guards against a resource not found.

Both plugs require us to implement an error handler module that deals with failure cases. In lib/conduit_web/error_handler.ex we provide functions for the three main error cases. They each return an appropriate HTTP error status code and an empty response body:

# lib/conduit_web/error_handler.ex
defmodule ConduitWeb.ErrorHandler do
  import Plug.Conn

  @doc """
  Return 401 for "Unauthorized" requests

  A request requires authentication but it isn't provided
  """
  def unauthenticated(conn, _params), do: respond_with(conn, :unauthorized)

  @doc """
  Return 403 for "Forbidden" requests

  A request may be valid, but the user doesn't have permissions to perform the action
  """
  def unauthorized(conn, _params), do: respond_with(conn, :forbidden)

  @doc """
  Return 401 for "Unauthorized" requests

  A request requires authentication but the resource has not been found
  """
  def no_resource(conn, _params), do: respond_with(conn, :unauthorized)

  defp respond_with(conn, status) do
    conn
    |> put_resp_content_type("application/json")
    |> send_resp(status, "")
  end
end

The Guardian plugs are configured with our error handler module and to only apply to the current controller action. This action returns the authenticated current user, and their JWT token, as JSON data:

# lib/conduit_web/controllers/user_controller.ex
defmodule ConduitWeb.UserController do
  use ConduitWeb, :controller
  use Guardian.Phoenix.Controller

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User

  action_fallback ConduitWeb.FallbackController

  plug Guardian.Plug.EnsureAuthenticated, %{handler: ConduitWeb.ErrorHandler} when action in [:current]
  plug Guardian.Plug.EnsureResource, %{handler: ConduitWeb.ErrorHandler} when action in [:current]

  def create(conn, %{"user" => user_params}, _user, _claims) do
    with {:ok, %User{} = user} <- Accounts.register_user(user_params),
         {:ok, jwt} = generate_jwt(user) do
      conn
      |> put_status(:created)
      |> render("show.json", user: user, jwt: jwt)
    end
  end

  def current(conn, _params, user, _claims) do
    jwt = Guardian.Plug.current_token(conn)

    conn
    |> put_status(:ok)
    |> render("show.json", user: user, jwt: jwt)
  end
end

When an unauthenticated user requests /api/user the Guardian.Plug.EnsureAuthenticated plug will step in. It redirects the request to our error handler module, which responds with a 401 unauthorized status code.

Run the web tests, mix test --only web, to confirm the new route is working as per the API spec.

We’ve now built out the basic user registration and authentication features required for Conduit. Let’s move on to authoring articles in the next chapter.

Articles

Publishing an article

The API spec for creating an article is as follows:

HTTP verb URL Required fields
POST /api/articles title, description, body

Example request body:

{
  "article": {
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "You have to believe",
    "tagList": ["dragons", "training"]
  }
}

Example response body:

{
  "article": {
    "slug": "how-to-train-your-dragon",
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "You have to believe",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }
}

We’ll use the phx.gen.json generator once again to create a new context for articles. The generator will create the blog context, schema, controller, and JSON view. We already know which fields we need for articles so we can include them, with their types, in the generator command:

$ mix phx.gen.json Blog Article articles slug:text title:text description:text body:text tag_list:array:te\
xt favorite_count:integer published_at:naive_datetime author_uuid:binary author_username:text author_bio:t\
ext author_image:text --table blog_articles

Overall, this generator will add the following files to lib/conduit:

  • Context module in lib/conduit/blog/blog.ex, serving as the public API boundary.
  • Ecto schema in lib/conduit/blog/article.ex, and a database migration to create the blog_articles table.
  • View in lib/conduit_web/views/article_view.ex.
  • Controller in lib/conduit_web/controllers/article_controller.ex.
  • Unit and integration tests in test/conduit/blog.

The only change to the generated module locations is to move the article Ecto schema into the lib/conduit/blog/projections folder, not the blog context root.

Authoring articles

Before we begin publishing articles we’ll take a small detour since we first need a way of identifying their author. We could just use our existing user aggregate and read model projection as a convenience. However, we’ve already determined that accounts and blog are separate contexts, therefore they shouldn’t necessarily share models.

Instead, we’re going to model authors as part of the blog context, segregated from users, but have them related by their identity. There will be a one-to-one mapping from user accounts to blog authors. A benefit of this separation is that the user and author models are only responsible for concerns related to their own role; the user model deals with a user’s email and password, whereas the author model will contain their bio, profile image, and can be used for tracking followers.

We’ll need to build an author aggregate, create author command and author created domain event, and use a Commanded event handler to create an author whenever a user is registered. But first let’s define an integration test that verifies an author is created after successful user registration.

The following integration test uses the assert_receive_event helper function from Commanded’s event assertions module. Here we assert that an AuthorCreated domain event is created at some point after user registration and verify it’s for the same user:

# test/conduit/blog/author_test.exs
defmodule Conduit.Blog.AuthorTest do
  use Conduit.DataCase

  import Commanded.Assertions.EventAssertions

  alias Conduit.Accounts
  alias Conduit.Accounts.Projections.User
  alias Conduit.Blog.Events.AuthorCreated

  describe "an author" do
    @tag :integration
    test "should be created when user registered" do
      assert {:ok, %User{} = user} = Accounts.register_user(build(:user))

      assert_receive_event AuthorCreated, fn event ->
        assert event.user_uuid == user.uuid
        assert event.username == user.username
      end
    end
  end
end

As mentioned above we’ll use an event handler to create the author whenever a UserRegistered event occurs. An event handler is used whenever you need to react to a domain event being created. It’s a good extension point to use for adding auxiliary concerns and integrating separate contexts.

The handler below will delegate to a create_author/1 function we will define in the new blog context. Since the handler is modelling a business process I’ve defined it in a workflows folder within the blog context and have named it after its behaviour:

# lib/conduit/blog/workflows/create_author_from_user.ex
defmodule Conduit.Blog.Workflows.CreateAuthorFromUser do
  use Commanded.Event.Handler,
    name: "Blog.Workflows.CreateAuthorFromUser",
    consistency: :strong

  alias Conduit.Accounts.Events.UserRegistered
  alias Conduit.Blog

  def handle(%UserRegistered{user_uuid: user_uuid, username: username}, _metadata) do
    with {:ok, _author} <- Blog.create_author(%{user_uuid: user_uuid, username: username}) do
      :ok
    else
      reply -> reply
    end
  end
end

In the blog context we add the new create_author/1 function to dispatch a CreateAuthor command. It has a reference to the associated user aggregate by uuid and also includes the username:

# lib/conduit/blog/blog.ex
defmodule Conduit.Blog do
  @moduledoc """
  The boundary for the Blog system.
  """

  import Ecto.Query, warn: false

  alias Conduit.Blog.Projections.Article
  alias Conduit.Blog.Commands.CreateAuthor
  alias Conduit.Blog.Projections.Author
  alias Conduit.{Repo,Router}

  @doc """
  Create an author.
  An author shares the same uuid as the user, but with a different prefix.
  """
  def create_author(%{user_uuid: uuid} = attrs) do
    create_author =
      attrs
      |> CreateAuthor.new()
      |> CreateAuthor.assign_uuid(uuid)

    with :ok <- Router.dispatch(create_author, consistency: :strong) do
      get(Author, uuid)
    else
      reply -> reply
    end
  end

  defp get(schema, uuid) do
    case Repo.get(schema, uuid) do
      nil -> {:error, :not_found}
      projection -> {:ok, projection}
    end
  end
end

The CreateAuthor command contains the author identity, associated user identity, and username fields:

# lib/conduit/blog/commands/create_author.ex
defmodule Conduit.Blog.Commands.CreateAuthor do
  defstruct [
    author_uuid: "",
    user_uuid: "",
    username: "",
  ]

  use ExConstructor
  use Vex.Struct

  alias Conduit.Blog.Commands.CreateAuthor

  validates :author_uuid, uuid: true

  validates :user_uuid, uuid: true

  validates :username,
    presence: [message: "can't be empty"],
    format: [with: ~r/^[a-z0-9]+$/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true

  @doc """
  Assign a unique identity
  """
  def assign_uuid(%CreateAuthor{} = create_author, uuid) do
    %CreateAuthor{create_author | author_uuid: uuid}
  end
end

We use Commanded’s identity prefix feature to allow the user and author aggregates to share the same aggregate identity. In our router module we identify both aggregates by their respective field (author_uuid or user_uuid) and also provide the prefix option used to differentiate between the event streams used to store their domain events. Author aggregates are prefixed with “author-“ (e.g. author-53db6101-6725-4332-ba94-75b4d05213ab) and users by “user-“ (e.g. user-53db6101-6725-4332-ba94-75b4d05213ab). This allows an easy way of correlating an author with its associated user account, and vice versa.

# lib/conduit/router.ex
defmodule Conduit.Router do
  use Commanded.Commands.Router

  alias Conduit.Accounts.Aggregates.User
  alias Conduit.Accounts.Commands.RegisterUser
  alias Conduit.Blog.Aggregates.Author
  alias Conduit.Blog.Commands.CreateAuthor
  alias Conduit.Support.Middleware.{Uniqueness,Validate}

  middleware Validate
  middleware Uniqueness

  identify Author, by: :author_uuid, prefix: "author-"
  identify User, by: :user_uuid, prefix: "user-"

  dispatch [CreateAuthor], to: Author
  dispatch [RegisterUser], to: User
end

The author aggregate has a single execute/2 function to create an instance, returning an AuthorCreated event.

# lib/conduit/blog/aggregates/author.ex
defmodule Conduit.Blog.Aggregates.Author do
  defstruct [
    :uuid,
    :user_uuid,
    :username,
    :bio,
    :image,
  ]

  alias Conduit.Blog.Aggregates.Author
  alias Conduit.Blog.Commands.CreateAuthor
  alias Conduit.Blog.Events.AuthorCreated

  @doc """
  Creates an author
  """
  def execute(%Author{uuid: nil}, %CreateAuthor{} = create) do
    %AuthorCreated{
      author_uuid: create.author_uuid,
      user_uuid: create.user_uuid,
      username: create.username,
    }
  end

  # state mutators

  def apply(%Author{} = author, %AuthorCreated{} = created) do
    %Author{author |
      uuid: created.author_uuid,
      user_uuid: created.user_uuid,
      username: created.username,
    }
  end
end

We define an author projection (Ecto schema) and a migration to create the blog_authors table. The corresponding projector module is shown below:

# lib/conduit/blog/projectors/article.ex
defmodule Conduit.Blog.Projectors.Article do
  use Commanded.Projections.Ecto,
    name: "Blog.Projectors.Article",
    consistency: :strong

  alias Conduit.Blog.Events.AuthorCreated
  alias Conduit.Blog.Projections.Author
  alias Conduit.Repo

  project %AuthorCreated{} = author do
    Ecto.Multi.insert(multi, :author, %Author{
      uuid: author.author_uuid,
      user_uuid: author.user_uuid,
      username: author.username,
      bio: nil,
      image: nil,
    })
  end
end

The projector is named article projector as this will be used for projecting both authors and their published articles. We’ll see later why two projections are built using a single projector; it’s because we need to query an article’s author to copy their details into the article projection during publishing.

Finally, the article projector and create author workflow are included as supervised processes in a new blog supervisor:

# lib/conduit/blog/supervisor.ex
defmodule Conduit.Blog.Supervisor do
  use Supervisor

  alias Conduit.Blog

  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_arg) do
    Supervisor.init([
      Blog.Projectors.Article,
      Blog.Workflows.CreateAuthorFromUser,
    ], strategy: :one_for_one)
  end
end

This supervisor is then added to the top level application supervisor in lib/conduit/application.ex.

With this chunk of work done we can execute our initial test to verify an author is successfully created in response to registering a user:

$ mix test test/conduit/blog/author_test.exs

By now you should be familiar with the test-driven development cycle we are following:

  1. Write failing integration tests.
  2. Build a web controller and define the public API required for the context.
  3. Implement the context public API, returning empty data.
  4. Write failing unit tests for the context.
  5. Build domain model (aggregate, commands, and events) to fulfil the context behaviour.
  6. Verify unit tests and integration tests pass.

This outside in approach helps to define the outcome we’re working towards in the integration test. Then guides us, step by step, to build the supporting code moving towards that goal.

Publish article integration test

We’ll begin with a controller integration test for the “happy path” of successfully publishing an article. A POST request to /api/articles should return a 201 response code with the article as JSON data:

# test/conduit_web/controllers/article_controller_test.exs
defmodule ConduitWeb.ArticleControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "publish article" do
    @tag :web
    test "should create and return article when data is valid", %{conn: conn} do
      conn = post authenticated_conn(conn), article_path(conn, :create), article: build(:article)
      json = json_response(conn, 201)["article"]
      created_at = json["createdAt"]
      updated_at = json["updatedAt"]

      assert json == %{
        "slug" => "how-to-train-your-dragon",
        "title" => "How to train your dragon",
        "description" => "Ever wonder how?",
        "body" => "You have to believe",
        "tagList" => ["dragons", "training"],
        "createdAt" => created_at,
        "updatedAt" => updated_at,
        "favorited" => false,
        "favoritesCount" => 0,
        "author" => %{
          "username" => "jake",
          "bio" => nil,
          "image" => nil,
          "following" => false,
        }
      }
      refute created_at == ""
      refute updated_at == ""
    end
  end
end

The favorited and favoritesCount won’t be supported just yet, so we will fake it, until we make it and just return false and 0 respectively. We will return to build this functionality when we add the favourite articles feature.

Our test requires a new factory function, in test/support/factory.ex, to build the parameters for an article:

defmodule Conduit.Factory do
  use ExMachina

  def article_factory do
    %{
      slug: "how-to-train-your-dragon",
      title: "How to train your dragon",
      description: "Ever wonder how?",
      body: "You have to believe",
      tag_list: ["dragons", "training"],
      author_uuid: UUID.uuid4(),
    }
  end
end

The web controller test also makes use of a convenience function, authenticated_conn/1, to register a user and set their JWT token. This register a new user account and authenticates the request to be sent to the controller as the newly registered user:

# test/support/conn_helpers.ex
defmodule ConduitWeb.ConnHelpers do
  import Plug.Conn
  import Conduit.Fixture

  alias ConduitWeb.JWT

  def authenticated_conn(conn) do
    with {:ok, user} <- fixture(:user),
         {:ok, jwt} <- JWT.generate_jwt(user)
    do
      conn
      |> put_req_header("authorization", "Token " <> jwt)
    end
  end
end

Building the article controller

The integration test will initially fail because we have not yet configured a Phoenix route for the /api/articles path. We map this route to the article controller in lib/conduit_web/router.ex:

defmodule Conduit.Web.Router do
  use Conduit.Web, :router

  scope "/api", Conduit.Web do
    pipe_through :api

    post "/articles", ArticleController, :create
  end
end

Only authenticated users are allowed to publish articles. So we authenticate the request to the article controller using the two Guardian plugs Guardian.Plug.EnsureAuthenticated and Guardian.Plug.EnsureResource.

# lib/conduit_web/controllers/article_controller.ex
defmodule ConduitWeb.ArticleController do
  use ConduitWeb, :controller
  use Guardian.Phoenix.Controller

  alias Conduit.Blog
  alias Conduit.Blog.Projections.Article

  plug Guardian.Plug.EnsureAuthenticated, %{handler: ConduitWeb.ErrorHandler} when action in [:create]
  plug Guardian.Plug.EnsureResource, %{handler: ConduitWeb.ErrorHandler} when action in [:create]

  action_fallback ConduitWeb.FallbackController

  def create(conn, %{"article" => article_params}, user, _claims) do
    author = Blog.get_author!(user.uuid)

    with {:ok, %Article{} = article} <- Blog.publish_article(author, article_params) do
      conn
      |> put_status(:created)
      |> render("show.json", article: article)
    end
  end
end

The controller uses the Blog context to publish an article, using a new Blog.publish_article/2 function. It will follow our standard command dispatch pattern:

  1. Create a command from the user provided input parameters.
  2. Dispatch the command, thereby invoking its validation rules.
  3. Wait for the read model to be updated.
  4. Return the projected data from the read model.
defmodule Conduit.Blog do
  @doc """
  Publishes an article by the given author.
  """
  def publish_article(%Author{} = author, attrs \\ %{}) do
    uuid = UUID.uuid4()

    publish_article =
      attrs
      |> PublishArticle.new()
      |> PublishArticle.assign_uuid(uuid)
      |> PublishArticle.assign_author(author)
      |> PublishArticle.generate_url_slug()

    with :ok <- Router.dispatch(publish_article, consistency: :strong) do
      get(Article, uuid)
    else
      reply -> reply
    end
  end
end

You may notice that we assign the article’s author from the given user and must also generate a unique URL slug from the article title. These are important requirements for the feature, so we will write an integration test for the blog context and include tests to cover these.

# test/conduit/blog/blog_test.exs
defmodule Conduit.BlogTest do
  use Conduit.DataCase

  alias Conduit.Blog
  alias Conduit.Blog.Projections.Article

  describe "publish article" do
    setup [
      :create_author,
    ]

    @tag :integration
    test "should succeed with valid data", %{author: author} do
      assert {:ok, %Article{} = article} = Blog.publish_article(author, build(:article))

      assert article.slug == "how-to-train-your-dragon"
      assert article.title == "How to train your dragon"
      assert article.description == "Ever wonder how?"
      assert article.body == "You have to believe"
      assert article.tag_list == ["dragons", "training"]
      assert article.author_username == "jake"
      assert article.author_bio == nil
      assert article.author_image == nil
    end

    @tag :integration
    test "should generate unique URL slug", %{author: author} do
      assert {:ok, %Article{} = article1} = Blog.publish_article(author, build(:article))
      assert article1.slug == "how-to-train-your-dragon"

      assert {:ok, %Article{} = article2} = Blog.publish_article(author, build(:article))
      assert article2.slug == "how-to-train-your-dragon-2"
    end
  end

  defp create_author(_context) do
    {:ok, author} = fixture(:author)

    [author: author]
  end
end

The register_user/1 function is called before each test case. It provides a registered user to use within the tests, since a user is required to publish articles. The article parameters are built by reusing the factory function previously created for the article controller test.

Defining the publish article command

The publish article command, in lib/conduit/blog/commands/publish_article.ex, contains:

  1. A struct to hold the input data.
  2. Validation rules using the Vex library.
  3. Functions to assign the article unique identifier and its author.
  4. A function to generate a unique URL slug, using a separate Slugger module.
# lib/conduit/blog/commands/publish_article.ex
defmodule Conduit.Blog.Commands.PublishArticle do
  defstruct [
    article_uuid: "",
    author_uuid: "",
    slug: "",
    title: "",
    description: "",
    body: "",
    tag_list: [],
  ]

  use ExConstructor
  use Vex.Struct

  alias Conduit.Blog.Projections.Author
  alias Conduit.Blog.Slugger
  alias Conduit.Blog.Commands.PublishArticle

  validates :article_uuid, uuid: true

  validates :author_uuid, uuid: true

  validates :slug,
    presence: [message: "can't be empty"],
    format: [with: ~r/^[a-z0-9\-]+$/, allow_nil: true, allow_blank: true, message: "is invalid"],
    string: true,
    unique_article_slug: true

  validates :title, presence: [message: "can't be empty"], string: true

  validates :description, presence: [message: "can't be empty"], string: true

  validates :body, presence: [message: "can't be empty"], string: true

  validates :tag_list, by: &is_list/1

  @doc """
  Assign a unique identity
  """
  def assign_uuid(%PublishArticle{} = publish_article, uuid) do
    %PublishArticle{publish_article | article_uuid: uuid}
  end

  @doc """
  Assign the author
  """
  def assign_author(%PublishArticle{} = publish_article, %Author{uuid: uuid}) do
    %PublishArticle{publish_article | author_uuid: uuid}
  end

  @doc """
  Generate a unique URL slug from the article title
  """
  def generate_url_slug(%PublishArticle{title: title} = publish_article) do
    case Slugger.slugify(title) do
      {:ok, slug} -> %PublishArticle{publish_article | slug: slug}
      _ -> publish_article
    end
  end
end

defimpl Conduit.Support.Middleware.Uniqueness.UniqueFields, for: Conduit.Blog.Commands.PublishArticle do
  def unique(_command), do: [
    {:slug, "has already been taken"},
  ]
end

Generating a unique URL slug

A slug is part of the URL that identifies a page in human-readable keywords. As an example, given an article title of “Welcome to Conduit” the corresponding slug might be “welcome-to-conduit”.

We will use the Slugger package to generate a slug from an article title.

In mix.exs, add the slugger dependency:

defp deps do
  [
    # ...    
    {:slugger, "~> 0.2"},
  ]
end

Fetch and compile the mix dependencies:

$ mix do deps.get, deps.compile

One complication of URL slug generation is that each slug must be unique. A single slug can only be used one: two articles with the same title cannot share a slug.

We will wrap the slugger library with our own module, in lib/conduit/blog/slugger.ex. For each generated slug it will query the article read model to determine whether the slug has already been assigned. If it has, a suffix will be appended and retried. So “article” becomes “article-2”, “article-3”, “article-4”, and so on until an unclaimed slug is found.

# lib/conduit/blog/slugger.ex
defmodule Conduit.Blog.Slugger do
  alias Conduit.Blog

  @doc """
  Slugify the given text and ensure that it is unique.

  A slug will contain only alphanumeric characters (`a-z`, `0-9`) and the default separator character (`-`\
).

  If the generated slug is already taken, append a numeric suffix and keep incrementing until a unique slu\
g is found.

  ## Examples

    - "Example article" => "example-article", "example-article-2", "example-article-3", etc.
  """
  @spec slugify(String.t) :: {:ok, slug :: String.t} | {:error, reason :: term}
  def slugify(title) do
    title
    |> Slugger.slugify_downcase()
    |> ensure_unique_slug()
  end

  # Ensure the given slug is unique, if not increment the suffix and try again.
  defp ensure_unique_slug(slug, suffix \\ 1)
  defp ensure_unique_slug("", _suffix), do: ""
  defp ensure_unique_slug(slug, suffix) do
    suffixed_slug = suffixed(slug, suffix)

    case exists?(suffixed_slug) do
      true -> ensure_unique_slug(slug, suffix + 1)
      false -> {:ok, suffixed_slug}
    end
  end

  # Does the slug exist?
  defp exists?(slug) do
    case Blog.article_by_slug(slug) do
      nil -> false
      _ -> true
    end
  end

  defp suffixed(slug, 1), do: slug
  defp suffixed(slug, suffix), do: slug <> "-" <> to_string(suffix)
end

We need to provide a query to find an article by a slug:

# lib/conduit/blog/queries/article_by_slug.ex
defmodule Conduit.Blog.Queries.ArticleBySlug do
  import Ecto.Query

  alias Conduit.Blog.Projections.Article

  def new(slug) do
    from a in Article,
    where: a.slug == ^slug
  end
end

Made publicly available from the Blog context:

defmodule Conduit.Blog do
  @doc """
  Get an article by its URL slug, or return `nil` if not found.
  """
  def article_by_slug(slug) do
    slug
    |> String.downcase()
    |> ArticleBySlug.new()
    |> Repo.one()
  end
end

Finally, we add validation to the publish article command to ensure uniqueness:

# lib/conduit/blog/validators/unique_article_slug.ex
defmodule Conduit.Blog.Validators.UniqueArticleSlug do
  use Vex.Validator

  alias Conduit.Blog

  def validate(value, _options) do
    Vex.Validators.By.validate(value, [
      function: fn value -> !article_exists?(value) end,
      message: "has already been taken"
    ])
  end

  defp article_exists?(slug) do
    case Blog.article_by_slug(slug) do
      nil -> false
      _ -> true
    end
  end
end

Building the article aggregate

Publishing an article indicates that our domain model should comprise an article aggregate:

# lib/conduit/blog/aggregates/article.ex
defmodule Conduit.Blog.Aggregates.Article do
  defstruct [
    :uuid,
    :slug,
    :title,
    :description,
    :body,
    :tag_list,
    :author_uuid,
  ]

  alias Conduit.Blog.Aggregates.Article
  alias Conduit.Blog.Commands.PublishArticle
  alias Conduit.Blog.Events.ArticlePublished

  @doc """
  Publish an article
  """
  def execute(%Article{uuid: nil}, %PublishArticle{} = publish) do
    %ArticlePublished{
      article_uuid: publish.article_uuid,
      slug: publish.slug,
      title: publish.title,
      description: publish.description,
      body: publish.body,
      tag_list: publish.tag_list,
      author_uuid: publish.author_uuid,
    }
  end

  # state mutators

  def apply(%Article{} = article, %ArticlePublished{} = published) do
    %Article{article |
      uuid: published.article_uuid,
      slug: published.slug,
      title: published.title,
      description: published.description,
      body: published.body,
      tag_list: published.tag_list,
      author_uuid: published.author_uuid,
    }
  end
end

An aggregate may only reference other aggregates by their identity, not by reference, so we provide the author’s identity as part of the command (author_uuid). We don’t include the author’s username, or any other details, as the article does not need that information. It is only required in the read model. You will see an example of combining and denormalising data across aggregates in a read model projection when we build the article projector.

We create a unit test for the article aggregate to cover publishing an article:

# test/conduit/blog/aggregates/article_test.exs
defmodule Conduit.Blog.ArticleTest do
  use Conduit.AggregateCase, aggregate: Conduit.Blog.Aggregates.Article

  alias Conduit.Blog.Events.ArticlePublished

  describe "publish article" do
    @tag :unit
    test "should succeed when valid" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events build(:publish_article, article_uuid: article_uuid, author_uuid: author_uuid), [
        %ArticlePublished{
          article_uuid: article_uuid,
          slug: "how-to-train-your-dragon",
          title: "How to train your dragon",
          description: "Ever wonder how?",
          body: "You have to believe",
          tag_list: ["dragons", "training"],
          author_uuid: author_uuid,
        }
      ]
    end
  end
end

The article published domain event defines a struct and uses the Poison JSON encoder:

# lib/conduit/blog/events/article_published.ex
defmodule Conduit.Blog.Events.ArticlePublished do
  @derive [Poison.Encoder]
  defstruct [
    :article_uuid,
    :author_uuid,
    :slug,
    :title,
    :description,
    :body,
    :tag_list,
  ]
end

Lastly, the publish article command is routed to the aggregate in lib/conduit/router.ex:

# lib/conduit/router.ex (diff)
defmodule Conduit.Router do
  alias Conduit.Blog.Aggregates.{Article,Author}
  alias Conduit.Blog.Commands.{CreateAuthor,PublishArticle}

  identify Article, by: :article_uuid, prefix: "article-"

  dispatch [PublishArticle], to: Article
end

Projecting the article read model

We’ve built the article domain model, handling writes, so let’s turn our attention to the read model projection.

In CQRS applications we aim to build read models that directly support the queries our application requires. The benefit of the separate read model is that we can have many views of our data, each perfectly suited to the query it was built for. We want performant reads, so we choose to denormalise data and minimise joins in the database.

Blog article read model

For the article read model we want to also include the author’s details. So the query becomes a simple SELECT from a single table, no joins needed. We define a database migration to create the blog_articles table, including the author:

# priv/repo/migrations/20170628134610_create_blog_article.exs
defmodule Conduit.Repo.Migrations.CreateConduit.Blog.Article do
  use Ecto.Migration

  def change do
    create table(:blog_articles, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :slug, :text
      add :title, :text
      add :description, :text
      add :body, :text
      add :tag_list, {:array, :text}
      add :favorite_count, :integer
      add :published_at, :naive_datetime
      add :author_uuid, :uuid
      add :author_username, :text
      add :author_bio, :text
      add :author_image, :text

      timestamps()
    end

    create unique_index(:blog_articles, [:slug])
    create index(:blog_articles, [:author_uuid])
    create index(:blog_articles, [:author_username])
    create index(:blog_articles, [:published_at])
  end
end

Note we also take advantage of PostgreSQL and Ecto support for arrays for the tag_list column. We add indexes on the columns that will be used for querying to improve their performance.

The article read model defines the corresponding Ecto schema:

# lib/conduit/blog/projections/article.ex
defmodule Conduit.Blog.Projections.Article do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "blog_articles" do
    field :slug, :string
    field :title, :string
    field :description, :string
    field :body, :string
    field :tag_list, {:array, :string}
    field :favorite_count, :integer, default: 0
    field :published_at, :naive_datetime
    field :author_uuid, :binary_id
    field :author_bio, :string
    field :author_image, :string
    field :author_username, :string

    timestamps()
  end
end
Blog author read model

We define a database migration to create the blog authors table:

# priv/repo/migrations/20170628162259_create_blog_author.exs
defmodule Conduit.Repo.Migrations.CreateConduit.Blog.Author do
  use Ecto.Migration

  def change do
    create table(:blog_authors, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :user_uuid, :uuid
      add :username, :string
      add :bio, :string
      add :image, :string

      timestamps()
    end

    create unique_index(:blog_authors, [:user_uuid])
  end
end

A corresponding Ecto schema is built, containing the subset of the user details we’ll use for authors:

# lib/conduit/blog/projections/author.ex
defmodule Conduit.Blog.Projections.Author do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: false}

  schema "blog_authors" do
    field :user_uuid, :binary_id
    field :username, :string
    field :bio, :string
    field :image, :string

    timestamps()
  end
end
Projecting blog authors and articles

In the article projector we handle two domain events:

  1. UserRegistered to capture the author details.
  2. ArticlePublished to record each article.

We use Ecto.Multi.run/2 to lookup an author by their identity before creating the article read model.

# lib/conduit/blog/projectors/article.ex
defmodule Conduit.Blog.Projectors.Article do
  use Commanded.Projections.Ecto,
    name: "Blog.Projectors.Article",
    consistency: :strong

  alias Conduit.Blog.Events.{ArticlePublished,AuthorCreated}
  alias Conduit.Blog.Projections.{Article,Author}
  alias Conduit.Repo

  project %AuthorCreated{} = author do
    Ecto.Multi.insert(multi, :author, %Author{
      uuid: author.author_uuid,
      user_uuid: author.user_uuid,
      username: author.username,
      bio: nil,
      image: nil,
    })
  end

  project %ArticlePublished{} = published, %{created_at: published_at} do
    multi
    |> Ecto.Multi.run(:author, fn _changes -> get_author(published.author_uuid) end)
    |> Ecto.Multi.run(:article, fn %{author: author} ->
      article = %Article{
        uuid: published.article_uuid,
        slug: published.slug,
        title: published.title,
        description: published.description,
        body: published.body,
        tag_list: published.tag_list,
        favorite_count: 0,
        published_at: published_at,
        author_uuid: author.uuid,
        author_username: author.username,
        author_bio: author.bio,
        author_image: author.image,
      }

      Repo.insert(article)
    end)
  end

  defp get_author(uuid) do
    case Repo.get(Author, uuid) do
      nil -> {:error, :author_not_found}
      author -> {:ok, author}
    end
  end
end

A projector is guaranteed to handle events in the order they were published. Therefore we can be sure that, within the article projector, the author will have been created before they publish an article.

Publishing articles test

With the read model projection completed we can verify article publishing from the blog context by executing the tests.

$ mix test test/conduit/blog/blog_test.exs
Excluding tags: [:pending]
..
Finished in 2.1 seconds
2 tests, 0 failures

The final step is to confirm the article controller tests pass. Before doing so we must update the article view, responsible for formatting the data into the desired structure and returned as JSON data:

# lib/conduit_web/views/article_view.ex
defmodule ConduitWeb.ArticleView do
  use ConduitWeb, :view
  alias ConduitWeb.ArticleView

  def render("index.json", %{articles: articles}) do
    %{articles: render_many(articles, ArticleView, "article.json")}
  end

  def render("show.json", %{article: article}) do
    %{article: render_one(article, ArticleView, "article.json")}
  end

  def render("article.json", %{article: article}) do
    %{
      slug: article.slug,
      title: article.title,
      description: article.description,
      body: article.body,
      tagList: article.tag_list,
      createdAt: NaiveDateTime.to_iso8601(article.published_at),
      updatedAt: NaiveDateTime.to_iso8601(article.updated_at),
      favoritesCount: article.favorite_count,
      favorited: false,
      author: %{
        username: article.author_username,
        bio: article.author_bio,
        image: article.author_image,
        following: false,
      },
    }
  end
end

Then execute the article controller test:

$ mix test test/conduit_web/controllers/article_controller_test.exs
Excluding tags: [:pending]
.
Finished in 1.1 seconds
1 test, 0 failures

We’ve now successfully published an article, let’s move on to the queries we need to support.

Listing articles

Fetching and displaying articles is the principal feature of a blog. In Conduit we will support listing all articles and filtering by tag, favorited, and author. To support pagination, an offset and limit may be provided. By default, a GET /api/articles request returns the most recent articles globally.

The tag, author and favorited query parameter are used to filter results.

Filter by tag ?tag=AngularJS
Filter by author ?author=jake
Favorited by user ?favorited=jake
Limit number of articles (default is 20) ?limit=20
Offset/skip number of articles (default is 0) ?offset=0

Example response body:

{
  "articles":[{
    "slug": "how-to-train-your-dragon",
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "It takes a Jacobian",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }, {

    "slug": "how-to-train-your-dragon-2",
    "title": "How to train your dragon 2",
    "description": "So toothless",
    "body": "It a dragon",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }],
  "articlesCount": 2
}

List articles controller test

Once again our starting point when building a feature is to define an integration test that verifies the behaviour according to the above API spec. In this case, a GET request should return all published articles, ordered by published date with the most recent articles first.

The setup function makes use of two helpers to seed appropriate test data: register_user/1 and publish_articles/1.

defmodule ConduitWeb.ArticleControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "list articles" do
    setup [
      :create_author,
      :publish_articles,
    ]

    @tag :web
    test "should return published articles by date published", %{conn: conn} do
      conn = get conn, article_path(conn, :index)
      json = json_response(conn, 200)
      articles = json["articles"]
      first_created_at = Enum.at(articles, 0)["createdAt"]
      first_updated_at = Enum.at(articles, 0)["updatedAt"]
      second_created_at = Enum.at(articles, 1)["createdAt"]
      second_updated_at = Enum.at(articles, 1)["updatedAt"]

      assert json == %{
        "articles" => [
          %{
            "slug" => "how-to-train-your-dragon-2",
            "title" => "How to train your dragon 2",
            "description" => "So toothless",
            "body" => "It a dragon",
            "tagList" => ["dragons", "training"],
            "createdAt" => first_created_at,
            "updatedAt" => first_updated_at,
            "favorited" => false,
            "favoritesCount" => 0,
            "author" => %{
              "username" => "jake",
              "bio" => nil,
              "image" => nil,
              "following" => false,
            }
          },
          %{
            "slug" => "how-to-train-your-dragon",
            "title" => "How to train your dragon",
            "description" => "Ever wonder how?",
            "body" => "You have to believe",
            "tagList" => ["dragons", "training"],
            "createdAt" => second_created_at,
            "updatedAt" => second_updated_at,
            "favorited" => false,
            "favoritesCount" => 0,
            "author" => %{
              "username" => "jake",
              "bio" => nil,
              "image" => nil,
              "following" => false,
            }
          },
        ],
        "articlesCount" => 2,
      }
    end
  end

  defp create_author(_context) do
    {:ok, author} = fixture(:author, user_uuid: UUID.uuid4())

    [
      author: author,
    ]
  end

  defp publish_articles(%{author: author}) do
    fixture(:article, author: author)
    fixture(:article, author: author, title: "How to train your dragon 2", description: "So toothless", bo\
dy: "It a dragon")

    []
  end
end

In the article controller, we add an index/4 function to query the latest articles from the given request params, and render the articles as JSON. We include the total count of articles matching the request query in addition to the subset of paginated articles returned.

def index(conn, params, _user, _claims) do
  {articles, total_count} = Blog.list_articles(params)
  render(conn, "index.json", articles: articles, total_count: total_count)
end

This route is mapped inside the /api scope of the web router:

scope "/api", Conduit.Web do
  pipe_through :api

  get "/articles", ArticleController, :index
end

The total count is included in the article view:

# lib/conduit_web/views/article_view.ex
defmodule ConduitWeb.ArticleView do
  use ConduitWeb, :view
  alias ConduitWeb.ArticleView

  def render("index.json", %{articles: articles, total_count: total_count}) do
    %{
      articles: render_many(articles, ArticleView, "article.json"),
      articlesCount: total_count,
    }
  end

  def render("show.json", %{article: article}) do
    %{article: render_one(article, ArticleView, "article.json")}
  end

  def render("article.json", %{article: article}) do
    %{
      slug: article.slug,
      title: article.title,
      description: article.description,
      body: article.body,
      tagList: article.tag_list,
      createdAt: NaiveDateTime.to_iso8601(article.published_at),
      updatedAt: NaiveDateTime.to_iso8601(article.updated_at),
      favoritesCount: article.favorite_count,
      favorited: false,
      author: %{
        username: article.author_username,
        bio: article.author_bio,
        image: article.author_image,
        following: false,
      },
    }
  end
end

Querying latest articles

The article controller depends upon a new function in the blog context: Blog.list_articles/1

It delegates the actual fetching of articles from the database to a new ListArticles query module.

# lib/conduit/blog/blog.ex (diff)
defmodule Conduit.Blog do
  alias Conduit.Blog.Queries.{ArticleBySlug,ListArticles}

  @doc """
  Returns most recent articles globally by default.

  Provide tag, author or favorited query parameter to filter results.
  """
  @spec list_articles(params :: map()) :: {articles :: list(Article.t), article_count :: non_neg_integer()}
  def list_articles(params \\ %{}) do
    ListArticles.paginate(params, Repo)
  end
end

Unlike previous queries, we’ll provide the query with the repo module allowing it to execute a request to the database. This is because we need to execute two queries:

  1. Find the articles matching the query, and return a subset of the request page (using limit and offset).
  2. Count the total number of articles matching the query.
Paginated articles

The entries/2 function includes the pagination, limit and offset, and orders the articles by their published date with the most recent articles first.

Article count

The count/1 function selects only the article’s uuid field and executes a database aggregation to count the rows.

The map of parameters is parsed into an Options struct using the ExConstructor library. This provides us with type checking on the available keys, and default values when not present in the user provided params.

# lib/conduit/blog/queries/list_articles.ex
defmodule Conduit.Blog.Queries.ListArticles do
  import Ecto.Query

  alias Conduit.Blog.Projections.Article

  defmodule Options do
    defstruct [
      limit: 20,
      offset: 0,
    ]

    use ExConstructor
  end

  def paginate(params, repo) do
    options = Options.new(params)

    articles = query() |> entries(options) |> repo.all()
    total_count = query() |> count() |> repo.aggregate(:count, :uuid)

    {articles, total_count}
  end

  defp query do
    from(a in Article)
  end

  defp entries(query, %Options{limit: limit, offset: offset}) do
    query
    |> order_by([a], desc: a.published_at)
    |> limit(^limit)
    |> offset(^offset)
  end

  defp count(query) do
    query |> select([:uuid])
  end
end

We’ll extend the blog test to include listing articles, ensuring that pagination is working as expected. Included in the test is a convenience function to publish multiple articles: publish_articles/1

defmodule Conduit.BlogTest do
  use Conduit.DataCase

  describe "list articles" do
    setup [
      :create_author,
      :publish_articles,
    ]

    @tag :integration
    test "should list articles by published date", %{articles: [article1, article2]} do
      assert {[article2, article1], 2} == Blog.list_articles()
    end

    @tag :integration
    test "should limit articles", %{articles: [_article1, article2]} do
      assert {[article2], 2} == Blog.list_articles(%{limit: 1})
    end

    @tag :integration
    test "should paginate articles", %{articles: [article1, _article2]} do
      assert {[article1], 2} == Blog.list_articles(%{offset: 1})
    end
  end

  defp publish_articles(%{user: user}) do
    {:ok, article1} = fixture(:article, author: user)
    {:ok, article2} = fixture(:article, author: user, title: "How to train your dragon 2", description: "S\
o toothless", body: "It a dragon")

    [
      articles: [article1, article2],
    ]
  end
end

Filter by author

Let’s start with filtering articles by their author. The test cases we’ll add cover the two scenarios where an author has, or has not, published any articles:

@tag :integration
test "should filter by author" do
  assert {[], 0} == Blog.list_articles(%{author: "unknown"})
end

@tag :integration
test "should filter by author returning only their articles", %{articles: [article1, article2]} do
  assert {[article2, article1], 2} == Blog.list_articles(%{author: "jake"})
end

For the test to pass we make a small change to the existing Ecto query, in Conduit.Blog.Queries.ListArticles, to add a WHERE clause that matches the author_username field with a given author value. The query is returned unmodified when the author is nil.

defp query(options) do
  from(a in Article)
  |> filter_by_author(options)
end

defp filter_by_author(query, %Options{author: nil}), do: query
defp filter_by_author(query, %Options{author: author}) do
  query |> where(author_username: ^author)
end

Populating the author field in the Options struct from the map of params from the request is handled by our use of ExConstructor (Options.new(params)).

Filter by tag

We’ve defined the tag_list field in the articles table as an array of text. We can use PostgreSQL’s built in support for searching inside arrays.

The SQL statement to query for a tag within an article’s tags array uses the ANY keyword:

SELECT * FROM blog_articles WHERE 'dragons' = ANY (tag_list);

This SQL is translated to the following Ecto query in our ListArticles module:

defp filter_by_tag(query, %Options{tag: nil}), do: query
defp filter_by_tag(query, %Options{tag: tag}) do
  from a in query,
  where: ^tag in a.tag_list
end

Unfortunately this is not a performant query to execute as it will require a sequential table scan.

We can analyse the query plan for our tag query as follows:

SET enable_seqscan = off;
EXPLAIN SELECT * FROM blog_articles WHERE 'dragons' = ANY (tag_list);

The result shows the query planner has chosen to execute a sequential scan on the blog_articles table:

Seq Scan on blog_articles  (cost=10000000000.00..10000000001.07 rows=1 width=316)
  Filter: ('dragons'::text = ANY (tag_list))

That’s bad news: our query will gradually degrade in performance over time as our blogging platform gains in popularity, encouraging more authors to publish their own articles. However, we can remedy this situation before it causes a problem in production by optimising the query.

Tagged articles table

One of the advantages of CQRS is that our read model can be built for the exact queries it must support. We could create a separate table for tagged articles and insert one entry for each tag assigned to a published article.

article_uuid tag author_username published_at
18e760d2-04f1-4da6-b27a-fca6d3ef1fa0 dragons jake 2017-07-28 12:00:00.000000
18e760d2-04f1-4da6-b27a-fca6d3ef1fa0 training jake 2017-07-28 12:00:00.000000
62acb90d-5ea3-4c0a-9145-2d397fc5750f cqrs ben 2017-07-30 14:00:00.000000

Using an index on the tag column would allow performant lookup.

Use PostgreSQL’s GIN index

We can take advantage of PostgreSQL’s GIN indexes, thereby we don’t need to build a separate table.

GIN indexes are inverted indexes which can handle values that contain more than one key, arrays for example.

To add a GIN index we create an Ecto database migration and specify the type of index via the :using option:

# priv/repo/migrations/20170719085120_add_index_to_blog_tags.exs
defmodule Conduit.Repo.Migrations.AddIndexToBlogTags do
  use Ecto.Migration

  def change do
    create index(:blog_articles, [:tag_list], using: "GIN")
  end
end

Then run the migration:

mix ecto.migrate

After adding the GIN index on the tag_list column we can execute the following query:

SELECT * FROM blog_articles WHERE tag_list @> '{dragons}';

The @> clause is used to match rows where the array contains the given value. It’s the same behaviour as the ANY query we initially wrote, but performs significantly better as it can use the index.

set enable_seqscan = off;
EXPLAIN SELECT * FROM blog_articles WHERE tag_list @> '{dragons}';

Now the query planner is taking advantage of our GIN index:

Bitmap Heap Scan on blog_articles  (cost=2.01..3.02 rows=1 width=316)
  Recheck Cond: (tag_list @> '{dragons}'::text[])
  ->  Bitmap Index Scan on blog_articles_tag_list_index  (cost=0.00..2.01 rows=1 width=0)
        Index Cond: (tag_list @> '{dragons}'::text[])

We now need to update the filter by tag query in the ListArticles module. We use Ecto’s fragment function to provide the exact SQL syntax required for the GIN array clause:

defp filter_by_tag(query, %Options{tag: nil}), do: query
defp filter_by_tag(query, %Options{tag: tag}) do
  from a in query,
  where: fragment("? @> ?", a.tag_list, [^tag])
end

Finally, we verify this query succeeds by adding two new integration tests to the blog test:

@tag :integration
test "should filter by tag returning only tagged articles", %{articles: [article1, _article2]} do
  assert {[article1], 1} == Blog.list_articles(%{tag: "believe"})
end

@tag :integration
test "should filter by tag" do
  assert {[], 0} == Blog.list_articles(%{tag: "unknown"})
end

Get an article

Now we can list and filter articles, the next feature is to get a single article by its unique URL slug. First up is an integration test that creates an author, publishes an article, and attempts to get the newly published article:

defmodule ConduitWeb.ArticleControllerTest do
  describe "get article" do
    setup [
      :create_author,
      :publish_article,
    ]

    @tag :web
    test "should return published article by slug", %{conn: conn} do
      conn = get conn, article_path(conn, :show, "how-to-train-your-dragon")
      json = json_response(conn, 200)
      article = json["article"]
      created_at = article["createdAt"]
      updated_at = article["updatedAt"]

      assert json == %{
        "article" => %{
          "slug" => "how-to-train-your-dragon",
          "title" => "How to train your dragon",
          "description" => "Ever wonder how?",
          "body" => "You have to believe",
          "tagList" => ["dragons", "training"],
          "createdAt" => created_at,
          "updatedAt" => updated_at,
          "favorited" => false,
          "favoritesCount" => 0,
          "author" => %{
            "username" => "jake",
            "bio" => nil,
            "image" => nil,
            "following" => false,
          }
        },
      }
    end
  end

  defp publish_article(%{author: author}) do
    {:ok, article} = fixture(:article, author: author)

    [
      article: article,
    ]
  end
end

The test fails, so let’s go and make the changes necessary for it to pass. We need to route the article slug URL in the Phoenix web router module:

defmodule ConduitWeb.Router do
  scope "/api", ConduitWeb do
    pipe_through :api

    get "/articles/:slug", ArticleController, :show
  end
end

This route is mapped to a new show/4 function in the ArticleController which retrieves the article by its unique slug and renders it as JSON:

# lib/conduit_web/controllers/article_controller.ex (diff)
defmodule ConduitWeb.ArticleController do
  def show(conn, %{"slug" => slug}, _user, _claims) do
    article = Blog.article_by_slug!(slug)
    render(conn, "show.json", article: article)
  end
end

Since we want to return a 404 HTTP error response when the article does not exist we add a Blog.article_by_slug!/1 function, note the ! suffix, which raises an error when the query returns nothing. This will be handled by the FallbackController to render the appropriate HTTP status.

defmodule Conduit.Blog do
  @doc """
  Get an article by its URL slug, or raise an `Ecto.NoResultsError` if not found
  """
  def article_by_slug!(slug),
    do: article_by_slug_query(slug) |> Repo.one!()

  defp article_by_slug_query(slug) do
    slug
    |> String.downcase()
    |> ArticleBySlug.new()
  end
end

That’s all we need to get an article from our API since most of the heavy lifting was already implemented by us earlier to support listing multiple articles.

Favorite articles

The API spec to favorite and unfavorite an article is as follows:

HTTP verb URL Required fields
POST /api/articles/:slug/favorite None
DELETE /api/articles/:slug/favorite None

There’s no request body or required fields since the URL contains the article being favorited, the HTTP verb informs us of the operation, and the authenticated user making the request is the person who’s favorite it is.

To implement this feature we’ll need to add two new commands, and associated domain events, one to favorite an article and another to unfavorite. When deciding which aggregate should be responsible for this behaviour we need to consider the invariants to be protected. In this example we’d like to ensure a user may only favorite an article once, and they may only unfavorite an article they have previously favourited. We can use the article aggregate to enforce these rules by having each article track who has favorited it.

Favorite integration test

There’s no surprises that we start building the favoriting feature by writing an integration test to specify the required behaviour. In the setup function defined in the favorite article controller test we seed an initial author, publish an article, and register a user. This user will be used to make the authenticated requests to favorite the published article.

# test/conduit_web/controllers/favorite_article_controller_test.exs
defmodule ConduitWeb.FavoriteArticleControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "favorite article" do
    setup [
      :create_author,
      :publish_article,
      :register_user,
    ]

    @tag :web
    test "should be favorited and return article", %{conn: conn, user: user} do
      conn = post authenticated_conn(conn, user), favorite_article_path(conn, :create, "how-to-train-your-\
dragon")
      json = json_response(conn, 201)["article"]
      created_at = json["createdAt"]
      updated_at = json["updatedAt"]

      assert json == %{
        "slug" => "how-to-train-your-dragon",
        "title" => "How to train your dragon",
        "description" => "Ever wonder how?",
        "body" => "You have to believe",
        "tagList" => ["dragons", "training"],
        "createdAt" => created_at,
        "updatedAt" => updated_at,
        "favorited" => true,
        "favoritesCount" => 1,
        "author" => %{
          "username" => "jake",
          "bio" => nil,
          "image" => nil,
          "following" => false,
        }
      }
    end
  end

  describe "unfavorite article" do
    setup [
      :create_author,
      :publish_article,
      :register_user,
      :get_author,
      :favorite_article,
    ]

    @tag :web
    test "should be unfavorited and return article", %{conn: conn, user: user} do
      conn = delete authenticated_conn(conn, user), favorite_article_path(conn, :delete, "how-to-train-you\
r-dragon")
      json = json_response(conn, 201)["article"]
      created_at = json["createdAt"]
      updated_at = json["updatedAt"]

      assert json == %{
        "slug" => "how-to-train-your-dragon",
        "title" => "How to train your dragon",
        "description" => "Ever wonder how?",
        "body" => "You have to believe",
        "tagList" => ["dragons", "training"],
        "createdAt" => created_at,
        "updatedAt" => updated_at,
        "favorited" => false,
        "favoritesCount" => 0,
        "author" => %{
          "username" => "jake",
          "bio" => nil,
          "image" => nil,
          "following" => false,
        }
      }
    end
  end
end

You’ll notice that we have an assertion to check the favorited flag is correctly toggled and the favoritesCount is incremented and decremented when a user favorites or unfavorites an article. Running this test will immediately fail as we haven’t defined the favourite_article path in our Phoenix router nor built the FavoriteArticleController.

Article routing

We need to route POST and DELETE requests to the /api/articles/:slug/favorite URL. Both of these actions will require fetching the article by its slug. We could do this query in the controller, but Phoenix’s router allows us to define our own request handling pipeline and include any custom plug modules or functions. We’ll take advantage of this feature to define an :article pipeline with a plug module which attempts to load an article by the slug contained within the URL. This pipeline will be used for any requests matching the /api/articles/:slug path, including the new favorite article controller.

defmodule ConduitWeb.Router do
  alias ConduitWeb.Plugs

  pipeline :article do
    plug Plugs.LoadArticleBySlug
  end

  scope "/api", ConduitWeb do
    scope "/articles/:slug" do
      pipe_through :article

      post "/favorite", FavoriteArticleController, :create
      delete "/favorite", FavoriteArticleController, :delete
    end
  end
end

The LoadArticleBySlug plug extracts the slug from the request params and fetches the article from the database. The article is assigned to the connection, allowing it to be accessed later in a controllet action using %{assigns: %{article: article}}.

# lib/conduit_web/plugs/load_article_by_slug.ex
defmodule ConduitWeb.Plugs.LoadArticleBySlug do
  use Phoenix.Controller, namespace: ConduitWeb

  import Plug.Conn

  alias Conduit.Blog

  def init(opts), do: opts

  def call(%Plug.Conn{params: %{"slug" => slug}} = conn, _opts) do
    article = Blog.article_by_slug!(slug)

    assign(conn, :article, article)
  end
end

Favorite article controller

Let’s go ahead and build the favorite article controller. You’ll notice that we require an authenticated user for the create and delete controller actions. This allows us to lookup the author associated with the current user. It’s the author who will favorite, or unfavorite, an article. For both controller actions we return a JSON represenation of the article. As previously mentioned, the article has already been retrieved from the database and made available within the assigns map by the LoadArticleBySlug plug.

# lib/conduit_web/controllers/favorite_article_controller.ex
defmodule ConduitWeb.FavoriteArticleController do
  use ConduitWeb, :controller
  use Guardian.Phoenix.Controller

  alias Conduit.Blog
  alias Conduit.Blog.Projections.Article
  alias ConduitWeb.ArticleView

  plug Guardian.Plug.EnsureAuthenticated, %{handler: ConduitWeb.ErrorHandler} when action in [:create, :de\
lete]
  plug Guardian.Plug.EnsureResource, %{handler: ConduitWeb.ErrorHandler} when action in [:create, :delete]

  action_fallback ConduitWeb.FallbackController

  def create(%{assigns: %{article: article}} = conn, _params, user, _claims) do
    author = Blog.get_author!(user.uuid)

    with {:ok, %Article{} = article} <- Blog.favorite_article(article, author) do
      conn
      |> put_status(:created)
      |> render(ArticleView, "show.json", article: article)
    end
  end

  def delete(%{assigns: %{article: article}} = conn, _params, user, _claims) do
    author = Blog.get_author!(user.uuid)

    with {:ok, %Article{} = article} <- Blog.unfavorite_article(article, author) do
      conn
      |> put_status(:created)
      |> render(ArticleView, "show.json", article: article)
    end
  end
end

Favorite articles in Blog context

The FavoriteArticleController expects Blog.favorite_article/2 and Blog.unfavorite_article/2 functions to exist in the blog context, so we’ll add them to the public API exposed by Conduit.Blog. Both functions construct a command, dispatch it using the router, and then query the read model to return updated data. This helps us to identify the two new command we need to define: FavoriteArticle and UnfavoriteArticle.

defmodule Conduit.Blog do
  @doc """
  Favorite the article for an author
  """
  def favorite_article(%Article{uuid: article_uuid}, %Author{uuid: author_uuid}) do
    favorite_article = %FavoriteArticle{
      article_uuid: article_uuid,
      favorited_by_author_uuid: author_uuid,
    }

    with :ok <- Router.dispatch(favorite_article, consistency: :strong),
         {:ok, article} <- get(Article, article_uuid) do
      {:ok, %Article{article | favorited: true}}
    else
      reply -> reply
    end
  end

  @doc """
  Unfavorite the article for an author
  """
  def unfavorite_article(%Article{uuid: article_uuid}, %Author{uuid: author_uuid}) do
    unfavorite_article = %UnfavoriteArticle{
      article_uuid: article_uuid,
      unfavorited_by_author_uuid: author_uuid,
    }

    with :ok <- Router.dispatch(unfavorite_article, consistency: :strong),
         {:ok, article} <- get(Article, article_uuid) do
      {:ok, %Article{article | favorited: false}}
    else
      reply -> reply
    end
  end
end

You might have noticed we manually set the favorited flag on the Article schema and are wondering how does that work? The answer is that a virtual field has been added to the Ecto schema which allows us to set the value but it’s not backed by a column in the database. This is useful for fields used for setting transient values related to a single query. In this case it’s whether the author has favorited the article. We can manually set the value in these functions because we know what the actual value should be without requiring any further database interaction, such as querying for an author’s favorites. It’s a small performance optimisation.

# lib/conduit/blog/projections/article.ex (diff)
defmodule Conduit.Blog.Projections.Article do
    field :favorited, :boolean, virtual: true, default: false
end

Favorite commands and events

The favorite article command requires only the article and author identifiers, which are validated as UUIDs:

# lib/conduit/blog/commands/favorite_article.ex
defmodule Conduit.Blog.Commands.FavoriteArticle do
  defstruct [
    article_uuid: "",
    favorited_by_author_uuid: "",
  ]

  use ExConstructor
  use Vex.Struct

  validates :article_uuid, uuid: true
  validates :favorited_by_author_uuid, uuid: true
end

The article favorited domain event includes both identifiers, but also contains a favorite_count field. This is included by the article aggregate as a count of favorites so that it can be later projected into the read model.

# lib/conduit/blog/events/article_favorited.ex
defmodule Conduit.Blog.Events.ArticleFavorited do
  @derive [Poison.Encoder]
  defstruct [
    :article_uuid,
    :favorited_by_author_uuid,
    :favorite_count,
  ]
end

The unfavorite command and events follow the same pattern, so have been omitted.

Favorite article aggregate handling

As discussed earlier, the article must now track which authors and how many in total have favorited it. We add favorited_by_authors and favorite_count fields to the aggregate’s state.

When favoriting an article we must ensure the author hasn’t already favorited the article. This rule is checked by the is_favorited?/2 helper function which looks to see if the author is already in the favorited_by_authors MapSet. If the author’s identity is already present, the favorite article command returns nil. This allows the command to be idempotent; if an author requests to favorite an article they’ve already favorited then we can just ignore their request. The unfavorite command is handled similarly, except the check is to ensure the article has already been favorited by the author before unfavoriting.

We also ensure an author can only (un)favorite an existing article by pattern matching on the article’s identity field, using %Article{uuid: nil} in the execute/2 function. An error is returned when it’s nil, indicating an article that has never been published.

The favorite_count value is calculated and included in the events in the (un)favorite command handling functions. This allows the apply/2 state mutator functions to be logic free, they only need to copy the value from the event to the aggregate’s state. We want to ensure there’s minimal processing being done in the apply/2 functions.

defmodule Conduit.Blog.Aggregates.Article do
  defstruct [
    # ...
    favorited_by_authors: MapSet.new(),
    favorite_count: 0
  ]

  @doc """
  Favorite the article for an author
  """
  def execute(%Article{uuid: nil}, %FavoriteArticle{}), do: {:error, :article_not_found}
  def execute(
    %Article{uuid: uuid, favorite_count: favorite_count} = article,
    %FavoriteArticle{favorited_by_author_uuid: author_id})
  do
    case is_favorited?(article, author_id) do
      true -> nil
      false ->
        %ArticleFavorited{
          article_uuid: uuid,
          favorited_by_author_uuid: author_id,
          favorite_count: favorite_count + 1,
        }
    end
  end

  @doc """
  Unfavorite the article for the user
  """
  def execute(%Article{uuid: nil}, %UnfavoriteArticle{}), do: {:error, :article_not_found}
  def execute(
    %Article{uuid: uuid, favorite_count: favorite_count} = article,
    %UnfavoriteArticle{unfavorited_by_author_uuid: author_id})
  do
    case is_favorited?(article, author_id) do
      true ->
        %ArticleUnfavorited{
          article_uuid: uuid,
          unfavorited_by_author_uuid: author_id,
          favorite_count: favorite_count - 1,
        }
      false -> nil
    end
  end

  def apply(
    %Article{favorited_by_authors: favorited_by} = article,
    %ArticleFavorited{favorited_by_author_uuid: author_id, favorite_count: favorite_count})
  do
    %Article{article |
      favorited_by_authors: MapSet.put(favorited_by, author_id),
      favorite_count: favorite_count,
    }
  end

  def apply(
    %Article{favorited_by_authors: favorited_by} = article,
    %ArticleUnfavorited{unfavorited_by_author_uuid: author_id, favorite_count: favorite_count})
  do
    %Article{article |
      favorited_by_authors: MapSet.delete(favorited_by, author_id),
      favorite_count: favorite_count,
    }
  end

  # Is the article a favorite of the user?
  defp is_favorited?(%Article{favorited_by_authors: favorited_by}, user_uuid) do
    MapSet.member?(favorited_by, user_uuid)
  end
end

Unit testing favorites in the article aggregate

We can test the newly added behaviour to the article aggregate by extending the article unit test and reusing the assert_events/3 test helper function provided by the Conduit.AggregateCase ExUnit case template module. The assert events function takes a list of initial events, used to populate the aggregate’s state, a command to execute, and a list of expected events to be produced by the aggregate. To favorite an article we need to seed the aggregate with an article published event, produced by the factory function defined in Conduit.Factory.

# test/conduit/blog/aggregates/article_test.exs
defmodule Conduit.Blog.ArticleTest do
  use Conduit.AggregateCase, aggregate: Conduit.Blog.Aggregates.Article

  alias Conduit.Blog.Commands.{
    FavoriteArticle,
    UnfavoriteArticle
  }

  alias Conduit.Blog.Events.{
    ArticleFavorited,
    ArticleUnfavorited
  }

  describe "publish article" do
    @tag :unit
    test "should succeed when valid" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events(
        build(:publish_article, article_uuid: article_uuid, author_uuid: author_uuid),
        [
          build(:article_published, article_uuid: article_uuid, author_uuid: author_uuid)
        ]
      )
    end
  end

  describe "favorite article" do
    @tag :unit
    test "should succeed when not already a favorite" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events(
        build(:article_published, article_uuid: article_uuid, author_uuid: author_uuid),
        %FavoriteArticle{article_uuid: article_uuid, favorited_by_author_uuid: author_uuid},
        [
          %ArticleFavorited{
            article_uuid: article_uuid,
            favorited_by_author_uuid: author_uuid,
            favorite_count: 1
          }
        ]
      )
    end

    @tag :unit
    test "should ignore when already a favorite" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events(
        build(:article_published, article_uuid: article_uuid, author_uuid: author_uuid),
        [
          %FavoriteArticle{article_uuid: article_uuid, favorited_by_author_uuid: author_uuid},
          %FavoriteArticle{article_uuid: article_uuid, favorited_by_author_uuid: author_uuid}
        ],
        []
      )
    end
  end

  describe "unfavorite article" do
    @tag :unit
    test "should succeed when a favorite" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events(
        build(:article_published, article_uuid: article_uuid, author_uuid: author_uuid),
        [
          %FavoriteArticle{article_uuid: article_uuid, favorited_by_author_uuid: author_uuid},
          %UnfavoriteArticle{article_uuid: article_uuid, unfavorited_by_author_uuid: author_uuid}
        ],
        [
          %ArticleUnfavorited{
            article_uuid: article_uuid,
            unfavorited_by_author_uuid: author_uuid,
            favorite_count: 0
          }
        ]
      )
    end

    @tag :unit
    test "should ignore when not a favorite" do
      article_uuid = UUID.uuid4()
      author_uuid = UUID.uuid4()

      assert_events(
        build(:article_published, article_uuid: article_uuid, author_uuid: author_uuid),
        [
          %UnfavoriteArticle{article_uuid: article_uuid, unfavorited_by_author_uuid: author_uuid}
        ],
        []
      )
    end
  end
end

Routing favorite commands

With the article aggregate modified to handle the new favorite commands, we need to finish off by routing the commands in the Conduit.Router module:

# lib/conduit/router.ex (diff)
defmodule Conduit.Router do
  alias Conduit.Blog.Commands.{CreateAuthor,FavoriteArticle,PublishArticle,UnfavoriteArticle}

  dispatch [
    PublishArticle,
    FavoriteArticle,
    UnfavoriteArticle
  ], to: Article
end

Projecting favorite articles in the read model

We’re going to use a SQL join table to track favorited articles which we’ll later use to filter articles favorited by a user.

# lib/conduit/blog/favorited_article.ex
defmodule Conduit.Blog.Projections.FavoritedArticle do
  use Ecto.Schema

  @primary_key false

  schema "blog_favorited_articles" do
    field :article_uuid, :binary_id, primary_key: true
    field :favorited_by_author_uuid, :binary_id, primary_key: true

    timestamps()
  end
end

After creating and running a database migration to add the new blog_favorited_articles join table we can extend the article projector to handle the new favorite and unfavorite events. Whenever an article is favorited we insert a row into the join table, on unfavorite the row is deleted.

Aditionaly, we also update the article’s favorite_count field from the count included in the events. Remember that we try to keep our projection code as simple as possible, preferring to keep calculation logic in the domain model (our aggregates). We chain together the two Ecto.Multi operations which update the join table and articles table using Elixir’s pipeline syntax (|>).

# lib/conduit/blog/projectors/article.ex (diff)
defmodule Conduit.Blog.Projectors.Article do
  alias Conduit.Blog.Projections.{Article,Author,FavoritedArticle}
  alias Conduit.Blog.Events.{
    ArticleFavorited,
    ArticlePublished,
    ArticleUnfavorited,
    AuthorCreated,
  }

  @doc """
  Update favorite count when an article is favorited
  """
  project %ArticleFavorited{article_uuid: article_uuid, favorited_by_author_uuid: favorited_by_author_uuid\
, favorite_count: favorite_count} do
    multi
    |> Ecto.Multi.insert(:favorited_article, %FavoritedArticle{article_uuid: article_uuid, favorited_by_au\
thor_uuid: favorited_by_author_uuid})
    |> Ecto.Multi.update_all(:article, article_query(article_uuid), set: [
      favorite_count: favorite_count,
    ])
  end

  @doc """
  Update favorite count when an article is unfavorited
  """
  project %ArticleUnfavorited{article_uuid: article_uuid, unfavorited_by_author_uuid: unfavorited_by_autho\
r_uuid, favorite_count: favorite_count} do
    multi
    |> Ecto.Multi.delete_all(:favorited_article, favorited_article_query(article_uuid, unfavorited_by_auth\
or_uuid))
    |> Ecto.Multi.update_all(:article, article_query(article_uuid), set: [
      favorite_count: favorite_count,
    ])
  end

  defp article_query(article_uuid) do
    from(a in Article, where: a.uuid == ^article_uuid)
  end

  defp favorited_article_query(article_uuid, author_uuid) do
    from(f in FavoritedArticle, where: f.article_uuid == ^article_uuid and f.favorited_by_author_uuid == ^\
author_uuid)
  end
end

Favorite articles test

With the read model projection complete we can now run our favorite article tests and should see them pass, which they do. However we’re not quite finished yet. Remember when we favorite, or unfavorite, an article we manually set the favorited field depending upon what the expected outcome would be. This works fine for this use case, but if we later view all articles the field isn’t being set and defaults to false.

We can demonstrate this problem with the following integration test which favorites a published article and then immediately requests all articles.

describe "list articles including favorited" do
  setup [
    :create_author,
    :publish_articles,
    :register_user,
    :get_author,
    :favorite_article,
  ]

  @tag :web
  test "should return published articles by date published", %{conn: conn, user: user} do
    conn = get authenticated_conn(conn, user), article_path(conn, :index)
    json = json_response(conn, 200)
    articles = json["articles"]
    first_created_at = Enum.at(articles, 0)["createdAt"]
    first_updated_at = Enum.at(articles, 0)["updatedAt"]
    second_created_at = Enum.at(articles, 1)["createdAt"]
    second_updated_at = Enum.at(articles, 1)["updatedAt"]

    assert json == %{
      "articles" => [
        %{
          "slug" => "how-to-train-your-dragon-2",
          "title" => "How to train your dragon 2",
          "description" => "So toothless",
          "body" => "It a dragon",
          "tagList" => ["dragons", "training"],
          "createdAt" => first_created_at,
          "updatedAt" => first_updated_at,
          "favorited" => false,
          "favoritesCount" => 0,
          "author" => %{
            "username" => "jake",
            "bio" => nil,
            "image" => nil,
            "following" => false,
          }
        },
        %{
          "slug" => "how-to-train-your-dragon",
          "title" => "How to train your dragon",
          "description" => "Ever wonder how?",
          "body" => "You have to believe",
          "tagList" => ["dragons", "training", "believe"],
          "createdAt" => second_created_at,
          "updatedAt" => second_updated_at,
          "favorited" => true,
          "favoritesCount" => 1,
          "author" => %{
            "username" => "jake",
            "bio" => nil,
            "image" => nil,
            "following" => false,
          }
        },
      ],
      "articlesCount" => 2,
    }
  end
end

Our expectation is the favorited flag will be true for the article we just favorited, but running the test shows that it’s false.

1) test list articles including favorited should return published articles by date published (ConduitWeb.A\
rticleControllerTest)
     test/conduit_web/controllers/article_controller_test.exs:106

Determining whether an article is favorited or not requires us to look in the blog_favorited_articles join table. When a row exists for the article and author it’s a favorite, otherwise it’s not. To implement this conditional checking we modify the Conduit.Blog.Queries.ListArticles module and extend the query. A left join is used to compare whether a field from the joined table is not nil (not is_nil indicating no row exists) to set the virtual favorited flag.

defp include_favorited_by_author(query, nil), do: query
defp include_favorited_by_author(query, %Author{uuid: author_uuid}) do
  from a in query,
  left_join: f in FavoritedArticle, on: [article_uuid: a.uuid, favorited_by_author_uuid: ^author_uuid],
  select: %{a | favorited: not is_nil(f.article_uuid)}
end

Now we’re able to run the full test suite and see it passing.

Filter by favorite articles

One reason why an author would want to favorite an article is for bookmarking, allowing them to later view all of their favorites, or to view another author’s favorites.

Let’s begin by creating two integration tests to assert our desired behaviour. One test will ensure that we can view a user’s favorites, a second is used to check that no articles are returned for a user who has no favorites.

describe "list articles favorited by user" do
  setup [
    :create_author,
    :publish_articles,
    :favorite_article,
  ]

  @tag :integration
  test "should filter by favorited by user", %{articles: [article1, _article2]} do
    {articles, total_count} = Blog.list_articles(%{favorited: "jake"})

    assert articles == [%Article{article1 | favorited: false}]
    assert total_count == 1
  end

  @tag :integration
  test "should filter by favorited by user without favorites" do
    assert {[], 0} == Blog.list_articles(%{favorited: "anotheruser"})
  end
end

To implement this feature we’ll need to extend the query in the Conduit.Blog.Queries.ListArticles module to accept an optional favorited username. We’re using ExConstructor for the Options struct, so we only need to add a new favorited field and it will automatically be populated from the HTTP request query string (such as ?favorited=jake).

Unfortunately we cannot filter by an author’s username just yet because our blog_favorited_articles join table only includes the article and author identities. We must add the username to the table, as favorited_by_username, and populate this field in the article projector from the FavoritedArticle (which luckily already includes the username).

With the favorited_by_username field now being set we can join onto the blog_favorited_articles table using the requested username to only return those articles that have been favorited by that person.

defp filter_by_favorited_by_user(query, %Options{favorited: nil}), do: query
defp filter_by_favorited_by_user(query, %Options{favorited: favorited}) do
  from a in query,
  join: f in FavoritedArticle, on: [article_uuid: a.uuid, favorited_by_username: ^favorited]
end

Run the tests again to see that they pass. Now we’re finally done with favoriting articles.

With the article features implemented we take a short detour in the next chapter on tags to look at how CQRS can benefit our application by using a denormalized read model projection.

Tags

Listing tags

A GET /api/tags request returns a list of tags.

Example response body:

{
  "tags": [
    "reactjs",
    "angularjs"
  ]
}

Returning a list of tags applied to published articles isn’t difficult, but does illustrate how separating reads and writes in a CQRS application can be used to build multiple views from the same source of truth (an application’s domain events).

In a CRUD application the list of tags might be built by querying all published articles and finding the distinct tags. This query will likely degrade in performance over time as the number of published articles increases. To counteract this problem developers might choose to use a dual-write approach when publishing an article by updating the article and inserting the tag into a separate tags table. This negatively affects write latency but improves query efficiency when reading. The trade-off chosen here helps typical applications which encounter far more reads than writes.

Using CQRS we can take advantage of fast writes to the event store, with low latency, and use a read model projection to support whatever view of the is data needed for performant querying, such as by denormalizing data. We get the best of both worlds.

The ArticlePublished event already includes the tag_list field containing the list of tags for the published article. We’ll project the tags from the event into a separate tags table containing the unique tag names.

Let’s begin with a controller integration test for the new GET /api/tags endpoint and assert that the returned tags match those included in a published article.

# test/conduit_web/controllers/tag_controller_test.exs
defmodule ConduitWeb.TagControllerTest do
  use ConduitWeb.ConnCase

  setup %{conn: conn} do
    {:ok, conn: put_req_header(conn, "accept", "application/json")}
  end

  describe "list tags" do
    setup [
      :create_author,
      :publish_article,
    ]

    @tag :web
    test "should return all tags", %{conn: conn} do
      conn = get conn, tag_path(conn, :index)
      json = json_response(conn, 200)

      assert json == %{
        "tags" => [
          "dragons",
          "training",
        ]
      }
    end
  end
end

The test will fail as the route isn’t configured and the controller doesn’t exist. Following the outside-in approach we start by adding a new TagController module:

# lib/conduit_web/controllers/tag_controller.ex
defmodule ConduitWeb.TagController do
  use ConduitWeb, :controller

  alias Conduit.Blog

  action_fallback ConduitWeb.FallbackController

  def index(conn, _params) do
    tags = Blog.list_tags()
    render(conn, "index.json", tags: tags)
  end
end

Then route the /api/tags path to the controller:

# lib/conduit_web/router.ex (diff)
defmodule ConduitWeb.Router do
    get "/tags", TagController, :index
end

Our controller is using a new Blog.list_tags/1 function which we need to define as a query to find all tags and return only their name:

# lib/conduit/blog/blog.ex (diff)
defmodule Conduit.Blog do
  alias Conduit.Blog.Queries.{ArticleBySlug,ListArticles,ListTags}

  @doc """
  List all tags.
  """
  def list_tags do
    ListTags.new() |> Repo.all() |> Enum.map(&(&1.name))
  end
end

The ListTags query module is a very simple Ecto query to find all tags, ordered by name:

# lib/conduit/blog/queries/list_tags.ex
defmodule Conduit.Blog.Queries.ListTags do
  import Ecto.Query

  alias Conduit.Blog.Projections.Tag

  def new do
    from t in Tag,
    order_by: t.name
  end
end

Now let’s define our data model to store the global tags. We only record the tag’s name and use an autogenerated field as its identity.

# lib/conduit/blog/projections/tag.ex
defmodule Conduit.Blog.Projections.Tag do
  use Ecto.Schema

  @primary_key {:uuid, :binary_id, autogenerate: true}

  schema "blog_tags" do
    field :name, :string

    timestamps()
  end
end

The database migration to create the new blog_tags also includes a unique index on the name column. This ensures that tags must be unique, we cannot have any duplicates.

# priv/repo/migrations/20170719200152_create_blog_tag.exs
defmodule Conduit.Repo.Migrations.CreateBlogTag do
  use Ecto.Migration

  def change do
    create table(:blog_tags, primary_key: false) do
      add :uuid, :uuid, primary_key: true
      add :name, :string

      timestamps()
    end

    create unique_index(:blog_tags, [:name])
  end
end

Projecting tags into the read model

Projecting the list of tags from the article published event into the new tags table is the most complex addition for this feature. We’ll use Ecto’s conflict_target and on_conflict options to handle inserts of existing tags (enforced by the unique index on :name we included in the migration above). This approach simplifies the projection code as we don’t need to query first to check whether the tag already exists. We let the database uniqueness constraint take care of the invariant and ignore any name conflicts. We also need to use Enum.reduce/3 to include an insert operation for each tag in the tag list.

# lib/conduit/blog/projectors/tag.ex
defmodule Conduit.Blog.Projectors.Tag do
  use Commanded.Projections.Ecto, name: "Blog.Projectors.Tag"

  alias Conduit.Blog.Projections.Tag
  alias Conduit.Blog.Events.ArticlePublished

  project %ArticlePublished{tag_list: tag_list} do
    Enum.reduce(tag_list, multi, fn (tag, multi) ->
      Ecto.Multi.insert(multi, "tag-#{tag}", %Tag{name: tag},
        on_conflict: :nothing,
        conflict_target: :name)
    end)
  end
end

The Blog.Projectors.Tag projector module is added to Conduit.Blog.Supervisor to ensure it is started with the application.

The tag controller test will now pass and this small feature is complete. We’ve added a global tag list of all published articles without impacting the performance of publishing articles and have implemented an efficient tag query.

Frequently asked questions

How do I structure my CQRS/ES application?

Application structure is described in the Accounts chapter.

How do I deal with eventually consistent read model projections?

Dealing with eventual consistency is explained in the Accounts chapter.

Appendix I

Conduit API specs

Authentication header

Authorization: Token jwt.token.here

JSON objects returned by API

User

Used for authentication.

{
  "user": {
    "email": "jake@jake.jake",
    "token": "jwt.token.here",
    "username": "jake",
    "bio": "I work at statefarm",
    "image": null
  }
}

Profile

{
  "profile": {
    "username": "jake",
    "bio": "I work at statefarm",
    "image": "https://static.productionready.io/images/smiley-cyrus.jpg",
    "following": false
  }
}

Single article

{
  "article": {
    "slug": "how-to-train-your-dragon",
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "It takes a Jacobian",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }
}

Multiple articles

{
  "articles":[{
    "slug": "how-to-train-your-dragon",
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "It takes a Jacobian",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }, {

    "slug": "how-to-train-your-dragon-2",
    "title": "How to train your dragon 2",
    "description": "So toothless",
    "body": "It a dragon",
    "tagList": ["dragons", "training"],
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:48:35.824Z",
    "favorited": false,
    "favoritesCount": 0,
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }],
  "articlesCount": 2
}

Single comment

{
  "comment": {
    "id": 1,
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:22:56.637Z",
    "body": "It takes a Jacobian",
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }
}

Multiple comments

{
  "comments": [{
    "id": 1,
    "createdAt": "2016-02-18T03:22:56.637Z",
    "updatedAt": "2016-02-18T03:22:56.637Z",
    "body": "It takes a Jacobian",
    "author": {
      "username": "jake",
      "bio": "I work at statefarm",
      "image": "https://i.stack.imgur.com/xHWG8.jpg",
      "following": false
    }
  }]
}

List of tags

{
  "tags": [
    "reactjs",
    "angularjs"
  ]
}

Errors and status codes

If a request fails any validations, expect a 422 and errors in the following format:

{
  "errors":{
    "body": [
      "can't be empty"
    ]
  }
}
Other status codes
401 Unauthorized requests, when a request requires authentication but it isn’t provided.
403 Forbidden requests, when a request may be valid but the user doesn’t have permissions to perform the action.
404 Not found requests, when a resource can’t be found to fulfill the request.

Endpoints

Authentication

POST /api/users/login

Example request body: JSON { "user":{ "email": "jake@jake.jake", "password": "jakejake" } }

No authentication required, returns a user.

Required fields: email, password

Registration

POST /api/users

Example request body: JSON { "user":{ "username": "Jacob", "email": "jake@jake.jake", "password": "jakejake" } }

No authentication required, returns a user.

Required fields: email, username, password

Get current user

GET /api/user

Authentication required, returns a user that’s the current user

Update user

PUT /api/user

Example request body: JSON { "user":{ "email": "jake@jake.jake", "bio": "I like to skateboard", "image": "https://i.stack.imgur.com/xHWG8.jpg" } }

Authentication required, returns the user.

Accepted fields: email, username, password, image, bio

Get profile

GET /api/profiles/:username

Authentication optional, returns a profile.

Follow user

POST /api/profiles/:username/follow

Authentication required, returns profile.

No additional parameters required

Unfollow user

DELETE /api/profiles/:username/follow

Authentication required, returns profile.

No additional parameters required

List articles

GET /api/articles

Returns most recent articles globally by default.

Query parameters

Provide tag, author or favorited query parameter to filter results.

Filter by tag ?tag=AngularJS
Filter by author ?author=jake
Favorited by user ?favorited=jake
Limit number of articles (default is 20) ?limit=20
Offset/skip number of articles (default is 0) ?offset=0

Authentication optional, will return multiple articles, ordered by most recent first.

Feed articles

GET /api/articles/feed

Can also take limit and offset query parameters like list articles.

Authentication required, will return multiple articles created by followed users, ordered by most recent first.

Get article

GET /api/articles/:slug

No authentication required, will return single article.

Create Article

POST /api/articles

Example request body:

{
  "article": {
    "title": "How to train your dragon",
    "description": "Ever wonder how?",
    "body": "You have to believe",
    "tagList": ["reactjs", "angularjs", "dragons"]
  }
}

Authentication required, will return an article.

Required fields: title, description, body

Optional fields: tagList as an array of Strings

Update Article

PUT /api/articles/:slug

Example request body:

{
  "article": {
    "title": "Did you train your dragon?"
  }
}

Authentication required, returns the updated article.

Optional fields: title, description, body

The slug also gets updated when the title is changed.

Delete article

DELETE /api/articles/:slug

Authentication required

Add comments to an article

POST /api/articles/:slug/comments

Example request body:

{
  "comment": {
    "body": "His name was my name too."
  }
}

Authentication required, returns the created comment.

Required fields: body

Get comments from an article

GET /api/articles/:slug/comments

Authentication optional, returns multiple comments.

Delete comment

DELETE /api/articles/:slug/comments/:id

Authentication required

Favourite article

POST /api/articles/:slug/favorite

Authentication required, returns the article.

No additional parameters required

Unfavourite article

DELETE /api/articles/:slug/favorite

Authentication required, returns the article.

No additional parameters required

Get tags

GET /api/tags

No authentication required, returns a list of tags.

Notes

1CRUD is an abbreviation of Create, Read, Update, and Delete.

2A pure function always evaluates the same result value given the same argument value.

3JSON Web Tokens are an open, industry standard method for representing claims securely between two parties.

4Putting contexts in context

5Contexts in Phoenix v1.3

6Commanded is MIT licensed, a permissive free software license. This allows reuse within proprietary software, and for commercial purposes.

7A universally unique identifier (UUID) is a 128-bit number used to identify information in computer systems.

8Domain specific language

9Regular expression, regex or regexp, is a sequence of characters that define a search pattern in a string.

10bcrypt is a password hashing function designed by Niels Provos and David Mazières, based on the Blowfish cipher.