3. Application Design

In this chapter we will write the business logic and tests for a purely functional server application. The source code for this application is included under the example directory along with the book’s source; however, it is recommended not to read the source code until the final chapter, as there will be significant refactors as we learn more about FP.

3.1 Specification

Our application will manage a just-in-time build farm on a shoestring budget. It will listen to a Drone Continuous Integration server, and spawn worker agents using Google Container Engine (GKE) to meet the demand of the work queue.

Drone receives work when a contributor submits a GitHub pull request to a managed project. Drone assigns the work to its agents, each processing one job at a time.

The goal of our app is to ensure that there are enough agents to complete the work, with a cap on the number of agents, whilst minimising the total cost. Our app needs to know the number of items in the backlog and the number of available agents.

Google can spawn nodes, each of which can host multiple Drone agents. When an agent starts up, it registers itself with Drone, and Drone takes care of the lifecycle (including keep-alive calls to detect removed agents).

GKE charges a fee per minute of uptime, rounded up to the nearest hour for each node. One does not simply spawn a new node for each job in the work queue: we must re-use nodes and retain them until their 59th minute to get the most value for money.

Our app needs to be able to start and stop nodes, as well as check their status (e.g. uptimes, list of inactive nodes) and to know what time GKE believes it to be.

In addition, there is no API to talk directly to an agent, so we do not know if any individual agent is performing any work for the Drone server. If we accidentally stop an agent whilst it is performing work, it is inconvenient and requires a human to restart the job.

Contributors can manually add agents to the farm, so counting agents and nodes is not equivalent. We don’t need to supply any nodes if there are agents available.

The failure mode should always be to take the least costly option.

Both Drone and GKE have a JSON over REST API with OAuth 2.0 authentication.

3.2 Interfaces / Algebras

Let’s codify the architecture described in the previous section. Firstly, we need a simple data type to capture a millisecond timestamp, because such a simple thing does not exist in either the Java or Scala standard libraries:

  import scala.concurrent.duration._
  
  final case class Epoch(millis: Long) extends AnyVal {
    def +(d: FiniteDuration): Epoch = Epoch(millis + d.toMillis)
    def -(e: Epoch): FiniteDuration = (millis - e.millis).millis
  }
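A quick usage sketch of Epoch, repeating the definition above so the snippet compiles on its own. The values are arbitrary and chosen only to show the two operators:

```scala
import scala.concurrent.duration._

final case class Epoch(millis: Long) extends AnyVal {
  def +(d: FiniteDuration): Epoch = Epoch(millis + d.toMillis)
  def -(e: Epoch): FiniteDuration = (millis - e.millis).millis
}

object EpochExample {
  val start: Epoch = Epoch(0L)
  // Adding a duration moves the timestamp forward by that many milliseconds.
  val later: Epoch = start + 90.minutes
  // Subtracting two timestamps yields the elapsed time as a FiniteDuration.
  val elapsed: FiniteDuration = later - start
}
```

Keeping the arithmetic on Epoch means the business logic can compare uptimes against durations such as 10.minutes or 5.hours without converting units by hand.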

In FP, an algebra takes the place of an interface in Java, or the set of valid messages for an Actor in Akka. This is the layer where we define all side-effecting interactions of our system.

Writing the business logic and the algebra is a tightly iterative process: the algebra is a good level of abstraction at which to design a system.

  package algebra
  
  import scalaz.NonEmptyList
  
  trait Drone[F[_]] {
    def getBacklog: F[Int]
    def getAgents: F[Int]
  }
  
  final case class MachineNode(id: String)
  trait Machines[F[_]] {
    def getTime: F[Epoch]
    def getManaged: F[NonEmptyList[MachineNode]]
    def getAlive: F[Map[MachineNode, Epoch]]
    def start(node: MachineNode): F[MachineNode]
    def stop(node: MachineNode): F[MachineNode]
  }

We’ve used NonEmptyList, which is easily created by calling .toNel on the stdlib’s List (returning an Option[NonEmptyList], since the List may be empty); otherwise everything should be familiar.
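To make the role of NonEmptyList concrete, here is a hypothetical, stripped-down stdlib version (named Nel here; it is not scalaz’s implementation). It shows why a .toNel-style conversion must return an Option: an empty List has no valid non-empty counterpart.

```scala
// Hypothetical minimal NonEmptyList: a guaranteed head plus a possibly empty tail.
final case class Nel[A](head: A, tail: List[A]) {
  def toList: List[A] = head :: tail
}

object NelSyntax {
  // Hypothetical helper with the same shape as scalaz's .toNel:
  // a non-empty List yields Some, an empty List yields None.
  def toNel[A](as: List[A]): Option[Nel[A]] = as match {
    case h :: t => Some(Nel(h, t))
    case Nil    => None
  }
}
```

Because head always exists, code such as managed.head in the business logic can never throw, which is exactly why getManaged returns a NonEmptyList.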

3.3 Business Logic

Now we write the business logic that defines the application’s behaviour, considering only the happy path.

First, the imports:

  package logic
  
  import scalaz._, Scalaz._
  import algebra._

We need a WorldView class to hold a snapshot of our knowledge of the world. If we were designing this application in Akka, WorldView would probably be a var in a stateful Actor.

WorldView aggregates the return values of all the methods in the algebras, and adds a pending field to track unfulfilled requests.

  final case class WorldView(
    backlog: Int,
    agents: Int,
    managed: NonEmptyList[MachineNode],
    alive: Map[MachineNode, Epoch],
    pending: Map[MachineNode, Epoch],
    time: Epoch
  )

Now we are ready to write our business logic, but we need to indicate that we depend on Drone and Machines.

We can write the interface for the business logic

  trait DynAgents[F[_]] {
    def initial: F[WorldView]
    def update(old: WorldView): F[WorldView]
    def act(world: WorldView): F[WorldView]
  }

and implement it with a module. A module depends only on other modules, algebras and pure functions, and can be abstracted over F. If an implementation of an algebraic interface is tied to a specific type, e.g. IO, it is called an interpreter.

  final class DynAgentsModule[F[_]: Monad](D: Drone[F], M: Machines[F])
    extends DynAgents[F] {

The Monad context bound means that F is monadic, allowing us to use map, pure and, of course, flatMap via for comprehensions.
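To see what the Monad context bound buys us, here is a self-contained sketch that defines a hypothetical minimal Monad type class (scalaz’s real one has many more methods) plus just enough syntax for for comprehensions. The toy Backlog algebra and BacklogModule are illustrative stand-ins, not part of the application; Option is used as F only because it is in the standard library.

```scala
// Hypothetical minimal Monad: pure and flatMap, with map derived from them.
trait Monad[F[_]] {
  def pure[A](a: A): F[A]
  def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  def map[A, B](fa: F[A])(f: A => B): F[B] = flatMap(fa)(a => pure(f(a)))
}

object Monad {
  implicit val optionMonad: Monad[Option] = new Monad[Option] {
    def pure[A](a: A): Option[A] = Some(a)
    def flatMap[A, B](fa: Option[A])(f: A => Option[B]): Option[B] = fa.flatMap(f)
  }
}

// Syntax so that `for` comprehensions work for any F with a Monad instance.
object MonadSyntax {
  implicit class MonadOps[F[_], A](fa: F[A])(implicit M: Monad[F]) {
    def flatMap[B](f: A => F[B]): F[B] = M.flatMap(fa)(f)
    def map[B](f: A => B): F[B] = M.map(fa)(f)
  }
}

// A toy algebra, and a module abstracted over F in the style of DynAgentsModule.
trait Backlog[F[_]] {
  def getBacklog: F[Int]
  def getAgents: F[Int]
}

final class BacklogModule[F[_]: Monad](B: Backlog[F]) {
  import MonadSyntax._
  // Only Monad operations are used, so this compiles for any F with an instance.
  def spare: F[Int] = for {
    backlog <- B.getBacklog
    agents  <- B.getAgents
  } yield agents - backlog
}
```

The module never learns what F is; choosing Option, Id or an IO type is left entirely to whoever constructs it.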

We have access to the algebra of Drone and Machines as D and M, respectively. Using a single capital letter name is a common naming convention for monad and algebra implementations.

Our business logic will run in an infinite loop (pseudocode):

  state = initial()
  while True:
    state = update(state)
    state = act(state)

We must write three functions: initial, update and act, all returning an F[WorldView].
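The pseudocode loop runs forever, which makes it awkward to experiment with. Here is a hedged, bounded Scala sketch specialised to plain values (the Id context used later in the tests); the Program trait is a hypothetical stand-in for DynAgents, and the integer state is a toy in place of WorldView.

```scala
import scala.annotation.tailrec

// Hypothetical stand-in for DynAgents, specialised to plain values (no F[_]).
trait Program[S] {
  def initial: S
  def update(old: S): S
  def act(world: S): S
}

object Loop {
  // Calls initial once, then performs `steps` rounds of update followed by act.
  def run[S](p: Program[S], steps: Int): S = {
    @tailrec def go(state: S, remaining: Int): S =
      if (remaining <= 0) state
      else go(p.act(p.update(state)), remaining - 1)
    go(p.initial, steps)
  }
}
```

For example, with initial = 0, update adding 1 and act doubling, two rounds give (0 + 1) * 2 = 2, then (2 + 1) * 2 = 6.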

3.3.1 initial

In initial we call all external services and aggregate their results into a WorldView. We default the pending field to an empty Map.

  def initial: F[WorldView] = for {
    db <- D.getBacklog
    da <- D.getAgents
    mm <- M.getManaged
    ma <- M.getAlive
    mt <- M.getTime
  } yield WorldView(db, da, mm, ma, Map.empty, mt)

Recall from Chapter 1 that flatMap (i.e. when we use the <- generator) allows us to operate on a value that is computed at runtime. When we return an F[_], we are returning another program to be interpreted at runtime, which we can then flatMap. This is how we safely chain together sequential side-effecting code, whilst being able to provide a pure implementation for tests. FP could be described as Extreme Mocking.
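A small stdlib illustration of that point, using Option as a stand-in for F: the compiler rewrites a for comprehension into nested flatMap calls ending in a map, so each later step can use values produced by earlier ones. The lookup functions are hypothetical.

```scala
object Desugar {
  // Hypothetical effectful lookups; None would play the role of a failed call.
  def getBacklog: Option[Int] = Some(5)
  def getAgents: Option[Int] = Some(2)

  // Written with the <- generator...
  val sugared: Option[Int] = for {
    backlog <- getBacklog
    agents  <- getAgents
  } yield backlog - agents

  // ...is equivalent to this explicit chain of flatMap and map.
  val desugared: Option[Int] =
    getBacklog.flatMap(backlog => getAgents.map(agents => backlog - agents))
}
```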

3.3.2 update

update should call initial to refresh our world view, preserving known pending actions.

If a node has changed state, we remove it from pending, and if a pending action is taking longer than 10 minutes to do anything, we assume that it failed and forget that we asked to do it.

  def update(old: WorldView): F[WorldView] = for {
    snap <- initial
    changed = symdiff(old.alive.keySet, snap.alive.keySet)
    pending = (old.pending -- changed).filterNot {
      case (_, started) => (snap.time - started) >= 10.minutes
    }
    update = snap.copy(pending = pending)
  } yield update
  
  private def symdiff[T](a: Set[T], b: Set[T]): Set[T] =
    (a union b) -- (a intersect b)

Pure functions don’t need test mocks because they have explicit inputs and outputs, so we could move all pure code into standalone methods on a stateless object, testable in isolation. We’re happy testing only the public methods, preferring that our business logic is easy to read.
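As a sketch of that alternative, the pure parts of update could live in a stateless object and be tested with no mocks at all. UpdateLogic and prune are hypothetical names, and timestamps are simplified to raw milliseconds (instead of Epoch) so the snippet stands alone.

```scala
import scala.concurrent.duration._

// Hypothetical stateless home for the pure parts of update.
object UpdateLogic {
  def symdiff[T](a: Set[T], b: Set[T]): Set[T] =
    (a union b) -- (a intersect b)

  // Drops nodes whose alive status changed, then drops requests older than `timeout`.
  // Timestamps are plain milliseconds here rather than Epoch.
  def prune[N](pending: Map[N, Long], changed: Set[N], now: Long,
               timeout: FiniteDuration = 10.minutes): Map[N, Long] =
    (pending -- changed).filterNot {
      case (_, started) => (now - started).millis >= timeout
    }
}
```

For instance, at minute 20 a request made at minute 0 is forgotten (20 minutes old), one made at minute 15 is kept, and any node in changed is removed regardless of age.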

3.3.3 act

The act method is slightly more complex, so we’ll split it into two parts for clarity: detection of when an action needs to be taken, followed by taking action. This simplification means that we can only perform one action per invocation, but that is reasonable because we can control the invocations and may choose to re-run act until no further action is taken.

We write the scenario detectors as extractors for WorldView, which is nothing more than an expressive way of writing if / else conditions.

We need to add agents to the farm if there is a backlog of work, we have no agents, we have no nodes alive, and there are no pending actions. We return a candidate node that we would like to start:

  private object NeedsAgent {
    def unapply(world: WorldView): Option[MachineNode] = world match {
      case WorldView(backlog, 0, managed, alive, pending, _)
           if backlog > 0 && alive.isEmpty && pending.isEmpty
             => Option(managed.head)
      case _ => None
    }
  }
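To show that an extractor really is just a named if / else, here is a hypothetical cut-down world containing only the fields NeedsAgent inspects, with a List standing in for NonEmptyList (so the head may be missing and headOption is used). The pattern-match and plain if / else versions return the same answer.

```scala
// Hypothetical simplified world view: just the fields NeedsAgent looks at.
final case class MiniWorld(backlog: Int, agents: Int, managed: List[String],
                           alive: Set[String], pending: Set[String])

object NeedsAgent {
  // Pattern-match form, mirroring the extractor above.
  def unapply(world: MiniWorld): Option[String] = world match {
    case MiniWorld(backlog, 0, managed, alive, pending) if backlog > 0 && alive.isEmpty && pending.isEmpty =>
      managed.headOption
    case _ => None
  }
}

object Plain {
  // The same rule written as an ordinary if / else.
  def needsAgent(world: MiniWorld): Option[String] =
    if (world.backlog > 0 && world.agents == 0 && world.alive.isEmpty && world.pending.isEmpty)
      world.managed.headOption
    else None
}

object Decide {
  // Using the extractor inside a match, the way act does.
  def describe(world: MiniWorld): String = world match {
    case NeedsAgent(node) => s"start $node"
    case _                => "do nothing"
  }
}
```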

If there is no backlog, we should stop all nodes that have become stale (they are not doing any work). However, since Google charge per hour, we only shut down machines once they are at least 58 minutes into a billed hour, to get the most out of our money. We return the non-empty list of nodes to stop.

As a financial safety net, all nodes should have a maximum lifetime of 5 hours.

  private object Stale {
    def unapply(world: WorldView): Option[NonEmptyList[MachineNode]] =
      world match {
        case WorldView(backlog, _, _, alive, pending, time) if alive.nonEmpty =>
          (alive -- pending.keys).collect {
            case (n, started)
                if backlog == 0 && (time - started).toMinutes % 60 >= 58 =>
              n
            case (n, started) if (time - started) >= 5.hours => n
          }.toList.toNel
  
        case _ => None
      }
  }
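The timing rule in Stale is easy to get wrong, so here is a small worked check using the same offsets as the test data in this chapter (52 minutes, 59 minutes and 5 hours after a node starts). isStale is a hypothetical extraction of the two guards, taking a node’s uptime directly; it does not model the exclusion of pending nodes.

```scala
import scala.concurrent.duration._

object StaleRule {
  // Hypothetical extraction of the two guards in Stale, given a node's uptime.
  def isStale(backlog: Int, uptime: FiniteDuration): Boolean =
    (backlog == 0 && uptime.toMinutes % 60 >= 58) || uptime >= 5.hours
}
```

With no backlog, a node is stopped only in the last two minutes of each billed hour: 52 minutes is too young, while 59 (59 % 60 = 59) and 118 (118 % 60 = 58) qualify. At 5 hours the remainder is 0, but the safety cap stops the node regardless of backlog.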

Now that we have detected the scenarios that can occur, we can write the act method. When we schedule a node to be started or stopped, we add it to pending noting the time that we scheduled the action.

  def act(world: WorldView): F[WorldView] = world match {
    case NeedsAgent(node) =>
      for {
        _ <- M.start(node)
        update = world.copy(pending = Map(node -> world.time))
      } yield update
  
    case Stale(nodes) =>
      nodes.foldLeftM(world) { (world, n) =>
        for {
          _ <- M.stop(n)
          update = world.copy(pending = world.pending + (n -> world.time))
        } yield update
      }
  
    case _ => world.pure[F]
  }

Because NeedsAgent and Stale do not cover all possible situations, we need a catch-all case _ to do nothing. Recall from Chapter 2 that .pure creates the for’s (monadic) context from a value.

foldLeftM is like foldLeft over nodes, but each iteration of the fold may return a monadic value. In our case, each iteration of the fold returns F[WorldView].

The M is for Monadic, and you will find more of these lifted methods that behave as one would expect, taking functions that return monadic values in place of functions that return plain values.
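For intuition, here is a hypothetical stdlib foldLeftM specialised to List and Option (scalaz’s version works for any Foldable and Monad). Each step’s result is flatMapped into the next, so once a step returns None, the function is not called again and the whole fold is None.

```scala
object FoldM {
  // Hypothetical foldLeftM for List and Option only.
  def foldLeftM[A, B](as: List[A])(z: B)(f: (B, A) => Option[B]): Option[B] =
    as.foldLeft[Option[B]](Some(z)) { (acc, a) => acc.flatMap(b => f(b, a)) }
}
```

Used like act’s Stale case, folding over the nodes to stop while accumulating a pending map:

```scala
val stopAll = FoldM.foldLeftM(List("n1", "n2"))(Map.empty[String, Int]) {
  (pending, n) => Some(pending + (n -> 0))
}
```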

3.4 Unit Tests

The FP approach to writing applications is a designer’s dream: you can delegate writing the implementations of algebras to your team members while focusing on making your business logic meet the requirements.

Our application is highly dependent on timing and third-party web services. If this were a traditional OOP application, we’d create mocks for all the method calls, or test actors for the outgoing mailboxes. FP mocking is equivalent to providing an alternative implementation of dependency algebras. The algebras already isolate the parts of the system that need to be mocked; everything else is pure.

We’ll start with some test data:

  object Data {
    val node1   = MachineNode("1243d1af-828f-4ba3-9fc0-a19d86852b5a")
    val node2   = MachineNode("550c4943-229e-47b0-b6be-3d686c5f013f")
    val managed = NonEmptyList(node1, node2)
  
    val time1: Epoch = epoch"2017-03-03T18:07:00Z"
    val time2: Epoch = epoch"2017-03-03T18:59:00Z" // +52 mins
    val time3: Epoch = epoch"2017-03-03T19:06:00Z" // +59 mins
    val time4: Epoch = epoch"2017-03-03T23:07:00Z" // +5 hours
  
    val needsAgents = WorldView(5, 0, managed, Map.empty, Map.empty, time1)
  }
  import Data._
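The epoch string interpolator used in Data (as in epoch"2017-03-03T18:07:00Z") is not defined in this section. Here is a hedged sketch of one, assuming java.time.Instant.parse is acceptable for ISO-8601 timestamps; EpochSyntax and EpochInterpolator are hypothetical names. Unlike a macro-based interpolator, this version only reports a malformed literal at runtime.

```scala
import java.time.Instant

final case class Epoch(millis: Long) extends AnyVal

object EpochSyntax {
  // Hypothetical `epoch` interpolator: parses an ISO-8601 instant such as
  // "2017-03-03T18:07:00Z". Malformed input throws DateTimeParseException at runtime.
  implicit class EpochInterpolator(val sc: StringContext) {
    def epoch(args: Any*): Epoch =
      Epoch(Instant.parse(sc.s(args: _*)).toEpochMilli)
  }
}
```

With import EpochSyntax._ in scope, time3 - time1 in the test data comes out as 59 minutes, matching the comment in Data.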

We implement algebras by extending Drone and Machines with a specific monadic context, Id being the simplest.

Our “mock” implementations simply play back a fixed WorldView. We’ve isolated the state of our system, so we can safely use var to count how many times start and stop are called:

  class Mutable(state: WorldView) {
    var started, stopped: Int = 0
  
    private val D: Drone[Id] = new Drone[Id] {
      def getBacklog: Int = state.backlog
      def getAgents: Int = state.agents
    }
  
    private val M: Machines[Id] = new Machines[Id] {
      def getAlive: Map[MachineNode, Epoch] = state.alive
      def getManaged: NonEmptyList[MachineNode] = state.managed
      def getTime: Epoch = state.time
      def start(node: MachineNode): MachineNode = { started += 1 ; node }
      def stop(node: MachineNode): MachineNode = { stopped += 1 ; node }
    }
  
    val program = new DynAgentsModule[Id](D, M)
  }

When we write a unit test (here using FlatSpec from ScalaTest), we create an instance of Mutable and then import all of its members.

Our Drone and Machines implementations both use the Id context, so interpreting this program with them returns an Id[WorldView] (which is just a WorldView) that we can assert on.

In this trivial case we just check that the initial method returns the same value that we use in the static implementations:

  "Business Logic" should "generate an initial world view" in {
    val mutable = new Mutable(needsAgents)
    import mutable._
  
    program.initial shouldBe needsAgents
  }

We can create more advanced tests of the update and act methods, helping us flush out bugs and refine the requirements:

  it should "remove changed nodes from pending" in {
    val world = WorldView(0, 0, managed, Map(node1 -> time3), Map.empty, time3)
    val mutable = new Mutable(world)
    import mutable._
  
    val old = world.copy(alive = Map.empty,
                         pending = Map(node1 -> time2),
                         time = time2)
    program.update(old) shouldBe world
  }
  
  it should "request agents when needed" in {
    val mutable = new Mutable(needsAgents)
    import mutable._
  
    val expected = needsAgents.copy(
      pending = Map(node1 -> time1)
    )
  
    program.act(needsAgents) shouldBe expected
  
    mutable.stopped shouldBe 0
    mutable.started shouldBe 1
  }

It would be boring to go through the full test suite. Convince yourself with a thought experiment that the following tests are easy to implement using the same approach:

  • not request agents when pending
  • not shut down agents if nodes are too young
  • shut down agents when there is no backlog and nodes will shortly incur new costs
  • not shut down agents if there are pending actions
  • shut down agents when there is no backlog if they are too old
  • shut down agents, even if they are potentially doing work, if they are too old
  • ignore unresponsive pending actions during update

All of these tests are synchronous and isolated to the test runner’s thread (which could be running tests in parallel). If we’d designed our test suite in Akka, our tests would be subject to arbitrary timeouts and failures would be hidden in log files.

The productivity boost of simple tests for business logic cannot be overstated. Consider that 90% of an application developer’s time interacting with the customer is in refining, updating and fixing these business rules. Everything else is implementation detail.

3.5 Parallel

The application that we have designed runs each of its algebraic methods sequentially. But there are some obvious places where work can be performed in parallel.

3.5.1 initial

In our definition of initial we could ask for all the information we need at the same time instead of one query at a time.

As opposed to flatMap for sequential operations, scalaz uses Apply syntax for parallel operations:

  ^^^^(D.getBacklog, D.getAgents, M.getManaged, M.getAlive, M.getTime)

which can also use infix notation:

  (D.getBacklog |@| D.getAgents |@| M.getManaged |@| M.getAlive |@| M.getTime)

If each of the parallel operations returns a value in the same monadic context, we can apply a function to the results when they all return. Rewriting initial to take advantage of this:

  def initial: F[WorldView] =
    ^^^^(D.getBacklog, D.getAgents, M.getManaged, M.getAlive, M.getTime) {
      case (db, da, mm, ma, mt) => WorldView(db, da, mm, ma, Map.empty, mt)
    }
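To show why this combination needs no flatMap, here is a hypothetical minimal Apply type class with map2 as its primitive (scalaz’s own Apply is built on ap and derives apply2, apply3 and so on). Larger arities follow from map2 alone, so no operation depends on the value produced by another, and an interpreter is free to run them concurrently.

```scala
// Hypothetical minimal Apply with map2 as its primitive, for illustration only.
trait Apply[F[_]] {
  def map2[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C]

  // map3 pairs up the first two results, then combines with the third.
  def map3[A, B, C, D](fa: F[A], fb: F[B], fc: F[C])(f: (A, B, C) => D): F[D] =
    map2(map2(fa, fb)((a, b) => (a, b)), fc)((ab, c) => f(ab._1, ab._2, c))
}

object Apply {
  implicit val optionApply: Apply[Option] = new Apply[Option] {
    def map2[A, B, C](fa: Option[A], fb: Option[B])(f: (A, B) => C): Option[C] =
      (fa, fb) match {
        case (Some(a), Some(b)) => Some(f(a, b))
        case _                  => None
      }
  }
}
```

Option itself runs nothing in parallel; it only demonstrates the shape, where every input is supplied up front rather than computed from an earlier result.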

3.5.2 act

In the current logic for act, we are stopping each node sequentially, waiting for the result, and then proceeding. But we could stop all the nodes in parallel and then update our view of the world.

A disadvantage of doing it this way is that any failures will cause us to short-circuit before updating the pending field. But that’s a reasonable tradeoff since our update will gracefully handle the case where a node is shut down unexpectedly.

We need a method that operates on NonEmptyList that allows us to map each element into an F[MachineNode], returning an F[NonEmptyList[MachineNode]]. The method is called traverse, and when we flatMap over it we get a NonEmptyList[MachineNode] that we can deal with in a simple way:

  case Stale(nodes) =>
    for {
      stopped <- nodes.traverse(M.stop)
      updates = stopped.map(_ -> world.time).toList.toMap
      update = world.copy(pending = world.pending ++ updates)
    } yield update

Arguably, this is easier to understand than the sequential version.
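For intuition about traverse, here is a hypothetical stdlib version specialised to List and Option (scalaz generalises it to any Traverse and Applicative). Its shape is the point: a List[A] and an A => F[B] go in, and a single F[List[B]] comes out, succeeding only if every call succeeds.

```scala
object Traverse {
  // Hypothetical traverse for List and Option: applies f to every element,
  // keeps the original order, and returns None if any call returns None.
  def traverse[A, B](as: List[A])(f: A => Option[B]): Option[List[B]] =
    as.foldRight[Option[List[B]]](Some(Nil)) { (a, acc) =>
      for {
        b  <- f(a)
        bs <- acc
      } yield b :: bs
    }
}
```

Mirroring the text, the stopped nodes can then be paired with a request time and turned into a Map of pending actions.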

3.5.3 Parallel Interpretation

Marking something as suitable for parallel execution does not guarantee that it will be executed in parallel: that is the responsibility of the implementation. To state the obvious: parallel execution is supported by Future, but not by Id.

Of course, we need to be careful to implement algebras so that they can perform operations safely in parallel, which may require protecting internal state with concurrency locks or actors.
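To make the difference concrete with a context that does support parallelism, here is a stdlib sketch using Future. getBacklog and getAgents are hypothetical stand-ins that sleep to imitate network latency; actual concurrency depends on the execution context, so treat the timing as illustrative.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSketch {
  // Hypothetical remote calls; Future.apply schedules the work immediately.
  def getBacklog: Future[Int] = Future { Thread.sleep(200); 5 }
  def getAgents: Future[Int]  = Future { Thread.sleep(200); 0 }

  // Both futures are created before zip combines them, so they may run concurrently.
  def parallel: Future[(Int, Int)] = getBacklog.zip(getAgents)

  // getAgents is only called inside flatMap, after getBacklog completes: sequential.
  def sequential: Future[(Int, Int)] = for {
    b <- getBacklog
    a <- getAgents
  } yield (b, a)
}
```

Both produce the same result; only the scheduling differs, which is exactly the freedom that Apply-style combination gives an interpreter.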

3.6 Summary

  1. Algebras define the interface between systems.
  2. Modules define pure logic and depend on algebras and other modules.
  3. Test implementations can mock out the side-effecting parts of the system, enabling a high level of test coverage for the business logic.
  4. Algebraic methods can be performed in parallel by taking their product or traversing sequences (caveat emptor, revisited later).