Pure implementation
Like in the previous section I will spare you the details of the sbt setup. We will be using the following set of libraries:
- http4s
- Doobie (as database layer)
- Flyway for database migrations (or evolutions)
- Circe for JSON codecs
- Refined for using refined types
- the PostgreSQL JDBC driver
- pureconfig (for proper configuration loading)
Pure configuration handling
Last time we simply loaded our configuration via the Typesafe Config library, but can’t we do a bit better here? Yes, we can, by using the pureconfig library. We start by implementing the necessary parts of our configuration as data types.
final case class ApiConfig(host: NonEmptyString, port: PortNumber)

object ApiConfig {
  implicit val configReader: ConfigReader[ApiConfig] =
    deriveReader[ApiConfig]
}

final case class DatabaseConfig(driver: NonEmptyString,
                                url: DatabaseUrl,
                                user: DatabaseLogin,
                                pass: DatabasePassword)

object DatabaseConfig {
  implicit val configReader: ConfigReader[DatabaseConfig] =
    deriveReader[DatabaseConfig]
}
As we can see, the code is pretty simple. The implicits in the companion objects are needed for pureconfig to actually map from a configuration to our data types. We use the function deriveReader, which will derive (like in mathematics) the codec for us (yes, it is similar to a JSON codec, hence the name).
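To get a feeling for what the derivation saves us, here is a minimal sketch of a hand-written reader. It does not use pureconfig at all: the Reader trait, SimpleApiConfig and the Map-based input are hypothetical stand-ins made up for this illustration, and plain String and Int replace the refined types.

```scala
object ManualReaderSketch {
  // Hypothetical, simplified stand-in for a configuration reader type class.
  trait Reader[A] {
    def read(raw: Map[String, String]): Either[String, A]
  }

  // Simplified configuration using plain types instead of the refined ones.
  final case class SimpleApiConfig(host: String, port: Int)

  // Parses a port; the range 0 to 65535 is an assumption made for this sketch.
  def parsePort(s: String): Either[String, Int] =
    scala.util.Try(s.toInt).toOption
      .filter(p => p >= 0 && p <= 65535)
      .toRight(s"not a valid port number: $s")

  // Hand-written instance: the kind of boilerplate that derivation writes for us.
  implicit val apiConfigReader: Reader[SimpleApiConfig] =
    new Reader[SimpleApiConfig] {
      def read(raw: Map[String, String]): Either[String, SimpleApiConfig] =
        for {
          host    <- raw.get("host").filter(_.nonEmpty).toRight("host is missing or empty")
          portStr <- raw.get("port").toRight("port is missing")
          port    <- parsePort(portStr)
        } yield SimpleApiConfig(host, port)
    }

  // Looks up the implicit reader for the requested type and applies it.
  def load[A](raw: Map[String, String])(implicit r: Reader[A]): Either[String, A] =
    r.read(raw)
}
```

Every additional field means another line in the for comprehension and another error message, which is exactly the repetitive work a derived reader takes off our hands.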
Below is an example of deriving an Order instance using the kittens library. It uses shapeless under the hood and provides automatic and semi-automatic derivation for a lot of type class instances from Cats like Eq, Order, Show, Functor and so on.
import cats._
import cats.derived

implicit val order: Order[Translation] = {
  import derived.auto.order._
  derived.semi.order[Translation]
}
Models
Because we have already written our models we just re-use them here. The only thing we change is the semi-automatic derivation of the JSON codecs. We just need to import the appropriate circe package and call the derive functions.
import io.circe._
import io.circe.generic.semiauto._

// The codecs are placed in the companion objects of the models,
// so the identically named implicits do not clash.
object Product {
  implicit val decode: Decoder[Product] = deriveDecoder[Product]
  implicit val encode: Encoder[Product] = deriveEncoder[Product]
}

object Translation {
  implicit val decode: Decoder[Translation] = deriveDecoder[Translation]
  implicit val encode: Encoder[Translation] = deriveEncoder[Translation]
}
Database layer
In general the same applies to the database layer as we have already read in the “impure” section.
Migrations
For the sake of simplicity we will stick to Flyway for our database migrations. However, we will wrap the migration code in a different way (read: encapsulate it properly within an IO to defer side effects). While we’re at it we may just as well write our migration code using the interpreter pattern (it became famous under the name “tagless final” in Scala).
trait DatabaseMigrator[F[_]] {
  def migrate(url: DatabaseUrl,
              user: DatabaseLogin,
              pass: DatabasePassword): F[Int]
}
We define a trait which describes the functionality desired by our interpreter and use a higher kinded type parameter to be able to abstract over the type. But now let’s continue with our Flyway interpreter.
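To illustrate why abstracting over F is useful, here is a minimal sketch with a second interpreter that could serve in tests. It needs no libraries: plain String parameters replace the refined types, Id is a hand-rolled identity "effect", and the names SimpleMigrator, NoOpMigrator and runMigration are hypothetical and exist only in this example.

```scala
object TaglessSketch {
  // Identity "effect": a value of type Id[A] is simply an A.
  type Id[A] = A

  // Simplified variant of the migrator algebra using plain strings.
  trait SimpleMigrator[F[_]] {
    def migrate(url: String, user: String, pass: String): F[Int]
  }

  // Test interpreter: performs no side effects and reports a fixed
  // number of applied migrations.
  final class NoOpMigrator(applied: Int) extends SimpleMigrator[Id] {
    override def migrate(url: String, user: String, pass: String): Id[Int] =
      applied
  }

  // Code written against the trait works with any interpreter.
  def runMigration[F[_]](m: SimpleMigrator[F])(url: String): F[Int] =
    m.migrate(url, "user", "secret")
}
```

Calling TaglessSketch.runMigration[TaglessSketch.Id](new TaglessSketch.NoOpMigrator(3))("jdbc:postgresql://localhost/test") simply returns 3 without touching any database, while the same runMigration would work unchanged with an interpreter that returns its result inside an effect type.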
final class FlywayDatabaseMigrator extends DatabaseMigrator[IO] {
  override def migrate(url: DatabaseUrl,
                       user: DatabaseLogin,
                       pass: DatabasePassword): IO[Int] =
    IO {
      val flyway: Flyway = Flyway.configure()
        .dataSource(url, user, pass)
        .load()
      flyway.migrate()
    }
}
As we can see, the implementation is pretty simple and we just wrap our code into an IO monad to constrain the effect. Having the migration code settled we can move on to the repository.
If we take a closer look at the method definition of Flyway.migrate, we see this:
public int migrate() throws FlywayException
While IO will gladly defer side effects for us it won’t stop enclosed code from throwing exceptions. This is not that great. So what can we do about it?
Having an instance of MonadError in scope we could just use the .attempt function provided by it. But is this enough, or rather, does it provide a sensible solution for us? Let’s play a bit on the REPL.
@ import cats._, cats.effect._, cats.implicits._
@ val program = for {
    _ <- IO(println("one"))
    _ <- IO(println("two"))
    x <- IO.pure(42)
  } yield x
@ program.attempt.unsafeRunSync match {
    case Left(e) =>
      println(e.getMessage)
      -1
    case Right(r) => r
  }
one
two
res3: Int = 42
@ val program = for {
    _ <- IO(println("one"))
    _ <- IO(throw new Error("BOOM!"))
    x <- IO.pure(42)
  } yield x
@ program.attempt.unsafeRunSync match {
    case Left(e) =>
      println(e.getMessage)
      -1
    case Right(r) => r
  }
one
BOOM!
res5: Int = -1
This looks like we just have to use MonadError then. Hurray, we don’t need to change our code in the migrator. As model citizens of the functional programming camp we just defer the responsibility upwards to the calling site.
Doobie
As we already started with using a tagless final approach we might as well continue with it and define a base for our repository.
trait Repository[F[_]] {
  def loadProduct(id: ProductId): F[Seq[(ProductId, LanguageCode, ProductName)]]

  def loadProducts(): Stream[F, (ProductId, LanguageCode, ProductName)]

  def saveProduct(p: Product): F[Int]

  def updateProduct(p: Product): F[Int]
}
There is nothing exciting here except that we feel brave now and try to use proper refined types in our database functions. This is possible due to the usage of the doobie-refined module. To be able to map the UUID data type (and others) we also need to include the doobie-postgresql module. For convenience we are still using ProductId instead of UUID in our definition. In addition we wire the return type of loadProducts to be an fs2.Stream because we want to achieve pure functional streaming here. :-)
So let’s see what a repository using doobie looks like.
final class DoobieRepository[F[_]: Sync](tx: Transactor[F])
    extends Repository[F] {

  override def loadProduct(id: ProductId) = ???

  override def loadProducts() = ???

  override def saveProduct(p: Product) = ???

  override def updateProduct(p: Product) = ???
}
We keep our higher kinded type as abstract as we can but we want it to be able to suspend our side effects. Therefore we require an implicit Sync.
If we look at the detailed function definitions further below, the first big difference is that with doobie you write plain SQL queries. You can do this with Slick too, but with doobie it is the only way. If you’re used to object relational mapping (ORM) or other forms of query compilers then this may seem strange at first. But: “In data processing it seems, all roads eventually lead back to SQL!” ;-)
We won’t discuss the benefits or drawbacks here but in general I also lean towards the approach of using the de facto lingua franca for database access because it was made for this and so far no query compiler was able to beat hand crafted SQL in terms of performance. Another benefit is that if you ask a database guru for help, she will be much more able to help you with plain SQL queries than with some meta query which is compiled into something that you have no idea of.
override def loadProduct(id: ProductId) =
  sql"""SELECT products.id, names.lang_code, names.name
        FROM products
        JOIN names ON products.id = names.product_id
        WHERE products.id = $id"""
    .query[(ProductId, LanguageCode, ProductName)]
    .to[Seq]
    .transact(tx)
The loadProduct function simply returns all rows for a single product from the database like its Slick counterpart in the impure variant. Doobie does not splice the interpolated parameter into the SQL string but passes it as a prepared statement parameter, therefore we don’t need to worry about SQL injections here. We specify the type of the query, instruct Doobie to transform it into a sequence and give it to the transactor.
override def loadProducts() =
  sql"""SELECT products.id, names.lang_code, names.name
        FROM products
        JOIN names ON products.id = names.product_id
        ORDER BY products.id"""
    .query[(ProductId, LanguageCode, ProductName)]
    .stream
    .transact(tx)
Our loadProducts function is equivalent to the first one but it returns the data for all products sorted by product and as a stream using the fs2 library which provides pure functional streaming.
override def saveProduct(p: Product): F[Int] = {
  val namesSql =
    "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
  val namesValues = p.names.map(t => (p.id, t.lang, t.name))
  val program = for {
    pi <- sql"INSERT INTO products (id) VALUES(${p.id})".update.run
    ni <- Update[(ProductId, LanguageCode, ProductName)](namesSql)
      .updateMany(namesValues)
  } yield pi + ni
  program.transact(tx)
}
When saving a product we use monadic notation for our program to have it short circuit in the case of failure. Doobie will also put all commands into a database transaction. The function itself will try to create the “master” entry into the products table and save all translations afterwards.
override def updateProduct(p: Product): F[Int] = {
  val namesSql =
    "INSERT INTO names (product_id, lang_code, name) VALUES (?, ?, ?)"
  val namesValues = p.names.map(t => (p.id, t.lang, t.name))
  val program = for {
    dl <- sql"DELETE FROM names WHERE product_id = ${p.id}".update.run
    ts <- Update[(ProductId, LanguageCode, ProductName)](namesSql)
      .updateMany(namesValues)
  } yield dl + ts
  program.transact(tx)
}
The updateProduct function also uses monadic notation, like the saveProduct function we talked about before. The difference is that it first deletes all known translations before saving the given ones.
http4s routes
The routing DSL of http4s differs from the one of Akka-HTTP. Although I like the latter one more it poses no problem to model out a base for our routes.
val productRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "product" / id =>
    ???
  case PUT -> Root / "product" / id =>
    ???
}
val productsRoutes: HttpRoutes[IO] = HttpRoutes.of[IO] {
  case GET -> Root / "products" =>
    ???
  case POST -> Root / "products" =>
    ???
}
As we can see, the DSL is closer to Scala syntax and quite easy to read. But before we move on to the details of each route let’s think about how we can model this a bit more abstractly. While it is fine to have our routes bound to IO, it would be better to have more flexibility. We have several options, but for starters we just extract our routes into their own classes like in the following schema.
final class ProductRoutes[F[_]: Sync](repo: Repository[F])
    extends Http4sDsl[F] {

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "product" / UUIDVar(id) =>
      ???
    case req @ PUT -> Root / "product" / UUIDVar(id) =>
      ???
  }
}

final class ProductsRoutes[F[_]: Sync](repo: Repository[F])
    extends Http4sDsl[F] {

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "products" =>
      ???
    case req @ POST -> Root / "products" =>
      ???
  }
}
So far they only need the repository to access and manipulate data. Now let’s take on the single route implementations.
final class ProductRoutes[F[_]: Sync](repo: Repository[F])
    extends Http4sDsl[F] {
  implicit def decodeProduct = jsonOf
  implicit def encodeProduct = jsonEncoderOf

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "product" / UUIDVar(id) =>
      for {
        rows <- repo.loadProduct(id)
        resp <- Ok(Product.fromDatabase(rows))
      } yield resp
    case req @ PUT -> Root / "product" / UUIDVar(id) =>
      for {
        p <- req.as[Product]
        _ <- repo.updateProduct(p)
        r <- NoContent()
      } yield r
  }
}
First we need to bring JSON codecs in scope for http4s, hence the implicit definitions at the top of the class. In the route for loading a single product we simply load the database rows which we pipe through our helper function to construct a proper Product and return that.
The update route (via PUT) transforms the request body into a Product and gives that to the update function of the repository. Finally a NoContent response is returned.
final class ProductsRoutes[F[_]: Sync](repo: Repository[F])
    extends Http4sDsl[F] {
  implicit def decodeProduct = jsonOf
  implicit def encodeProduct = jsonEncoderOf

  val routes: HttpRoutes[F] = HttpRoutes.of[F] {
    case GET -> Root / "products" =>
      val ps: Stream[F, Product] = repo.loadProducts
        .map(cs => Product.fromDatabase(List(cs)))
        .collect {
          case Some(p) => p
        }
      Ok(ps)
    case req @ POST -> Root / "products" =>
      for {
        p <- req.as[Product]
        _ <- repo.saveProduct(p)
        r <- NoContent()
      } yield r
  }
}
Our first take on the routes for products looks pretty complete already. Again we need implicit definitions for our JSON codecs to be able to serialize and de-serialize our entities. The POST route for creating a product is basically the same as the update route from the previous part. We create a Product from the request body, pass it to the save function of the repository and return a 204 No Content response.
The GET route for returning all products calls the appropriate repository function which returns a stream which we map over using our helper function. Afterwards we use collect to convert our stream from Option[Product] to a stream of Product which we pass to the Ok function of http4s.
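But there is a catch: because every database row is converted on its own, a product with several translations is emitted several times, each time carrying only a single name. To solve this we need to dive into the fs2 API and leverage its power to merge our products back together. So let’s see how we do.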
Streaming - Take 1
Because we believe ourselves to be clever we pick the simple sledge hammer approach and just run some accumulator on the stream. So what do we need? A helper function and some code changes on the stream (e.g. in the route).
def merge(ps: List[Product])(p: Product): List[Product] =
  ps.headOption.fold(List(p)) { h =>
    if (h.id === p.id)
      h.copy(names = h.names ::: p.names) :: ps.drop(1)
    else
      p :: ps
  }
So this function will take a list (that may be empty) and a product and will merge the top most element (the head) of the list with the given one. It will return an updated list that either contains an updated head element or a new head. Leaving aside the question of who guarantees that the relevant list element will always be the head, we may use it.
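The question about the head element can be made concrete with a small sketch that uses only the standard library. SimpleProduct, mergeAll and the sample data are hypothetical simplifications (an Int id and plain strings instead of the refined model types); the merge logic itself mirrors the helper above.

```scala
object MergeSketch {
  // Simplified stand-in for the real model: Int id and a list of names.
  final case class SimpleProduct(id: Int, names: List[String])

  // Same logic as the merge helper: only the head of the list is considered.
  def merge(ps: List[SimpleProduct])(p: SimpleProduct): List[SimpleProduct] =
    ps.headOption.fold(List(p)) { h =>
      if (h.id == p.id)
        h.copy(names = h.names ::: p.names) :: ps.drop(1)
      else
        p :: ps
    }

  // Runs the accumulator over all elements, like a fold over the stream would.
  def mergeAll(rows: List[SimpleProduct]): List[SimpleProduct] =
    rows.foldLeft(List.empty[SimpleProduct])((acc, p) => merge(acc)(p))

  // One element per database row, i.e. one name per element.
  val sorted: List[SimpleProduct] = List(
    SimpleProduct(1, List("Erdbeere")),
    SimpleProduct(1, List("Strawberry")),
    SimpleProduct(2, List("Gurke"))
  )

  // The same rows, but the two rows of product 1 are no longer adjacent.
  val unsorted: List[SimpleProduct] = List(
    SimpleProduct(1, List("Erdbeere")),
    SimpleProduct(2, List("Gurke")),
    SimpleProduct(1, List("Strawberry"))
  )
}
```

With the sorted input, mergeAll yields two products (in reverse order, because new products are prepended). With the unsorted input, product 1 shows up twice because its second row never meets its first one at the head of the list. The helper therefore relies on rows of the same product arriving next to each other, which the ORDER BY clause in loadProducts takes care of.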
case GET -> Root / "products" =>
  val ps: Stream[F, Product] = repo.loadProducts
    .map(cs => Product.fromDatabase(List(cs)))
    .collect {
      case Some(p) => p
    }
    .fold(List.empty[Product])((acc, p) => Product.merge(acc)(p))
  Ok(ps)
Looks simple, doesn’t it? Just a fold which uses our accumulator and we should be settled. But life is not that simple…
found   : fs2.Stream[F,List[com.wegtam.books.pfhais.pure.models.Product]]
required: fs2.Stream[F,com.wegtam.books.pfhais.pure.models.Product]
    .fold(List.empty[Product])((acc, p) => Product.merge(acc)(p))
         ^
The compiler complains that we have changed the type of the stream, and rightly so. We could fix that compiler error, but before we do, let’s take another look and think about what it means to change a stream of products into a stream of a list of products. It means that we will be building the whole thing in memory! Well, if we wanted that, we could have skipped streaming altogether. So back to the drawing board.
Streaming - Take 2
We need to process our stream of database rows (or products if we use the converter like before) in such a way that all related entities will be grouped into one product and emitted as such. After browsing the documentation of fs2 we stumble upon a function called groupAdjacentBy so we try that one.
case GET -> Root / "products" =>
  val ps = repo.loadProducts
    .groupAdjacentBy(_._1)
    .map {
      case (id, rows) => Product.fromDatabase(rows.toList)
    }
    .collect {
      case Some(p) => p
    }
  Ok(ps)
Okay, this does not look complicated and it even compiles - Hooray! :-)
So let’s break it apart piece by piece. The groupAdjacentBy function of fs2 partitions the input into chunks depending on the given function: consecutive elements that yield the same key end up in the same chunk. A Chunk is used internally by fs2 for all kinds of stuff. You may compare it to a sub-stream of Akka-Streams. However, the documentation describes it as a “strict, finite sequence of values that allows index-based random access of elements”.
Having our chunks we can map over each one converting it into a list which is then passed to our helper function fromDatabase to create proper products. Last but not least we need to collect our entities to get from an Option[Product] to a stream of Product.
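The idea behind grouping adjacent elements can be sketched with a plain Iterator from the standard library. This is not the fs2 implementation, just a hand-written stand-in under simplifying assumptions: rows are plain tuples, keys are compared with ==, and the Row alias and sample data are made up for this example.

```scala
object GroupAdjacentSketch {
  // A row as returned by the query, simplified to plain types:
  // (product id, language code, name).
  type Row = (Int, String, String)

  // Groups consecutive elements that share the same key.
  // Only the group currently being built is held in memory.
  def groupAdjacentBy[A, K](it: Iterator[A])(key: A => K): Iterator[(K, List[A])] =
    new Iterator[(K, List[A])] {
      private val buffered = it.buffered

      def hasNext: Boolean = buffered.hasNext

      def next(): (K, List[A]) = {
        val k     = key(buffered.head)
        val group = List.newBuilder[A]
        // Consume elements as long as they belong to the current key.
        while (buffered.hasNext && key(buffered.head) == k) group += buffered.next()
        (k, group.result())
      }
    }

  val rows: List[Row] = List(
    (1, "de", "Erdbeere"),
    (1, "en", "Strawberry"),
    (2, "de", "Gurke"),
    (2, "en", "Cucumber")
  )

  // One entry per product id, each with all of its names.
  val grouped: List[(Int, List[String])] =
    groupAdjacentBy(rows.iterator)(_._1).map {
      case (id, rs) => (id, rs.map(_._3))
    }.toList
}
```

In contrast to the fold from the first attempt, each group can be handed downstream as soon as the key changes, so memory usage is bounded by the largest group instead of the whole result set. As with the merge helper, this only works if rows of the same product are adjacent.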
JSON trouble
Now that we have a proper streaming solution we try it out but what do we get when we expect a list of products?
% http :53248/products
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

{
  "id":"8773899b-fcfa-401f-af3e-b188ebb0c00c",
  "names":[
    {"lang":"de","name":"Erdbeere"},
    {"lang":"en","name":"Strawberry"}
  ]
}
{
  "id":"983aaf86-abe4-44af-9896-d8f2d2c5f82c",
  "names":[
    {"lang":"de","name":"Gurke"},
    {"lang":"en","name":"Cucumber"}
  ]
}
Well, whatever this is, it is not JSON! It might look like it, but it isn’t: two objects are simply written one after another, without an enclosing array or separators. However, quite often you can see such things in the wild (read: in production).
If we think about it then this sounds like a bug in http4s, and indeed we find an issue for it. Because the underlying problem is not as trivial as it first sounds, maybe we should try to work around the issue.
The fs2 API offers concatenation of streams and the nifty intersperse function to insert elements between emitted ones. So let’s give it a try.
case GET -> Root / "products" =>
  val prefix = Stream.eval("[".pure[F])
  val suffix = Stream.eval("]".pure[F])
  val ps = repo.loadProducts
    .groupAdjacentBy(_._1)
    .map {
      case (id, rows) => Product.fromDatabase(rows.toList)
    }
    .collect {
      case Some(p) => p
    }
    .map(_.asJson.noSpaces)
    .intersperse(",")
  @SuppressWarnings(Array("org.wartremover.warts.Any"))
  val result: Stream[F, String] = prefix ++ ps ++ suffix
  Ok(result)
First we create streams for the first and last JSON tokens that we need to emit, the opening and the closing bracket of the array. Please note that we cannot simply use a String here but have to lift it into our HKT F. The usage of pure is okay because we simply lift a fixed value. Then we extend our original stream processing by explicitly converting our products to JSON and inserting the delimiter (a comma) manually using the intersperse function. In the end we simply concatenate our streams and return the result.
Our solution is quite simple, with the downside that we need to suppress a warning from the wartremover tool. This is somewhat annoying but can happen. If we remove the annotation, we’ll get a compiler error:
[error] ... [wartremover:Any] Inferred type containing Any
[error]   val result: Stream[F, String] = prefix ++ ps ++ suffix
[error]       ^
[error] ... [wartremover:Any] Inferred type containing Any
[error]   val result: Stream[F, String] = prefix ++ ps ++ suffix
[error]       ^
[error] two errors found
So let’s check if we have succeeded:
% http :53248/products
HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Transfer-Encoding: chunked

[
  {
    "id": "8773899b-fcfa-401f-af3e-b188ebb0c00c",
    "names": [
      {
        "lang": "de",
        "name": "Erdbeere"
      },
      {
        "lang": "en",
        "name": "Strawberry"
      }
    ]
  },
  {
    "id": "983aaf86-abe4-44af-9896-d8f2d2c5f82c",
    "names": [
      {
        "lang": "de",
        "name": "Gurke"
      },
      {
        "lang": "en",
        "name": "Cucumber"
      }
    ]
  }
]
This looks good, so congratulations: we are done with our routes!
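The framing trick used in the route (opening bracket, comma separated elements, closing bracket) can also be sketched without fs2, using a plain Iterator. The intersperse helper below is a hand-written stand-in for illustration only, and the sample strings are made up; they play the role of the already serialised products.

```scala
object JsonArraySketch {
  // Lazily inserts a separator between the elements of an iterator.
  def intersperse[A](it: Iterator[A], sep: A): Iterator[A] =
    if (!it.hasNext) Iterator.empty
    else Iterator.single(it.next()) ++ it.flatMap(a => Iterator(sep, a))

  // Wraps already serialised JSON values into a JSON array.
  def asJsonArray(items: Iterator[String]): String =
    (Iterator("[") ++ intersperse(items, ",") ++ Iterator("]")).mkString

  // Already serialised JSON objects, one per product.
  val objects: List[String] = List("""{"id":1}""", """{"id":2}""")
}
```

JsonArraySketch.asJsonArray(JsonArraySketch.objects.iterator) produces [{"id":1},{"id":2}], and an empty input still yields the valid JSON document []. The separator is only ever placed between two elements, which is the property that makes the concatenated output parseable.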
Starting the application
Within our main entry point we simply initialise all needed components and wire them together. We’ll step through each part in this section. The first thing you’ll notice is that we use the IOApp provided by the Cats Effect library.
object Pure extends IOApp {
  @SuppressWarnings(Array("org.wartremover.warts.Any"))
  def run(args: List[String]): IO[ExitCode] = ???
}
Yet again we need to suppress a warning from wartremover here. But let’s continue to initialising the database connection.
val migrator: DatabaseMigrator[IO] = new FlywayDatabaseMigrator

val program = for {
  (apiConfig, dbConfig) <- IO {
    val cfg = ConfigFactory.load
    (loadConfigOrThrow[ApiConfig](cfg, "api"),
     loadConfigOrThrow[DatabaseConfig](cfg, "database"))
  }
  ms <- migrator.migrate(dbConfig.url, dbConfig.user, dbConfig.pass)
  tx = Transactor
    .fromDriverManager[IO](dbConfig.driver,
                           dbConfig.url,
                           dbConfig.user,
                           dbConfig.pass)
  repo = new DoobieRepository(tx)
We create our database migrator explicitly wired to the IO data type. Now we start with a for comprehension in which we load our configuration via pureconfig yet again within an IO. After successful loading of the configuration we continue with migrating the database. Finally we create the transactor needed by Doobie and the database repository.
val program = for {
  // ...
  productRoutes = new ProductRoutes(repo)
  productsRoutes = new ProductsRoutes(repo)
  routes = productRoutes.routes <+> productsRoutes.routes
  httpApp = Router("/" -> routes).orNotFound
  server = BlazeServerBuilder[IO].bindHttp(apiConfig.port,
    apiConfig.host).withHttpApp(httpApp)
  fiber = server.resource.use(_ => IO(StdIn.readLine())).as(ExitCode.Success)
} yield fiber
Here we create our routes via the classes, combine them (via the <+> operator) and create the http4s app explicitly using an IO, thus wiring our abstract routes to IO. The service will, like the impure one, run until you press enter. But it won’t run yet. ;-)
program.attempt.unsafeRunSync match {
  case Left(e) =>
    IO {
      println("*** An error occurred! ***")
      if (e != null) {
        println(e.getMessage)
      }
      ExitCode.Error
    }
  case Right(r) => r
}
If you remember playing around with MonadError then you’ll recognize the attempt here. We attempt to run our program and execute possible side effects via the unsafeRunSync method from Cats effect. But to provide a proper return type for the IOApp we need to evaluate the return value which is either an error or a proper exit code. In case of an error we print it out on the console (no fancy logging here) and explicitly set an error code as the return value.
It seems we are done with our pure service! Or are we? Let’s see what we need to add to both services if we want to test them.