9. Wiring up the Application
To finish, we will apply what we have learnt to wire up the example application, and implement an HTTP client and server using the http4s pure FP library.
The source code to the drone-dynamic-agents application is available along
with the book’s source code at https://github.com/fommil/fpmortals under the
examples folder. It is not necessary to be at a computer to read this chapter,
but many readers may prefer to explore the codebase in addition to this text.
Some parts of the application have been left unimplemented, as an exercise to
the reader. See the README for further instructions.
9.1 Overview
Our main application only requires an implementation of the DynAgents algebra.
trait DynAgents[F[_]] {
def initial: F[WorldView]
def update(old: WorldView): F[WorldView]
def act(world: WorldView): F[WorldView]
}
Everything else is implementation detail. Recall that our main loop should look something like
state = initial()
while True:
state = update(state)
state = act(state)
To implement DynAgents we require Drone and Machines algebras, which
require a JsonClient, LocalClock and OAuth2 algebras, etc etc. It is helpful
to get a complete picture of all the algebras, modules and interpreters of the
application. This is the layout of the files in our application:
├── dda
│ ├── algebra.scala
│ ├── DynAgents.scala
│ ├── main.scala
│ └── interpreters
│ ├── DroneModule.scala
│ └── GoogleMachinesModule.scala
├── http
│ ├── encoding
│ │ ├── UrlEncoded.scala
│ │ ├── UrlEncodedWriter.scala
│ │ ├── UrlQuery.scala
│ │ └── UrlQueryWriter.scala
│ ├── interpreters
│ │ └── BlazeJsonClient.scala
│ ├── JsonClient.scala
│ ├── oauth2
│ │ ├── Access.scala
│ │ ├── Auth.scala
│ │ └── Refresh.scala
│ │ └── interpreters
│ │ └── BlazeUserInteraction.scala
│ └── OAuth2JsonClient.scala
├── os
│ └── Browser.scala
└── time
├── Epoch.scala
├── LocalClock.scala
└── Sleep.scala
The signatures of all the algebras can be summarised as
trait Sleep[F[_]] {
def sleep(time: FiniteDuration): F[Unit]
}
trait LocalClock[F[_]] {
def now: F[Epoch]
}
trait JsonClient[F[_]] {
def get[A: JsDecoder](
uri: String Refined Url,
headers: IList[(String, String)]
): F[A]
def post[P: UrlEncodedWriter, A: JsDecoder](
uri: String Refined Url,
payload: P,
headers: IList[(String, String)]
): F[A]
}
trait Auth[F[_]] {
def authenticate: F[CodeToken]
}
trait Access[F[_]] {
def access(code: CodeToken): F[(RefreshToken, BearerToken)]
}
trait Refresh[F[_]] {
def bearer(refresh: RefreshToken): F[BearerToken]
}
trait OAuth2JsonClient[F[_]] {
// same methods as JsonClient, but doing OAuth2 transparently
}
trait UserInteraction[F[_]] {
def start: F[String Refined Url]
def open(uri: String Refined Url): F[Unit]
def stop: F[CodeToken]
}
trait Drone[F[_]] {
def getBacklog: F[Int]
def getAgents: F[Int]
}
trait Machines[F[_]] {
def getTime: F[Epoch]
def getManaged: F[NonEmptyList[MachineNode]]
def getAlive: F[MachineNode ==>> Epoch]
def start(node: MachineNode): F[Unit]
def stop(node: MachineNode): F[Unit]
}
Note that some signatures from previous chapters have been refactored to use scalaz data types, now that we know why they are superior to the stdlib.
The data types are:
@xderiving(Order, Arbitrary)
final case class Epoch(millis: Long) extends AnyVal
@deriving(Order, Show)
final case class MachineNode(id: String)
@deriving(Equal, Show)
final case class CodeToken(token: String, redirect_uri: String Refined Url)
@xderiving(Equal, Show, ConfigReader)
final case class RefreshToken(token: String) extends AnyVal
@deriving(Equal, Show, ConfigReader)
final case class BearerToken(token: String, expires: Epoch)
@deriving(ConfigReader)
final case class OAuth2Config(token: RefreshToken, server: ServerConfig)
@deriving(ConfigReader)
final case class AppConfig(drone: BearerToken, machines: OAuth2Config)
We derive useful typeclasses using scalaz-deriving and Magnolia. The
ConfigReader typeclass is from the pureconfig library and is used to read
runtime configuration from HOCON property files.
And without going into the detail of how to implement the algebras, we need to
know the dependency graph of our DynAgentsModule. Let’s unravel the thread…
final class DynAgentsModule[F[_]: Applicative](
D: Drone[F],
M: Machines[F]
) extends DynAgents[F] { ... }
final class DroneModule[F[_]](
H: OAuth2JsonClient[F]
) extends Drone[F] { ... }
final class GoogleMachinesModule[F[_]](
H: OAuth2JsonClient[F]
) extends Machines[F] { ... }
There are two modules implementing OAuth2JsonClient, one that will use the OAuth2 Refresh algebra (for Google) and another that reuses a non-expiring BearerToken (for Drone).
final class OAuth2JsonClientModule[F[_]](
token: RefreshToken
)(
H: JsonClient[F],
T: LocalClock[F],
A: Refresh[F]
)(
implicit F: MonadState[F, BearerToken]
) extends OAuth2JsonClient[F] { ... }
final class BearerJsonClientModule[F[_]: Monad](
bearer: BearerToken
)(
H: JsonClient[F]
) extends OAuth2JsonClient[F] { ... }
So far we have seen requirements for F to have an Applicative[F], Monad[F]
and MonadState[F, BearerToken]. All of these requirements can be satisfied by
using StateT[Task, BearerToken, ?] as our application’s context.
However, some of our algebras only have one interpreter, using Task
final class LocalClockTask extends LocalClock[Task] { ... }
final class SleepTask extends Sleep[Task] { ... }
But recall that our algebras can provide a liftM on their companion, see Chapter 7.4 on the Monad Transformer Library, allowing us to lift a LocalClock[Task] into our desired StateT[Task, BearerToken, ?] context, and everything is consistent.
Unfortunately, that is not the end of the story. Things get more complicated
when we go to the next layer. Our JsonClient has an interpreter using a different context, an EitherT[Task, JsonClient.Error, ?]
final class BlazeJsonClient[F[_]](H: Client[Task])(
implicit
F: MonadError[F, JsonClient.Error],
I: MonadIO[F, Throwable]
) extends JsonClient[F] { ... }
object BlazeJsonClient {
def apply[F[_]](
implicit
F: MonadError[F, JsonClient.Error],
I: MonadIO[F, Throwable]
): Task[JsonClient[F]] = ...
}
Note that the BlazeJsonClient constructor returns a Task[JsonClient[F]], not
a JsonClient[F]. This is because the act of creating the client is effectful:
mutable connection pools are created and managed internally by http4s.
We must not forget that we need to provide a RefreshToken for
GoogleMachinesModule. We could ask the user to do all the legwork, but we are
nice and provide a separate one-shot application that uses the Auth and
Access algebras. The AuthModule and AccessModule implementations bring in
additional dependencies, but thankfully no change to the application’s F[_]
context.
final class AuthModule[F[_]: Monad](
config: ServerConfig
)(
I: UserInteraction[F]
) extends Auth[F] { ... }
final class AccessModule[F[_]: Monad](
config: ServerConfig
)(
H: JsonClient[F],
T: LocalClock[F]
) extends Access[F] { ... }
final class BlazeUserInteraction private (
pserver: Promise[Void, Server[Task]],
ptoken: Promise[Void, String]
) extends UserInteraction[Task] { ... }
object BlazeUserInteraction {
def apply(): Task[BlazeUserInteraction] = ...
}
The interpreter for UserInteraction is the most complex part of our codebase:
it starts an HTTP server, sends the user to visit a webpage in their browser,
captures a callback in the server, and then returns the result while safely
shutting down the web server.
Rather than using a StateT to manage this state, we use a Promise primitive
(from ioeffect). We should always use Promise (or IORef) instead of a
StateT when we are writing an IO interpreter since it allows us to contain
the abstraction. If we were to use a StateT, not only would it have a
performance impact on the entire application, but it would also leak internal
state management to the main application, which would become responsible for
providing the initial value. We also couldn’t use StateT in this scenario
because we need “wait for” semantics that are only provided by Promise.
9.2 Main
The ugliest part of FP is making sure that monads are all aligned and this tends
to happen in the Main entrypoint. The problem boils down to making sure that
all the monad contexts are aligned.
Our main loop is
state = initial()
while True:
state = update(state)
state = act(state)
and the good news is that the actual code will look like
for {
old <- F.get
updated <- A.update(old)
changed <- A.act(updated)
_ <- F.put(changed)
_ <- S.sleep(10.seconds)
} yield ()
where F holds the state of the world in a MonadState[F, WorldView]. We can
put this into a method called .step and repeat it forever by calling
.step[F].forever[Unit].
We begin the journey to get to this code. There are two approaches we can take,
and we will explore both. The first, and simplest, is to construct one monad
stack that all algebras are compatible with. Everything gets a .liftM added to
it to lift it into the larger stack. Let’s explore this approach for the
--machines mode of operation.
The code we want to write is
def auth(name: String): Task[Unit] = {
for {
config <- readConfig[ServerConfig](name + ".server")
ui <- BlazeUserInteraction()
auth = new AuthModule(config)(ui)
codetoken <- auth.authenticate
client <- BlazeJsonClient
clock = new LocalClockTask
access = new AccessModule(config)(client, clock)
token <- access.access(codetoken)
_ <- putStrLn(s"got token: $token")
} yield ()
}.run
where .readConfig and .putStrLn are library calls. We can think of them as
Task interpreters of algebras that read the application’s runtime
configuration and print a string to the screen.
But this code does not compile, for two reasons. Firstly, we need to consider
what our monad stack is going to be. The BlazeJsonClient constructor returns a
Task but the JsonClient methods require a MonadError[...,
JsonClient.Error]. This can be provided by EitherT. We can therefore
construct the common monad stack for the entire for comprehension as
type H[a] = EitherT[Task, JsonClient.Error, a]
Unfortunately this means we must .liftM everything that returns a Task,
which adds quite a lot of boilerplate. Unfortunately, the .liftM method does
not take a type of shape H[_], it takes a type of shape H[_[_], _], so we
need to create a type alias to help out the compiler:
type HT[f[_], a] = EitherT[f, JsonClient.Error, a]
type H[a] = HT[Task, a]
we can now call .liftM[HT] when we receive a Task
for {
config <- readConfig[ServerConfig](name + ".server").liftM[HT]
ui <- BlazeUserInteraction().liftM[HT]
auth = new AuthModule(config)(ui)
codetoken <- auth.authenticate.liftM[HT]
client <- BlazeJsonClient[H].liftM[HT]
clock = new LocalClockTask
access = new AccessModule(config)(client, clock)
token <- access.access(codetoken)
_ <- putStrLn(s"got token: $token").liftM[HT]
} yield ()
But this still doesn’t compile, because clock is a LocalClock[Task] and AccessModule requires a LocalClock[H]. We simply add the necessary .liftM boilerplate to the companion of LocalClock and can then lift the entire algebra
clock = LocalClock.liftM[Task, HT](new LocalClockTask)
access = new AccessModule[H](config)(client, clock)
and now everything compiles!
The second approach to wiring up an application is more complex, but necessary when there are conflicts in the monad stack, such as we need in our main loop. If we perform an analysis we find that the following are needed:
-
MonadError[F, JsonClient.Error]for uses of theJsonClient -
MonadState[F, BearerToken]for uses of theOAuth2JsonClient -
MonadState[F, WorldView]for our main loop
Unfortunately, the two MonadState requirements are in conflict. We could
construct a data type that captures all the state of the program, but that is a
leaky abstraction. Instead, we nest our for comprehensions and provide state
where it is needed.
We now need to think about three layers, which we will call F, G, H
type HT[f[_], a] = EitherT[f, JsonClient.Error, a]
type GT[f[_], a] = StateT[f, BearerToken, a]
type FT[f[_], a] = StateT[f, WorldView, a]
type H[a] = HT[Task, a]
type G[a] = GT[H, a]
type F[a] = FT[G, a]
Now some bad news about .liftM… it only works for one layer at a time. If we
have a Task[A] and we want an F[A], we have to go through each step and type
ta.liftM[HT].liftM[GT].liftM[FT]. Likewise, when lifting algebras we have to
call liftM multiple times. To get a Sleep[F], we have to type
val S: Sleep[F] = {
import Sleep.liftM
liftM(liftM(liftM(new SleepTask)))
}
and to get a LocalClock[G] we do two lifts
val T: LocalClock[G] = {
import LocalClock.liftM
liftM(liftM(new LocalClockTask))
}
The main application then becomes
for {
config <- readConfig[AppConfig]
blaze <- BlazeJsonClient[G]
_ <- {
val bearerClient = new BearerJsonClientModule(bearer)(blaze)
val drone = new DroneModule(bearerClient)
val refresh = new RefreshModule(config.machines.server)(blaze, T)
val oauthClient =
new OAuth2JsonClientModule(config.machines.token)(blaze, T, refresh)
val machines = new GoogleMachinesModule(oauthClient)
val agents = new DynAgentsModule(drone, machines)
for {
start <- agents.initial
_ <- {
val fagents = DynAgents.liftM[G, FT](agents)
step(fagents, S).forever[Unit]
}.run(start)
} yield ()
}.eval(bearer).run
} yield ()
where the outer loop is using Task, the middle loop is using G, and the
inner loop is using F.
The calls to .run(start) and .eval(bearer) are where we provide the initial
state for the StateT parts of our application. The .run is to reveal the
EitherT error.
We can call these two application entry points from our SafeApp
object Main extends SafeApp {
def run(args: List[String]): IO[Void, ExitStatus] = {
if (args.contains("--machines")) auth("machines")
else if (args.contains("--drone")) auth("drone")
else agents(BearerToken("<invalid>", Epoch(0)))
}.attempt[Void].map {
case \/-(_) => ExitStatus.ExitNow(0)
case -\/(err) => ExitStatus.ExitNow(1)
}
}
and then run it!
> runMain fommil.dda.Main --machines
[info] Running (fork) fommil.dda.Main --machines
...
[info] Service bound to address /127.0.0.1:46687
...
[info] Created new window in existing browser session.
...
[info] Headers(Host: localhost:46687, Connection: keep-alive, User-Agent: Mozilla/5.0 ...)
...
[info] POST https://www.googleapis.com/oauth2/v4/token
...
[info] got token: "<elided>"
Yay!
9.3 Blaze
We implement the HTTP client and server with the third party library http4s.
The interpreters for their client and server algebras are called Blaze.
We need the following dependencies
val http4sVersion = "0.18.16"
libraryDependencies ++= Seq(
"org.http4s" %% "http4s-dsl" % http4sVersion,
"org.http4s" %% "http4s-blaze-server" % http4sVersion,
"org.http4s" %% "http4s-blaze-client" % http4sVersion
)
9.3.1 BlazeJsonClient
We’ll need some imports
import org.http4s
import org.http4s.{ EntityEncoder, MediaType }
import org.http4s.headers.`Content-Type`
import org.http4s.client.Client
import org.http4s.client.blaze.{ BlazeClientConfig, Http1Client }
The Client module can be summarised as
final class Client[F[_]](
shutdown: F[Unit]
)(implicit F: MonadError[F, Throwable]) {
def fetch[A](req: Request[F])(f: Response[F] => F[A]): F[A] = ...
...
}
where Request and Response are data types:
final case class Request[F[_]](
method: Method
uri: Uri,
headers: Headers,
body: EntityBody[F]
) {
def withBody[A](a: A)
(implicit F: Monad[F], A: EntityEncoder[F, A]): F[Request[F]] = ...
...
}
final case class Response[F[_]](
status: Status,
headers: Headers,
body: EntityBody[F]
)
made of
final case class Headers(headers: List[Header])
final case class Header(name: String, value: String)
final case class Uri( ... )
object Uri {
// not total, only use if your string is guaranteed to be a URL
def unsafeFromString(s: String): Uri = ...
...
}
final case class Status(code: Int) {
def isSuccess: Boolean = ...
...
}
type EntityBody[F[_]] = fs2.Stream[F, Byte]
The EntityBody type is just an alias to Stream from the fs2 library. The
Stream data type is very powerful and can be thought of as an effectful, lazy,
pull-based stream of data. It is implemented as a Free monad with exception
catching and interruption. Stream takes two type parameters: an effect type
and a content type, and has an efficient internal representation for batching
the data. For example, although we are using Stream[F, Byte], it is actually
wrapping the raw Array[Byte] that arrives over the network.
We need to convert our header and URL representations into the versions required by http4s:
def convert(headers: IList[(String, String)]): http4s.Headers =
http4s.Headers(
headers.foldRight(List[http4s.Header]()) {
case ((key, value), acc) => http4s.Header(key, value) :: acc
}
)
def convert(uri: String Refined Url): http4s.Uri =
http4s.Uri.unsafeFromString(uri.value) // we already validated our String
Both our .get and .post methods require a conversion from the http4s
Response type into an A. We can factor this out into a single function,
.handler
import JsonClient.Error
final class BlazeJsonClient[F[_]] private (H: Client[Task])(
implicit
F: MonadError[F, Error],
I: MonadIO[F, Throwable]
) extends JsonClient[F] {
...
def handler[A: JsDecoder](resp: http4s.Response[Task]): Task[Error \/ A] = {
if (!resp.status.isSuccess)
Task.now(JsonClient.ServerError(resp.status.code).left)
else
for {
text <- resp.body.through(fs2.text.utf8Decode).compile.foldMonoid
res = JsParser(text)
.flatMap(_.as[A])
.leftMap(JsonClient.DecodingError(_))
} yield res
}
}
The .through(fs2.text.utf8Decode) is to convert a Stream[Task, Byte] into a
Stream[Task, String], with .compile.foldMonoid interpreting it with our
Task and combining all the parts using the Monoid[String], giving us a
Task[String].
We then parse the string as JSON and use the JsDecoder[A] to create the
required output.
This is our implementation of .get
def get[A: JsDecoder](
uri: String Refined Url,
headers: IList[(String, String)]
): F[A] =
I.liftIO(
H.fetch(
http4s.Request[Task](
uri = convert(uri),
headers = convert(headers)
)
)(handler[A])
)
.flatMap {
case -\/(err) => F.raiseError(err)
case \/-(success) => F.point(success)
}
.get is all plumbing: we convert our input types into the http4s.Request,
then call .fetch on the Client with our handler. This gives us back a
Task[Error \/ A], but we need to return a F[A]. Therefore we use the
MonadIO.liftIO to create a F[Error \/ A] and then .flatMap over the
disjunction to push the error into the F, using MonadError.raiseError.
Unfortunately, if we try to compile this code it will fail. The error will look something like
[error] BlazeJsonClient.scala:95:64: could not find implicit value for parameter
[error] F: cats.effect.Sync[scalaz.ioeffect.Task]
[error] text <- resp.body.through(fs2.text.utf8Decode).compile.foldMonoid
[error] ^
Basically something about a missing cats instance. The reason for this failure
is that http4s is using the Cats library, instead of Scalaz. Cats is a much
smaller library than Scalaz and amounts to a renaming of a subset of typeclasses
and datatypes. Thankfully, scalaz-ioeffect provides a compatibility layer and
the shims project provides seamless (until it isn’t) implicit conversions
between scalaz and cats types. We can get our code to compile with these
dependencies:
libraryDependencies ++= Seq(
"com.codecommit" %% "shims" % "1.4.0",
"org.scalaz" %% "scalaz-ioeffect-cats" % "2.10.1"
)
and these imports
import shims._
import scalaz.ioeffect.catz._
The implementation of .post is similar but we must also provide an instance of
EntityEncoder[Task, String Refined UrlEncoded]
Thankfully, the EntityEncoder typeclass provides convenience methods to let us
simply derive one from the existing String encoder
implicit val encoder: EntityEncoder[Task, String Refined UrlEncoded] =
EntityEncoder[Task, String]
.contramap[String Refined UrlEncoded](_.value)
.withContentType(
`Content-Type`(MediaType.`application/x-www-form-urlencoded`)
)
The only difference between .get and .post is the way we construct our http4s.Request
http4s.Request[Task](
method = http4s.Method.POST,
uri = convert(uri),
headers = convert(headers)
)
.withBody(payload.toUrlEncoded)
and the final piece is the constructor, which is a case of calling Http1Client
with a configuration object
object BlazeJsonClient {
def apply[F[_]](
implicit
F: MonadError[F, JsonClient.Error],
I: MonadIO[F, Throwable]
): Task[JsonClient[F]] =
Http1Client(BlazeClientConfig.defaultConfig).map(new BlazeJsonClient(_))
}
9.3.2 BlazeUserInteraction
We need to spin up an HTTP server, which is a lot easier than it sounds. First, the imports
import org.http4s._
import org.http4s.dsl._
import org.http4s.server.Server
import org.http4s.server.blaze._
We need to create a dsl for our effect type, which we then import
private val dsl = new Http4sDsl[Task] {}
import dsl._
Now we can use the http4s dsl to create HTTP endpoints. Rather than describe everything that can be done, we will simply implement the endpoint which should look familiar to any other HTTP DSL in Scala
private object Code extends QueryParamDecoderMatcher[String]("code")
private val service: HttpService[Task] = HttpService[Task] {
case GET -> Root :? Code(code) => ...
}
The difference, of course, is that this is entirely pure. The return type of each pattern match should be a Task[Response[Task]]. In our implementation we want
to take the code and put it into the ptoken promise:
final class BlazeUserInteraction private (
pserver: Promise[Throwable, Server[Task]],
ptoken: Promise[Throwable, String]
) extends UserInteraction[Task] {
...
private val service: HttpService[Task] = HttpService[Task] {
case GET -> Root :? Code(code) =>
ptoken.complete(code) >> Ok(
"That seems to have worked, go back to the console."
)
}
...
}
but the definition of our services routes is not enough, we need to launch a
server, which we do with BlazeBuilder
private val launch: Task[Server[Task]] =
BlazeBuilder[Task].bindHttp(0, "localhost").mountService(service, "/").start
Binding to port 0 lets the operating system assign a ephemeral port. We can
discover which port it’s actually running on by querying the server.address
field.
Our implementation of the .start and .stop methods is now straightforward
def start: Task[String Refined Url] =
for {
server <- launch
updated <- pserver.complete(server)
_ <- if (updated) Task.unit
else server.shutdown *> Task.failMessage("a server was already up")
} yield mkUrl(server)
def stop: Task[CodeToken] =
for {
server <- pserver.get
token <- ptoken.get
_ <- IO.sleep(1.second) *> server.shutdown
} yield CodeToken(token, mkUrl(server))
private def mkUrl(s: Server[Task]): String Refined Url = {
val port = s.address.getPort
Refined.unsafeApply(s"http://localhost:${port}/")
}
The 1.second sleep is necessary to avoid shutting down the server before the
response is sent back to the browser. IO doesn’t mess around when it comes to
concurrency performance!
Finally, to create a BlazeUserInteraction, we just need the two uninitialised
promises
object BlazeUserInteraction {
def apply(): Task[BlazeUserInteraction] = {
for {
p1 <- Promise.make[Void, Server[Task]].widenError[Throwable]
p2 <- Promise.make[Void, String].widenError[Throwable]
} yield new BlazeUserInteraction(p1, p2)
}
}
We could use IO[Void, ?] instead, but since the rest of our application is
using Task (i.e. IO[Throwable, ?]), we .widenError to avoid introducing
any boilerplate that would distract us.
9.4 Thank You
And that’s it! Congratulations on reaching the end. If you learnt something from this book, please tell your friends. This book does not have a marketing department, so word of mouth is the only way that readers find out about it.
Please consider getting involved in Scalaz by joining the gitter chat room. From there you can ask for advice, help newcomers (you’re basically an expert now), and contribute to the next release.