Pure functional HTTP APIs in Scala

Foreword

Thank you for your interest in this book. I hope you’ll have a good time reading it and learn something from it.

About this book

This book is intended for the intermediate Scala programmer who is interested in functional programming and works mainly on the web service backend side. Ideally she has experience with libraries like Akka HTTP[1] and Slick[2], which are in heavy use in that area.

However, maybe you have wondered if we can’t do better, even though the aforementioned projects are battle tested and proven.

The answer to this can be found in this book, which is intended to be read from cover to cover in the given order. Within the book the following libraries will be used: Cats[3], Cats Effect[4], http4s[5], Doobie[6], Refined[7], fs2[8] and probably others. ;-)

Copyleft Notice

This book uses the Creative Commons Attribution ShareAlike 4.0 International (CC BY-SA 4.0) license[1]. The code snippets in this book are licensed under CC0[2], which means you can use them without restriction. Excerpts from libraries maintain their license.

Thanks

I would like to thank my beloved wife and family who bear with me and make all of this possible.
Also I send a big thank you to all the nice people from the Scala community whom I’ve had the pleasure to meet.

Our use case

For a better understanding we will implement a small use case in both an impure and a pure way. The following section outlines the specification.

Service specification

First we need to specify the exact scope and API of our service. We’ll design a service with a minimal API to keep things simple. It shall fulfil the following requirements.

The service shall provide HTTP API endpoints for:

  1. the creation of a product data type identified by a unique id
  2. adding translations for a product name by language code and unique id
  3. returning the existing translations for a product
  4. returning a list of all existing products with their translations

Data model

We will keep the model very simple to avoid going overboard with the implementation.

  1. A language code shall be defined by ISO 639-1 (i.e. a two letter code).
  2. A translation shall contain a language code and a product name (non-empty string).
  3. A product shall contain a unique id (UUID version 4) and a list of translations.

Database

The data will be stored in a relational database (RDBMS). Therefore we need to define the tables and relations within the database.

The products table

The table products must contain only the unique id which is also the primary key.

The names table

The table names must contain a column for the product id, one for the language code and one for the name. Its primary key is the combination of the product id and the language code. All columns must not be null. The relation to the products is realised by a foreign key constraint to the products table via the product id.

HTTP API

The HTTP API shall provide the following endpoints on the given paths:

Path             HTTP method   Function
/products        POST          Create a product.
/products        GET           Get all products and translations.
/product/{UUID}  PUT           Add translations.
/product/{UUID}  GET           Get all translations for the product.

The data shall be encoded in JSON using the following specification:

JSON for a translation
{
  "lang": "ISO-639-1 Code",
  "name": "A non empty string."
}
JSON for a product
{
  "id": "The-UUID-of-the-product",
  "names": [
    // A list of translations.
  ]
}

This should be enough to get us started.

The state of the art

Within the Scala ecosystem the Akka-HTTP library is a popular choice for implementing server side backends for HTTP APIs. Another quite popular option is the Play framework, but using a full blown web framework just to provide a thin API is overkill in most cases. As most services need a database, the Slick library is another popular choice which completes the picture.

However, while all mentioned libraries are battle tested and proven, they still have problems.

Problems

In the domain of functional programming we want referential transparency, which we will define in the following way: an expression is referentially transparent if it can be replaced by its evaluated result without changing the behaviour of the program.

Building on that we need pure functions, which

  1. depend only on their input and
  2. have no side effects.

This in turn means that our functions will be referentially transparent.
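
A tiny example illustrates the idea:

Referential transparency illustrated
// A pure function: its result depends only on its inputs
// and it has no side effects.
def add(a: Int, b: Int): Int = a + b

// The expression `add(1, 2)` can be replaced by its value 3 (and vice
// versa) anywhere in the program without changing its behaviour.
val x: Int = add(1, 2) + add(1, 2) // 6
val y: Int = 3 + 3                 // an equivalent program, also 6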

But the mentioned libraries are built upon Scala’s Future, which uses eager evaluation and breaks referential transparency. Let’s look at an example.

Future example 1
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

for {
  _ <- Future { println("Hi there!") }
  _ <- Future { println("Hi there!") }
} yield ()

The code above will print the text Hi there! two times. But how about the following one?

Future example 2
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val printF = Future { println("Hi there!") }

for {
  _ <- printF
  _ <- printF
} yield ()

Instead of printing the text two times it will print it only once, even if printF is never used at all (try omitting the for comprehension). This means that Future breaks referential transparency!

Maybe there is another way

If we want referential transparency, we must push the side effects to the boundaries of our system (program) which can be done by using lazy evaluation. Let’s repeat the previous example in a different way.

IO example 1
import cats.effect.IO
import cats.implicits._

val effect = for {
  _ <- IO(println("Hi there!"))
  _ <- IO(println("Hi there!"))
} yield ()

The above code will produce no output. Only if we evaluate the variable effect, which is of type IO[Unit], will the output be generated (try effect.unsafeRunSync in the REPL). Also the second approach works as expected.

IO example 2
import cats.effect.IO
import cats.implicits._

val printF = IO(println("Hi there!"))

val effect = for {
  _ <- printF
  _ <- printF
} yield ()

Suddenly we can reason about our code much more easily! And why is that? Well, we don’t have unexpected side effects caused by code running even when it doesn’t need to. This is a sneak peek at how pure code looks. Now we only need to implement pure libraries for our use, or do we?

Luckily for us there are meanwhile several pure options available in the Scala ecosystem. We will stick to the Cats family of libraries, namely http4s and Doobie, as replacements for Akka-HTTP and Slick. They build upon the Cats Effect library, which is an implementation of an IO monad for Scala. Some other options exist, but we’ll stick to the one from Cats.

To be able to contrast both ways of implementing a service, we will first implement it using Akka-HTTP and Slick and then migrate to http4s and Doobie.

Impure implementation

We’ll be using the following libraries for the impure version of the service:

  1. Akka (including Akka-HTTP and Akka-Streams)
  2. Slick (as database layer)
  3. Flyway for database migrations (or evolutions)
  4. Circe for JSON codecs and akka-http-json as wrapper
  5. Refined for using refined types
  6. the PostgreSQL JDBC driver

I’ll spare you the sbt setup as you can look that up in the code repository (e.g. the impure folder in the book repo).

Models

First we’ll implement our models, which are simple and straightforward. To begin with we need a class to store our translations, or rather a single translation.

final case class Translation(lang: String, name: String)

Technically it is okay, but we have a bad feeling about it. Using Option[String] is of no use because both fields have to be set. But a String can always be null and can contain a lot of unexpected stuff (literally anything).

So let us define some refined types which we can use later on. First we need a language code which obeys the restrictions of ISO 639-1, and we need a stronger definition for a product name. For the former we use a regular expression and for the latter we simply expect a non-empty string.

Refined types for models
type LanguageCode = String Refined MatchesRegex[W.`"^[a-z]{2}$"`.T]
type ProductName  = String Refined NonEmpty

Now we can give our translation model another try.

Translation model using refined types
final case class Translation(lang: LanguageCode, name: ProductName)

Much better! And while we’re at it we can also write the JSON codecs using the refined module of the Circe library. We put them into the companion object of the model.

object Translation {
  implicit val decode: Decoder[Translation] =
    Decoder.forProduct2("lang", "name")(Translation.apply)

  implicit val encode: Encoder[Translation] =
    Encoder.forProduct2("lang", "name")(t => (t.lang, t.name))
}

Now onwards to the product model. Because we already know about refined types we can use them from the start here.

type ProductId = String Refined Uuid
final case class Product(id: ProductId, names: List[Translation])

If we look closely we realise that a List may be empty, which is valid for the list but not for our product, because we need at least one entry. Luckily for us the Cats library has us covered with the NonEmptyList data type. Including the JSON codecs this leads us to our final implementation.
Last but not least we really should be using the existing UUID data type instead of rolling our own refined string version - even when it is cool. ;-)

Product model using UUID type and NeL
type ProductId = java.util.UUID
final case class Product(id: ProductId, names: NonEmptyList[Translation])

object Product {
  implicit val decode: Decoder[Product] =
    Decoder.forProduct2("id", "names")(Product.apply)

  implicit val encode: Encoder[Product] =
    Encoder.forProduct2("id", "names")(p => (p.id, p.names))
}

We kept the type name ProductId by using a type alias. This is convenient, but remember that a type alias does not add extra type safety (e.g. type Foo = String will just be a String).
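
A tiny example illustrates this:

A type alias adds no type safety
type Foo = String

def length(s: String): Int = s.length

val foo: Foo = "Hello"
// Compiles without complaint because Foo and String are the same type.
length(foo)
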
Now we have the models covered and can move on to the database layer.

Database layer

The database layer should provide programmatic access to the database, but it should also manage changes to the database schema. The latter is called migrations or evolutions. From the available options we chose Flyway as the tool to manage our database schema.

Migrations

Flyway uses raw SQL scripts which have to be put into a certain location, /db/migration (under the resources folder) in our case. Also the files have to be named like VXX__some_name.sql (XX being a number) starting with V1. Please note that there are two underscores between the version prefix and the rest of the name! Because our database schema is very simple we’re done quickly:

Flyway migration for creating the database
CREATE TABLE "products" (
  "id" UUID NOT NULL,
  CONSTRAINT "products_pk" PRIMARY KEY ("id")
);

CREATE TABLE "names" (
  "product_id" UUID       NOT NULL,
  "lang_code"  VARCHAR(2) NOT NULL,
  "name"       TEXT       NOT NULL,
  CONSTRAINT "names_pk"
    PRIMARY KEY ("product_id", "lang_code"),
  CONSTRAINT "names_product_id_fk"
    FOREIGN KEY ("product_id")
    REFERENCES "products" ("id")
    ON DELETE CASCADE ON UPDATE CASCADE
);

In the code you’ll see that we additionally set comments, which are omitted from the code snippet above. This might be overkill here, but it is a very handy feature to have and I advise you to use it for more complicated database schemas. The right comment (read: information) in the right place might save a lot of time when trying to understand things.
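
For illustration, such comments might look like this in PostgreSQL (these statements are not part of the migration shown above):

Table and column comments
COMMENT ON TABLE "products" IS 'All existing products.';
COMMENT ON COLUMN "names"."lang_code" IS 'An ISO 639-1 two letter language code.';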

Next we move on to the programmatic part, which first needs a configuration of our database connection. With Slick you have a multitude of options, but we’ll use the “Typesafe Config”[1] approach.

Database configuration in application.conf
database {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    connectionPool = "HikariCP"
    dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
    properties {
      serverName = "localhost"
      portNumber = "5432"
      databaseName = "impure"
      user = "impure"
      password = "secret"
    }
    numThreads = 10
  }
}

After we have this in place we can run the migrations via the API of Flyway. For this we have to load the configuration (we do it by creating an actor system), extract the needed information, create a JDBC URL and use that together with username and password to obtain a Flyway instance. On that instance we simply call the method migrate() which will do the right thing. Basically it will check if the schema exists and decide to either create it, apply pending migrations or simply do nothing. The method returns the number of applied migrations.

Apply database migrations via Flyway
implicit val system: ActorSystem    = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext   = system.dispatcher

val url = "jdbc:postgresql://" +
  system.settings.config.getString("database.db.properties.serverName") +
  ":" + system.settings.config.getString("database.db.properties.portNumber") +
  "/" + system.settings.config.getString("database.db.properties.databaseName")
val user   = system.settings.config.getString("database.db.properties.user")
val pass   = system.settings.config.getString("database.db.properties.password")
val flyway = Flyway.configure().dataSource(url, user, pass).load()
val _      = flyway.migrate()

Let us continue to dive into the Slick table definitions.

Slick tables

Slick offers several options for approaching the database. For our example we will be using the lifted embedding, but if needed Slick also provides the ability to perform plain SQL queries.
For the lifted embedding we have to define our tables in a way Slick can understand. While this can be tricky under certain circumstances, our simple model is straightforward to implement.

Slick product table definition
final class Products(tag: Tag) extends Table[(UUID)](tag, "products") {
  def id = column[UUID]("id", O.PrimaryKey)

  def * = (id)
}
val productsTable = TableQuery[Products]

As you can see above we’re using simple data types (not the refined ones) to keep the Slick implementation simple. However we can also use refined types, at the price of either using the slick-refined library or writing custom column mappers, as sketched below.
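
A minimal sketch of such a custom mapper, assuming the Slick profile API (slick.jdbc.PostgresProfile.api._) and our refined type aliases are in scope:

Sketch of a custom column mapper
import eu.timepit.refined.api.RefType

// Map the refined LanguageCode to a plain String for the database and
// back. Invalid data coming from the database will throw, which is the
// usual trade-off for Slick column mappers.
implicit val languageCodeColumnType: BaseColumnType[LanguageCode] =
  MappedColumnType.base[LanguageCode, String](
    _.value,
    s =>
      RefType
        .applyRef[LanguageCode](s)
        .getOrElse(throw new IllegalArgumentException(s"Invalid language code: $s"))
  )
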
Next we’ll implement the table for the translations which will also need some constraints.

Slick translations table definition
final class Names(tag: Tag) extends Table[(UUID, String, String)](tag, "names") {
  def productId = column[UUID]("product_id")
  def langCode  = column[String]("lang_code")
  def name      = column[String]("name")

  def pk = primaryKey("names_pk", (productId, langCode))
  def productFk =
    foreignKey("names_product_id_fk", productId, productsTable)(
      _.id,
      onDelete = ForeignKeyAction.Cascade,
      onUpdate = ForeignKeyAction.Cascade
    )

  def * = (productId, langCode, name)
}
val namesTable = TableQuery[Names]

As you can see, the definition of constraints is also pretty simple. Now our repository needs some functions for more convenient access to the data.

Slick repository functions
def loadProduct(id: ProductId): Future[Seq[(UUID, String, String)]] = {
  val program = for {
    (p, ns) <- productsTable
      .filter(_.id === id)
      .join(namesTable)
      .on(_.id === _.productId)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.run(program.result)
}

def loadProducts(): Future[Seq[(UUID, String, String)]] = {
  val program = for {
    (p, ns) <- productsTable join namesTable on (_.id === _.productId)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.run(program.result)
}

def saveProduct(p: Product): Future[List[Int]] = {
  val cp      = productsTable += (p.id)
  val program = DBIO.sequence(cp :: saveTranslations(p).toList)
    .transactionally
  dbConfig.db.run(program)
}

def updateProduct(p: Product): Future[List[Int]] = {
  val deleteOld = namesTable.filter(_.productId === p.id).delete
  val program   = DBIO.sequence(deleteOld :: saveTranslations(p).toList)
    .transactionally
  dbConfig.db.run(program)
}

protected def saveTranslations(p: Product): NonEmptyList[DBIO[Int]] = {
  val save = saveTranslation(p.id)(_)
  p.names.map(t => save(t))
}

protected def saveTranslation(id: ProductId)(t: Translation): DBIO[Int] =
  namesTable.insertOrUpdate((id, t.lang, t.name))

The last two functions are helpers which enable us to create queries that we can compose. They are used in the saveProduct and updateProduct functions to create a list of queries that are executed in bulk, while the call to transactionally ensures that they run within a transaction. When updating a product we first delete all existing translations, to allow the removal of existing translations via an update.
The loadProduct function simply returns a list of database rows from the needed join. Therefore we need a function which builds a Product out of those rows.

Helper function to create a Product
def fromDatabase(rows: Seq[(UUID, String, String)]): Option[Product] = {
  val po = for {
    (id, c, n) <- rows.headOption
    t          <- Translation.fromUnsafe(c)(n)
    p          <- Product(id = id, names = NonEmptyList.one(t)).some
  } yield p
  po.map(p =>
    rows.drop(1).foldLeft(p) { (a, cols) =>
      val (id, c, n) = cols
      Translation.fromUnsafe(c)(n).fold(a)(t =>
        a.copy(names = a.names :+ t)
      )
    }
  )
}

So far we should have everything in place to make use of our database. Now we need to wire it all together.

Akka-HTTP routes

Defining the routes is pretty simple if you’re used to the Akka-HTTP routing DSL syntax.

Basic routes with Akka-HTTP
val route = path("product" / ProductIdSegment) { id: ProductId =>
  get {
    ???
  } ~ put {
    ???
  }
} ~ path("products") {
  get {
    ???
  } ~
  post {
    ???
  }
}

We will fill in the details later on. But now let’s start the actual server to make use of our routes.

Starting an Akka-HTTP server
val host       = system.settings.config.getString("api.host")
val port       = system.settings.config.getInt("api.port")
val srv        = Http().bindAndHandle(route, host, port)
val pressEnter = StdIn.readLine()
srv.flatMap(_.unbind()).onComplete(_ => system.terminate())

The code will fire up a server using the defined routes and the hostname and port from the configuration. It will run until you press enter and then terminate. Let us now visit the code for each routing endpoint. We will start with the one for returning a single product.

Returning a single product
path("product" / ProductIdSegment) { id: ProductId =>
  get {
    complete {
      for {
        rows <- repo.loadProduct(id)
        prod <- Future { Product.fromDatabase(rows) }
      } yield prod
    }
  }
}

We load the raw product data from the repository and convert it into a proper product model. To make the types align we have to wrap the second call in a Future, otherwise we would get a compiler error. We don’t need to marshal the response manually because we are using the akka-http-json library, which provides for example an ErrorAccumulatingCirceSupport import that handles this - provided, of course, that you have Circe codecs defined for your types.
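
For the record, the import in question looks like this (assuming the package layout of the akka-http-json project):

Bringing Circe marshalling into scope
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._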

Updating a single product
val route = path("product" / ProductIdSegment) { id: ProductId =>
  put {
    entity(as[Product]) { p =>
      complete {
        repo.updateProduct(p)
      }
    }
  }
}

The route for updating a product is also very simple. We extract the product entity from the request body via the entity(as[T]) directive and simply pass it to the appropriate repository function. Now onwards to creating a new product.

Creating a product
path("products") {
  post {
    entity(as[Product]) { p =>
      complete {
        repo.saveProduct(p)
      }
    }
  }
}

As you can see, the function is basically the same except that we’re calling a different function from the repository. Last but not least, let us take a look at the endpoint returning all products.

Return all products
path("products") {
  get {
    complete {
      val products = for {
        rows <- repo.loadProducts()
        ps <- Future {
          rows.toList.groupBy(_._1).map {
            case (_, cols) => Product.fromDatabase(cols)
          }
        }
      } yield ps
      products.map(_.toList.flatten)
    }
  }
}

This looks more complicated than the other endpoints. So what exactly are we doing here?
Well, first we load the raw product data from the repository. Afterwards we convert it into the proper data model, or to be more exact, into a list of product entities.

The first thing that comes to mind is that we’re performing operations in memory. This is no different from the last time, when we converted the data for a single product. Now however we’re talking about all products, which may be a lot of data. Another obvious point is that we get a list of Option[Product] which we explicitly flatten at the end.

Maybe we should consider streaming the results. But we still have to group and combine the rows which belong to a single product into a product entity. Can we achieve that with streaming? Well, let’s look at our data flow.
We receive a list of 3 columns from the database in the following format: product id, language code, name. The tricky part is that multiple rows (list entries) can belong to the same product, recognisable by the same value in the first column (the product id). First we should simplify our problem by ensuring that the list is sorted by the product id. This is done by adjusting the function loadProducts in the repository.

Sort the returned list of entries.
def loadProducts(): DatabasePublisher[(UUID, String, String)] = {
  val program = for {
    (p, ns) <- productsTable.join(namesTable).on(_.id === _.productId)
                 .sortBy(_._1.id)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.stream(program.result)
}

Now we can rely on the fact that we have seen all entries for one product if the product id in our list changes. Let’s adjust our code in the endpoint to make use of streaming now. Because Akka-HTTP is based on Akka-Streams we can simply use that.

Return all products as stream
path("products") {
  get {
    implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
      EntityStreamingSupport.json()

    val src = Source.fromPublisher(repo.loadProducts())
    val products: Source[Product, NotUsed] = src
      .map(cs => Product.fromDatabase(Seq(cs)))
      .collect { case Some(p) => p }
      .groupBy(Int.MaxValue, _.id)
      .fold(Option.empty[Product])((op, x) =>
        op.fold(x.some)(p => p.copy(names = p.names ::: x.names).some)
      )
      .mergeSubstreams
      .collect { case Some(p) => p }
    complete(products)
  }
}

Wow, this may look scary, but let’s break it apart piece by piece. First we need an implicit value which provides streaming support for JSON. Next we create a Source from the database stream. Then we implement the processing logic via the high level streams API. We map each row through our helper function fromDatabase and collect the defined results, which leads to a stream of Product entities. But we have created way too many (each product will be created as often as it has translations). So we group our stream by the product id, which creates a new stream for each product id holding only the entities for that specific product. We fold over each of these streams, merging together the lists of translations (names). Afterwards we merge the substreams back together and run another collect to get a result stream of Product instead of Option[Product]. Last but not least the stream is passed to the complete function, which will do the right thing.

Problems with the solution

The solution has two problems:

  1. The number of individual streams (and thus products) is limited to Int.MaxValue.
  2. The groupBy operator holds the references to these streams in memory opening a possible out of memory issue here.

As the first problem is simply related to the usage of groupBy we may say that we only have one problem: the usage of groupBy. ;-)
For a limited amount of data the proposed solution is perfectly fine, so we will leave it as is for now. If the limits ever become pressing, a possible alternative is sketched below.
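
Because the rows arrive sorted by product id, one possible direction (only a sketch, not part of this implementation) is to merge adjacent elements with statefulMapConcat, which needs only constant memory. Note that the sketch omits emitting the last buffered product on stream completion:

Sketch: merging adjacent products without groupBy
val merged: Source[Product, NotUsed] = src
  .map(cs => Product.fromDatabase(Seq(cs)))
  .collect { case Some(p) => p }
  .statefulMapConcat { () =>
    var current: Option[Product] = None
    p =>
      current match {
        case Some(c) if c.id == p.id =>
          // Same product: accumulate the translations, emit nothing yet.
          current = Some(c.copy(names = c.names ::: p.names))
          Nil
        case previous =>
          // New product id: emit the previous product (if any).
          current = Some(p)
          previous.toList
      }
  }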

Regarding the state of our service we have a working solution, so congratulations and let’s move on to the pure implementation.

Pure implementation

Like in the previous section I will spare you the details of the sbt setup. We will be using the following set of libraries:

  1. http4s
  2. Doobie (as database layer)
  3. Flyway for database migrations (or evolutions)
  4. Circe for JSON codecs
  5. Refined for using refined types
  6. the PostgreSQL JDBC driver
  7. pureconfig (for proper configuration loading)

Pure configuration handling

Last time we simply loaded our configuration via the Typesafe Config library, but can’t we do a bit better here? The answer is yes, by using the pureconfig[1] library. First we start by implementing the necessary parts of our configuration as data types.

Configuration data types
final case class ApiConfig(host: NonEmptyString, port: PortNumber)

object ApiConfig {
  implicit val configReader: ConfigReader[ApiConfig] =
    deriveReader[ApiConfig]
}

final case class DatabaseConfig(driver: NonEmptyString,
                                url: DatabaseUrl,
                                user: DatabaseLogin,
                                pass: DatabasePassword)

object DatabaseConfig {
  implicit val configReader: ConfigReader[DatabaseConfig] =
    deriveReader[DatabaseConfig]
}

As we can see the code is pretty simple. The implicits in the companion objects are needed for pureconfig to actually map from a configuration file to our data types. We are using the function deriveReader, which will derive (like in mathematics) the codec for us (yes, it is similar to a JSON codec, thus the name).
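
Loading the configuration could then look like the following sketch, which uses the ConfigSource API of newer pureconfig versions and assumes the settings live under the path "api":

Sketch: loading the configuration
import pureconfig._
import pureconfig.error.ConfigReaderFailures

// The Either return type forces us to deal with invalid configuration.
val apiConfig: Either[ConfigReaderFailures, ApiConfig] =
  ConfigSource.default.at("api").load[ApiConfig]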

Models

Because we have already written our models we just re-use them here. The only thing we change is the semi-automatic derivation of the JSON codecs. We just need to import the appropriate Circe package and call the derive functions.

Derive JSON codecs
import io.circe._
import io.circe.generic.semiauto._

// These codecs live in the companion objects of their respective models.
implicit val decodeProduct: Decoder[Product] = deriveDecoder[Product]
implicit val encodeProduct: Encoder[Product] = deriveeEncoder[Product]

implicit val decodeTranslation: Decoder[Translation] = deriveDecoder[Translation]
implicit val encodeTranslation: Encoder[Translation] = deriveEncoder[Translation]

Database layer

In general the same applies to the database layer as we have already read in the “impure” section.

Migrations

For the sake of simplicity we will stick to Flyway for our database migrations. However we will wrap the migration code in a different way (read: encapsulate it properly within an IO to defer side effects). While we’re at it we may just as well write our migration code using the interpreter pattern (which became famous under the name “tagless final” in Scala).

Database migrator base
trait DatabaseMigrator[F[_]] {
  def migrate(url: DatabaseUrl,
              user: DatabaseLogin,
              pass: DatabasePassword): F[Int]
}

We define a trait which describes the desired functionality and use a higher kinded type parameter to be able to abstract over the effect type. But now let’s continue with our Flyway interpreter.

Flyway migrator interpreter
final class FlywayDatabaseMigrator extends DatabaseMigrator[IO] {
  override def migrate(url: DatabaseUrl,
                       user: DatabaseLogin,
                       pass: DatabasePassword): IO[Int] =
    IO {
      val flyway: Flyway = Flyway.configure()
        .dataSource(url, user, pass)
        .load()
      flyway.migrate()
    }
}

As we can see, the implementation is pretty simple and we just wrap our code into an IO monad to constrain the effect. Having the migration code settled we can move on to the repository.

If we take a closer look at the method definition of Flyway.migrate, we see this:

Method definition of Flyway.migrate
public int migrate() throws FlywayException

While IO will gladly defer side effects for us, it won’t stop the enclosed code from throwing exceptions. This is not that great. So what can we do about it?
Having an instance of MonadError in scope we could just use the .attempt function provided by it. But is this enough, or rather: does this provide a sensible solution for us? Let’s play a bit in the REPL.

MonadError on the REPL
@ import cats._, cats.effect._, cats.implicits._

@ val program = for {
    _ <- IO(println("one"))
    _ <- IO(println("two"))
    x <- IO.pure(42)
  } yield x

@ program.attempt.unsafeRunSync match {
    case Left(e) =>
      println(e.getMessage)
      -1
    case Right(r) => r
  }
one
two
res3: Int = 42

@ val program = for {
    _ <- IO(println("one"))
    _ <- IO(throw new Error("BOOM!"))
    x <- IO.pure(42)
  } yield x

@ program.attempt.unsafeRunSync match {
    case Left(e) =>
      println(e.getMessage)
      -1
    case Right(r) => r
  }
one
BOOM!
res5: Int = -1

It looks like we just have to use MonadError then. Hurray, we don’t need to change our code in the migrator. As model citizens of the functional programming camp we just defer the responsibility upwards to the call site.
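
At the call site this could look like the following sketch, in which url, user and pass are assumed to be loaded from the configuration:

Sketch: handling migration errors at the call site
val migrator = new FlywayDatabaseMigrator

// Handle a possibly failing migration at the boundary of the program.
val migration: IO[Int] = migrator.migrate(url, user, pass).attempt.flatMap {
  case Left(e)  => IO(println(s"Migration failed: ${e.getMessage}")) *> IO.pure(-1)
  case Right(n) => IO.pure(n)
}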

Doobie

As we already started with using a tagless final approach we might as well continue with it and define a base for our repository.

Base trait for the repository
trait Repository[F[_]] {
  def loadProduct(id: ProductId): F[Seq[(ProductId, LanguageCode, ProductName)]]

  def loadProducts(): Stream[F, (ProductId, LanguageCode, ProductName)]

  def saveProduct(p: Product): F[Int]

  def updateProduct(p: Product): F[Int]
}

There is nothing exciting here, except that we feel brave now and try to use proper refined types in our database functions. This is possible due to the doobie-refined module. To be able to map the UUID data type (and others) we also need to include the doobie-postgres module. For convenience we are still using ProductId instead of UUID in our definition. In addition we fix the return type of loadProducts to be an fs2.Stream because we want to achieve pure functional streaming here. :-)
So let’s see what a repository using Doobie looks like.

The doobie repository.
final class DoobieRepository[F[_]: Sync](tx: Transactor[F])
  extends Repository[F] {

  override def loadProduct(id: ProductId) = ???

  override def loadProducts() = ???

  override def saveProduct(p: Product) = ???

  override def updateProduct(p: Product) = ???
}

We keep our higher kinded type as abstract as we can, but we want to be able to suspend our side effects in it. Therefore we require an implicit Sync.[2]
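
In a nutshell, Sync gives us the ability to delay side effecting code in an abstract F, for example:

Suspending a side effect via Sync
def putStrLn[F[_]: Sync](text: String): F[Unit] =
  Sync[F].delay(println(text))
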
If we look at the detailed function definitions further below, the first big difference is that with Doobie you write plain SQL queries. You can do this with Slick too[3], but with Doobie it is the only way. If you’re used to object relational mapping (ORM) or other forms of query compilers, this may seem strange at first. But: “In data processing it seems, all roads eventually lead back to SQL!”[4] ;-)
We won’t discuss the benefits and drawbacks here, but in general I also lean towards using the de facto lingua franca for database access, because it was made for this and so far no query compiler has been able to beat hand crafted SQL in terms of performance. Another benefit is that if you ask a database guru for help, she will be much better able to help you with plain SQL queries than with some meta query which is compiled into something that you have no idea of.

Loading a product.
override def loadProduct(id: ProductId) =
  sql"""SELECT products.id, names.lang_code, names.name
        FROM products
        JOIN names ON products.id = names.product_id
        WHERE products.id = $id"""
    .query[(ProductId, LanguageCode, ProductName)]
    .to[Seq]
    .transact(tx)

The loadProduct function simply returns all rows for a single product from the database, like its Slick counterpart in the impure variant. The parameter will be correctly interpolated by Doobie, therefore we don’t need to worry about SQL injection here. We specify the type of the query, instruct Doobie to transform the result into a sequence and give it to the transactor.

Load all products
override def loadProducts() =
  sql"""SELECT products.id, names.lang_code, names.name
        FROM products
        JOIN names ON products.id = names.product_id
        ORDER BY products.id"""
    .query[(ProductId, LanguageCode, ProductName)]
    .stream
    .transact(tx)

Our loadProducts function is equivalent to the first one, but it returns the data for all products, sorted by product id, as a stream using the fs2 library, which provides pure functional streaming.
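
As a side note, such a stream can be compiled back into a single effect when needed, for example in tests. A sketch, assuming we pick IO for F:

Sketch: compiling the stream
// Collect all emitted rows into one list within IO.
val allRows: IO[List[(ProductId, LanguageCode, ProductName)]] =
  repo.loadProducts().compile.toList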

Save a product
override def saveProduct(p: Product): F[Int] = {
  val namesSql =
    "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
  val namesValues = p.names.map(t => (p.id, t.lang, t.name))
  val program = for {
    pi <- sql"INSERT INTO products (id) VALUES(${p.id})".update.run
    ni <- Update[(ProductId, LanguageCode, ProductName)](namesSql)
            .updateMany(namesValues)
  } yield pi + ni
  program.transact(tx)
}

When saving a product we use monadic notation for our program to have it short circuit in case of failure. Doobie will also put all commands into a database transaction. The function itself will try to create the “master” entry in the products table and save all translations afterwards.

Update a product
override def updateProduct(p: Product): F[Int] = {
  val namesSql =
    "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
  val namesValues = p.names.map(t => (p.id, t.lang, t.name))
  val program = for {
    dl <- sql"DELETE FROM names WHERE product_id = ${p.id}".update.run
    ts <- Update[(ProductId, LanguageCode, ProductName)](namesSql)
            .updateMany(namesValues)
  } yield dl + ts
  program.transact(tx)
}

The updateProduct function also uses monadic notation, like the saveProduct function we talked about before. The difference is that it first deletes all known translations before saving the given ones.

http4s routes

The routing DSL of http4s differs from the one of Akka-HTTP. Although I like the latter more, it poses no problem to model out a base for our routes.

Base for http4s routes
val productRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "product" / id =>
    ???
  case PUT -> Root / "product" / id =>
    ???
}
val productsRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "products" =>
    ???
  case POST -> Root / "products" =>
    ???
}

As we can see, the DSL is closer to Scala syntax and quite easy to read. But before we move on to the details of each route, let’s think about how we can model this a bit more abstractly. While it is fine to have our routes bound to IO, it would be better to have more flexibility here. We have several options, but for starters we just extract our routes into their own classes like in the following schema.

Routing classes
final class ProductRoutes[F[_]: Sync](repo: Repository[F])
  extends Http4sDsl[F] {

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "product" / UUIDVar(id) =>
      ???
    case req @ PUT -> Root / "product" / UUIDVar(id) =>
      ???
  }
}

final class ProductsRoutes[F[_]: Sync](repo: Repository[F])
  extends Http4sDsl[F] {

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "products" =>
      ???
    case req @ POST -> Root / "products" =>
      ???
  }
}

So far they only need the repository to access and manipulate data. Now let’s take on the single route implementations.

Product routes
final class ProductRoutes[F[_]: Sync](repo: Repository[F])
  extends Http4sDsl[F] {
  implicit def decodeProduct: EntityDecoder[F, Product] = jsonOf
  implicit def encodeProduct: EntityEncoder[F, Product] = jsonEncoderOf

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "product" / UUIDVar(id) =>
      for {
        rows <- repo.loadProduct(id)
        resp <- Ok(Product.fromDatabase(rows))
      } yield resp
    case req @ PUT -> Root / "product" / UUIDVar(id) =>
      for {
        p <- req.as[Product]
        _ <- repo.updateProduct(p)
        r <- NoContent()
      } yield r
  }
}

First we need to bring JSON codecs into scope for http4s, hence the implicit definitions at the top of the class. In the route for loading a single product we simply load the database rows, pipe them through our helper function to construct a proper Product and return that.
The update route (via PUT) transforms the request body into a Product and gives that to the update function of the repository. Finally a NoContent response is returned.

Products routes (1st try)
final class ProductsRoutes[F[_]: Sync](repo: Repository[F])
  extends Http4sDsl[F] {
  implicit def decodeProduct: EntityDecoder[F, Product] = jsonOf
  implicit def encodeProduct: EntityEncoder[F, Product] = jsonEncoderOf

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "products" =>
      val ps: Stream[F, Product] = repo.loadProducts
        .map(cs => Product.fromDatabase(List(cs)))
        .collect {
          case Some(p) => p
        }
      Ok(ps)
    case req @ POST -> Root / "products" =>
      for {
        p <- req.as[Product]
        _ <- repo.saveProduct(p)
        r <- NoContent()
      } yield r
  }
}

Our first take on the routes for products looks pretty complete already. Again we need implicit definitions for our JSON codecs to be able to serialise and de-serialise our entities. The POST route for creating a product is basically the same as the update route from the previous part. We create a Product from the request body, pass it to the save function of the repository and return a 204 No Content response.
The GET route for returning all products calls the appropriate repository function, which returns a stream that we map over using our helper function. Afterwards we use collect to convert our stream of Option[Product] into a stream of Product, which we pass to the Ok function of http4s.

But like in the impure implementation there is a problem: a product with more than one translation will be emitted once per translation. To solve this we need to dive into the fs2 API and leverage its power to merge our products back together. So let’s see how we do.

Streaming - Take 1

Because we believe ourselves to be clever, we pick the simple sledge hammer approach and just run some accumulator on the stream. So what do we need? A helper function and some code changes on the stream (e.g. in the route).

Merging products
def merge(ps: List[Product])(p: Product): List[Product] =
  ps.headOption.fold(List(p)) { h =>
    if (h.id === p.id)
      h.copy(names = h.names ::: p.names) :: ps.drop(1)
    else
      p :: ps
  }

This function takes a list (that may be empty) and a product and merges the top most element (the head) of the list with the given one. It returns an updated list that contains either an updated head element or a new head. Leaving aside the question of who guarantees that the relevant list element will always be the head, we may use it.

Adapted streaming route
case GET -> Root / "products" =>
  val ps: Stream[F, Product] = repo.loadProducts
    .map(cs => Product.fromDatabase(List(cs)))
    .collect {
      case Some(p) => p
    }
    .fold(List.empty[Product])((acc, p) => Product.merge(acc)(p))
  Ok(ps)

Looks simple, doesn’t it? Just a fold which uses our accumulator and we should be settled. But life is not that simple…

Compiler error
found   : fs2.Stream[F,List[com.wegtam.books.pfhais.pure.models.Product]]
required: fs2.Stream[F,com.wegtam.books.pfhais.pure.models.Product]
         .fold(List.empty[Product])((acc, p) => Product.merge(acc)(p))
              ^

The compiler complains that we have changed the type of the stream, and rightly so. So let’s fix that compiler error.

Let’s take a look again and think about what it means to change a stream of products into a stream of lists of products. It means that we will be building the whole thing in memory! Well, if we wanted that, we could have skipped streaming altogether. So back to the drawing board.

Streaming - Take 2

We need to process our stream of database columns (or products, if we use the converter like before) in such a way that all related entities are grouped into one product and emitted as such.
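
One possible starting point is the groupAdjacentBy function of fs2: because loadProducts returns the rows sorted by product id, adjacent elements with the same id belong to the same product. The following sketch assumes a fromDatabase variant accepting the refined rows and a cats Eq instance for the product id in scope:

Sketch: grouping adjacent rows with fs2
// Group adjacent rows by product id and build one product per group.
val ps: Stream[F, Product] = repo.loadProducts()
  .groupAdjacentBy(_._1)
  .map { case (_, rows) => Product.fromDatabase(rows.toList) }
  .collect { case Some(p) => p }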

What about tests?

TODO

Testing the impure service

TODO

Testing the pure service

TODO

Adding benchmarks

TODO

Comparison

TODO

Notes

About this book

[1] https://akka.io
[2] http://slick.lightbend.com
[3] https://typelevel.org/cats/
[4] https://typelevel.org/cats-effect/
[5] https://http4s.org/
[6] https://tpolecat.github.io/doobie/
[7] https://github.com/fthomas/refined
[8] https://fs2.io/

Copyleft Notice

[1] https://creativecommons.org/licenses/by-sa/4.0/legalcode
[2] https://wiki.creativecommons.org/wiki/CC0

Impure implementation

[1] http://slick.lightbend.com/doc/3.3.1/database.html

Pure implementation

[1] https://pureconfig.github.io/
[2] https://typelevel.org/cats-effect/typeclasses/sync.html
[3] http://slick.lightbend.com/doc/3.3.1/sql.html
[4] https://blog.acolyer.org/2019/07/03/one-sql-to-rule-them-all/