Impure implementation

We’ll be using the following libraries for the impure version of the service:

  1. Akka (including Akka-HTTP and Akka-Streams)
  2. Slick (as the database layer)
  3. Flyway for database migrations (or evolutions)
  4. Circe for JSON codecs and akka-http-json as a wrapper
  5. Refined for refined types
  6. The PostgreSQL JDBC driver

I’ll spare you the sbt setup as you can look that up in the code repository (i.e. the impure folder in the book repo).

Models

First we’ll implement our models, which are simple and straightforward. To begin with we need a class to store our translations, or rather a single translation.

final case class Translation(lang: String, name: String)

Technically this is okay, but it leaves us with a bad feeling. Using Option[String] would be of no use because both fields have to be set. But a String can always be null and may contain a lot of unexpected stuff (literally anything).

So let us define some refined types which we can use later on. First we need a language code which obeys the restrictions of ISO 639-1, and we need a stronger definition for a product name. For the former we use a regular expression and for the latter we simply require a string which is not empty.

Refined types for models
type LanguageCode = String Refined MatchesRegex[W.`"^[a-z]{2}$"`.T]
type ProductName = String Refined NonEmpty
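To get a feel for how these types behave, here is a minimal sketch (not from the book) showing both compile-time refinement of literals and runtime validation of arbitrary strings, using the aliases just defined:

```scala
import eu.timepit.refined.api.RefType
import eu.timepit.refined.auto._

// Literals are checked at compile time: this compiles because "de" matches
// the regex, while e.g. `val nope: LanguageCode = "english"` would not.
val de: LanguageCode = "de"

// Arbitrary runtime values have to be validated explicitly and yield an Either.
val fromUser: Either[String, LanguageCode] = RefType.applyRef[LanguageCode]("en")
```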

Now we can give our translation model another try.

Translation model using refined types
final case class Translation(lang: LanguageCode, name: ProductName)

Much better! And while we’re at it we can also write the JSON codecs using the refined module of the Circe library. We put them into the companion object of the model.

object Translation {
  implicit val decode: Decoder[Translation] =
    Decoder.forProduct2("lang", "name")(Translation.apply)

  implicit val encode: Encoder[Translation] =
    Encoder.forProduct2("lang", "name")(t => (t.lang, t.name))
}
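For these codecs to see decoders and encoders for the refined fields, the io.circe.refined module has to be in scope where they are defined. A quick, hypothetical round trip might then look like this:

```scala
import io.circe.parser.decode
import io.circe.refined._

// Decoding validates the refined fields, so malformed input fails cleanly.
val good = decode[Translation]("""{"lang":"de","name":"Kartoffel"}""")
// => Right(Translation(de, Kartoffel))
val bad = decode[Translation]("""{"lang":"deutsch","name":""}""")
// => Left(...) because neither field passes its refinement
```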

Now onwards to the product model. Because we already know about refined types we can use them from the start here.

type ProductId = String Refined Uuid
final case class Product(id: ProductId, names: List[Translation])

If we look closely we realise that a List may be empty. That is valid for the list itself but not for our product, because we need at least one entry. Luckily for us the Cats library has us covered with the NonEmptyList data type. Including the JSON codecs this leads us to our final implementation.
Last but not least we really should be using the existing UUID data type instead of rolling our own refined string version - even when it is cool. ;-)

Product model using UUID type and NeL
type ProductId = java.util.UUID
final case class Product(id: ProductId, names: NonEmptyList[Translation])

object Product {
  implicit val decode: Decoder[Product] =
    Decoder.forProduct2("id", "names")(Product.apply)

  implicit val encode: Encoder[Product] =
    Encoder.forProduct2("id", "names")(p => (p.id, p.names))
}

We kept the type name ProductId by using a type alias. This is convenient, but remember that a type alias does not add extra type safety (e.g. type Foo = String is still just a String).
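If we ever wanted stronger compile-time guarantees than the alias provides, a value class would be a cheap option. This is just a sketch of the idea, not something our code here uses:

```scala
// Wrapping the UUID makes ProductId a distinct type: passing a raw
// java.util.UUID where a ProductId is expected no longer compiles.
final case class ProductId(value: java.util.UUID) extends AnyVal
```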

But are we really done with the model? Well, maybe not, because a list may contain duplicate entries while the database surely will not, thanks to its unique constraints! So, let’s switch to a NonEmptySet, which is also provided by Cats.

Product model using UUID and NeS
type ProductId = java.util.UUID
final case class Product(id: ProductId, names: NonEmptySet[Translation])

Now we have the models covered and can move on to the database layer.

Database layer

The database layer should provide programmatic access to the database, but it should also manage changes to the database schema. The latter is usually called migrations (or evolutions). From the available options we chose Flyway as the tool to manage our database schema.

Migrations

Flyway uses raw SQL scripts which have to be put into a certain location, in our case /db/migration under the resources folder (so with a standard sbt layout e.g. src/main/resources/db/migration/V1__create_tables.sql). The files have to be named like VXX__some_name.sql (XX being a number), starting with V1. Please note that there are two underscores between the version prefix and the rest of the name! Because our database schema is very simple we’re done quickly:

Flyway migration for creating the database
CREATE TABLE "products" (
  "id" UUID NOT NULL,
  CONSTRAINT "products_pk" PRIMARY KEY ("id")
);

CREATE TABLE "names" (
  "product_id" UUID       NOT NULL,
  "lang_code"  VARCHAR(2) NOT NULL,
  "name"       TEXT       NOT NULL,
  CONSTRAINT "names_pk"
    PRIMARY KEY ("product_id", "lang_code"),
  CONSTRAINT "names_product_id_fk"
    FOREIGN KEY ("product_id")
    REFERENCES "products" ("id")
    ON DELETE CASCADE ON UPDATE CASCADE
);

In the code you’ll see that we additionally set comments, which are omitted from the snippet above. This might be overkill here, but it is a very handy feature and I advise you to use it for more complicated database schemas: the right comment (read: information) in the right place can save a lot of time when trying to understand things.
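For illustration, such comments are plain SQL statements along these lines (the actual texts in the repository differ):

```sql
COMMENT ON TABLE "products" IS 'Products defined only by their unique id.';
COMMENT ON COLUMN "names"."lang_code" IS 'An ISO 639-1 two letter language code.';
```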

Next we move on to the programmatic part, which first needs a configuration of our database connection. With Slick you have a multitude of options, but we’ll use the “Typesafe Config” approach.

Database configuration in application.conf
database {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    connectionPool = "HikariCP"
    dataSourceClass = "org.postgresql.ds.PGSimpleDataSource"
    properties {
      serverName = "localhost"
      portNumber = "5432"
      databaseName = "impure"
      user = "impure"
      password = "secret"
    }
    numThreads = 10
  }
}
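The repository functions shown later run their queries through a dbConfig value. One plausible way to obtain it from this configuration, sketched here rather than taken verbatim from the repository, is Slick’s DatabaseConfig helper:

```scala
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

// Reads the "database" section (profile plus db settings) from the
// application configuration and sets up the connection pool.
val dbConfig: DatabaseConfig[JdbcProfile] =
  DatabaseConfig.forConfig[JdbcProfile]("database")
```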

After we have this in place we can run the migrations via the API of Flyway. For this we load the configuration (we do it by creating an actor system), extract the needed information, build a JDBC URL and use it together with username and password to obtain a Flyway instance. On that instance we simply call the method migrate() which will do the right thing: it checks if the schema exists and either creates it, applies pending migrations or simply does nothing. The method returns the number of applied migrations.

Apply database migrations via Flyway
implicit val system: ActorSystem    = ActorSystem()
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext   = system.dispatcher

val url = "jdbc:postgresql://" +
  system.settings.config.getString("database.db.properties.serverName") +
  ":" + system.settings.config.getString("database.db.properties.portNumber") +
  "/" + system.settings.config.getString("database.db.properties.databaseName")
val user   = system.settings.config.getString("database.db.properties.user")
val pass   = system.settings.config.getString("database.db.properties.password")
val flyway = Flyway.configure().dataSource(url, user, pass).load()
val _      = flyway.migrate()

Let us continue to dive into the Slick table definitions.

Slick tables

Slick offers several options for approaching the database. For our example we will be using the lifted embedding, but if needed Slick also provides the ability to run plain SQL queries.
For the lifted embedding we have to define our tables in a way Slick can understand. While this can be tricky under certain circumstances, our simple model is straightforward to implement.

Slick product table definition
final class Products(tag: Tag) extends Table[UUID](tag, "products") {
  def id = column[UUID]("id", O.PrimaryKey)

  def * = id
}
val productsTable = TableQuery[Products]

As you can see above we’re using plain data types (not the refined ones) to keep the Slick implementation simple. However, we could also use refined types at the price of either pulling in the slick-refined library or writing custom column mappers.
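To illustrate the second option, a hand-written column mapper for our ProductName could look roughly like this. It is a sketch under the assumption that we wanted refined columns; our tables here stick to plain types:

```scala
import eu.timepit.refined.api.RefType
import slick.jdbc.PostgresProfile.api._

// Maps the refined type to a plain String column and re-validates on the
// way back; invalid database content surfaces as a runtime error.
implicit val productNameColumnType: BaseColumnType[ProductName] =
  MappedColumnType.base[ProductName, String](
    _.value,
    s =>
      RefType
        .applyRef[ProductName](s)
        .fold(err => throw new IllegalArgumentException(err), identity)
  )
```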
Next we’ll implement the table for the translations, which also needs some constraints.

Slick translations table definition
final class Names(tag: Tag) extends Table[(UUID, String, String)](tag, "names") {
  def productId = column[UUID]("product_id")
  def langCode  = column[String]("lang_code")
  def name      = column[String]("name")

  def pk = primaryKey("names_pk", (productId, langCode))
  def productFk =
    foreignKey("names_product_id_fk", productId, productsTable)(
      _.id,
      onDelete = ForeignKeyAction.Cascade,
      onUpdate = ForeignKeyAction.Cascade
    )

  def * = (productId, langCode, name)
}
val namesTable = TableQuery[Names]

As you can see, defining constraints is also pretty simple. Now our repository needs some functions for more convenient access to the data.

Slick repository functions
def loadProduct(id: ProductId): Future[Seq[(UUID, String, String)]] = {
  val program = for {
    (p, ns) <- productsTable
      .filter(_.id === id)
      .join(namesTable)
      .on(_.id === _.productId)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.run(program.result)
}

def loadProducts(): Future[Seq[(UUID, String, String)]] = {
  val program = for {
    (p, ns) <- productsTable.join(namesTable).on(_.id === _.productId)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.run(program.result)
}

def saveProduct(p: Product): Future[List[Int]] = {
  val cp      = productsTable += (p.id)
  val program = DBIO.sequence(
    cp :: saveTranslations(p).toList
  ).transactionally
  dbConfig.db.run(program)
}

def updateProduct(p: Product): Future[List[Int]] = {
  val program = namesTable
    .filter(_.productId === p.id)
    .delete
    .andThen(DBIO.sequence(saveTranslations(p).toList))
    .transactionally
  dbConfig.db.run(program)
}

protected def saveTranslations(p: Product): NonEmptyList[DBIO[Int]] = {
  val save = saveTranslation(p.id)(_)
  p.names.toNonEmptyList.map(t => save(t))
}

/**
  * Create a query to insert or update a given translation in the database.
  *
  * @param id The unique ID of the product.
  * @param t  The translation to be saved.
  * @return A composable sql query for Slick.
  */
protected def saveTranslation(id: ProductId)(t: Translation): DBIO[Int] =
  namesTable.insertOrUpdate((id, t.lang, t.name))

The last two functions are helpers that enable us to create queries which we can compose. They are used in the saveProduct and updateProduct functions to create a list of queries which is executed in bulk, while the call to transactionally ensures that they run within a single transaction. When updating a product we first delete all existing translations, which allows translations to be removed via an update. To be able to do so we use the andThen helper from Slick.
The loadProduct function simply returns a list of database rows from the needed join, so we need a function which builds a Product out of those rows.

Helper function to create a Product
def fromDatabase(rows: Seq[(UUID, String, String)]): Option[Product] = {
  val po = for {
    (id, c, n) <- rows.headOption
    t          <- Translation.fromUnsafe(c)(n)
    p          <- Product(
                    id = id,
                    names = NonEmptySet.one[Translation](t)
                  ).some
  } yield p
  po.map(
    p =>
      rows.drop(1).foldLeft(p) { (a, cols) =>
        val (_, c, n) = cols
        Translation.fromUnsafe(c)(n).fold(a)(t =>
          a.copy(names = a.names.add(t))
        )
      }
  )
}

But oh no! The compiler refuses to build it:

Missing cats.Order
[error] .../impure/models/Product.scala:45:74:
  could not find implicit value for parameter A:
    cats.kernel.Order[com.wegtam.books.pfhais.impure.models.Translation]
[error] p <- Product(id = id, names = NonEmptySet.one[Translation](t)).some
[error]                                                            ^

It seems we have to provide an instance of Order for our Translation model to make Cats happy. So we have to think of an ordering for our model. A simple approach would be to order by the language code. Let’s try this:

Providing Order for LanguageCode
import cats._
import cats.syntax.order._

implicit val orderLanguageCode: Order[LanguageCode] =
  new Order[LanguageCode] {
    def compare(x: LanguageCode, y: LanguageCode): Int =
      x.value.compare(y.value)
  }

You might have noticed the explicit call to .value to get at the underlying string of our refined type. This is needed because the other option (using x.compare(y)) will compile but bless you with stack overflow errors. The reason is probably that the latter is compiled into code calling the OrderOps#compare syntax extension, which delegates right back to the instance we are defining and thus recurses forever.
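To make the trap explicit, here is a sketch of the broken variant described above; do not use it:

```scala
import cats._
import cats.syntax.order._

// Compiles, but x.compare(y) resolves to the cats OrderOps syntax extension,
// which calls this very Order instance again: infinite recursion, then a
// stack overflow at runtime.
implicit val orderLanguageCodeBroken: Order[LanguageCode] =
  new Order[LanguageCode] {
    def compare(x: LanguageCode, y: LanguageCode): Int =
      x.compare(y)
  }
```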

Providing Order for Translation
import cats._
import cats.syntax.order._

implicit val order: Order[Translation] =
  new Order[Translation] {
    def compare(x: Translation, y: Translation): Int =
      x.lang.compare(y.lang)
  }
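As a side note, Cats can express such instances more concisely via Order.by. A minimal sketch, assuming the Order[LanguageCode] from above is in scope:

```scala
// Equivalent to the hand-written instance above: orders translations by
// their language code, reusing the implicit Order[LanguageCode].
implicit val order: Order[Translation] = Order.by(_.lang)
```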

So far we should have everything in place to make use of our database. Now we need to wire it all together.

Akka-HTTP routes

Defining the routes is pretty simple if you’re used to the Akka-HTTP routing DSL syntax.

Basic routes with Akka-HTTP
val route = path("product" / JavaUUID) { id: ProductId =>
  get {
    ???
  } ~ put {
    ???
  }
} ~ path("products") {
  get {
    ???
  } ~
  post {
    ???
  }
}

We will fill in the details later on. But first let’s start an actual server that makes use of our routes.

Starting an Akka-HTTP server
val host       = system.settings.config.getString("api.host")
val port       = system.settings.config.getInt("api.port")
val srv        = Http().bindAndHandle(route, host, port)
val pressEnter = StdIn.readLine()
srv.flatMap(_.unbind()).onComplete(_ => system.terminate())

This code fires up a server handling the defined routes on the hostname and port taken from the configuration. It will run until you press enter and then terminate. Let us now visit the code for each routing endpoint, starting with the one returning a single product.

Returning a single product
 1 path("product" / JavaUUID) { id: ProductId =>
 2   get {
 3     complete {
 4       for {
 5         rows <- repo.loadProduct(id)
 6         prod <- Future { Product.fromDatabase(rows) }
 7       } yield prod
 8     }
 9   }
10 }

We load the raw product data from the repository and convert it into a proper product model. To make the types align we have to wrap the second step in a Future, otherwise we would get a compiler error. We don’t need to marshal the response ourselves because we are using the akka-http-json library, which provides for example an ErrorAccumulatingCirceSupport import that handles this - provided of course that you have Circe codecs defined for your types.
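Concretely, the marshalling falls into place with imports along these lines (a sketch; the exact set depends on the route file):

```scala
// Lets complete(...) marshal any type with Circe codecs to JSON responses
// and unmarshal request entities, accumulating decoding errors.
import de.heikoseeberger.akkahttpcirce.ErrorAccumulatingCirceSupport._
// Codecs for the refined fields inside our models.
import io.circe.refined._
```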

Updating a single product
val route = path("product" / JavaUUID) { id: ProductId =>
  put {
    entity(as[Product]) { p =>
      complete {
        repo.updateProduct(p)
      }
    }
  }
}

The route for updating a product is also very simple. We extract the product entity from the request body via the entity(as[T]) directive and simply pass it to the appropriate repository function. Now onwards to creating a new product.

Creating a product
1 path("products") {
2   post {
3     entity(as[Product]) { p =>
4       complete {
5         repo.saveProduct(p)
6       }
7     }
8   }
9 }

As you can see the function is basically the same, except that we’re calling a different function from the repository. Last but not least let us take a look at the endpoint returning all products.

Return all products
 1 path("products") {
 2   get {
 3     complete {
 4       val products = for {
 5         rows <- repo.loadProducts()
 6         ps <- Future {
 7           rows.toList.groupBy(_._1).map {
 8             case (_, cols) => Product.fromDatabase(cols)
 9           }
10         }
11       } yield ps
12       products.map(_.toList.flatten)
13     }
14   }
15 }

This looks more complicated than the other endpoints, so what exactly are we doing here?
Well, first we load the raw product data from the repository. Afterwards we convert it into the proper data model, or to be more exact, into a list of product entities.

The first thing that comes to mind is that we’re performing these operations in memory. This is no different from last time, when we converted the data for a single product. Now, however, we’re talking about all products, which may be a lot of data. Another obvious point is that we get a list of Option[Product] which we explicitly flatten at the end.

Maybe we should consider streaming the results. But we still have to group and combine the rows which belong to a single product into a product entity. Can we achieve that with streaming? Well, let’s look at our data flow.
We receive a list of rows with three columns from the database in the following format: product id, language code, name. The tricky part is that multiple rows (list entries) can belong to the same product, recognizable by the same value in the first column (the product id). First we should simplify our problem by ensuring that the rows are sorted by product id, and while we are at it we switch from running the query to streaming its results. This is done by adjusting the function loadProducts in the repository.

Sort the returned list of entries.
def loadProducts(): DatabasePublisher[(UUID, String, String)] = {
  val program = for {
    (p, ns) <- productsTable.join(namesTable).on(_.id === _.productId)
               .sortBy(_._1.id)
  } yield (p.id, ns.langCode, ns.name)
  dbConfig.db.stream(program.result)
}

Now we can rely on the fact that we have seen all entries for one product once the product id in the stream changes. Let’s adjust our endpoint to make use of streaming. Because Akka-HTTP is based on Akka-Streams we can simply use that.

Return all products as stream
 1 path("products") {
 2   get {
 3     implicit val jsonStreamingSupport: JsonEntityStreamingSupport =
 4       EntityStreamingSupport.json()
 5 
 6     val src = Source.fromPublisher(repo.loadProducts())
 7     val products: Source[Product, NotUsed] = src
 8       .collect(
 9         cs =>
10           Product.fromDatabase(Seq(cs)) match {
11             case Some(p) => p
12         }
13       )
14       .groupBy(Int.MaxValue, _.id)
15       .fold(Option.empty[Product])(
16         (op, x) => op.fold(x.some)(p =>
17           p.copy(names = p.names ::: x.names).some
18         )
19       )
20       .mergeSubstreams
21       .collect(
22         op =>
23           op match {
24             case Some(p) => p
25         }
26       )
27     complete(products)
28   }
29 }

Wow, this may look scary, but let’s break it apart piece by piece. First we need an implicit value which provides streaming support for JSON. Next we create a Source from the database stream. Then we implement the processing logic via the high level streams API. We convert each row via our helper function fromDatabase and collect every defined result, which leads to a stream of Product entities. But we have created way too many: each product appears once per translation it has. So we group the stream by product id, which creates a new substream per product id holding only the entities for that specific product. We fold over each of these substreams, merging the translations (names) together. Afterwards we merge the substreams back together and run another collect to get a result stream of Product instead of Option[Product]. Last but not least the stream is passed to the complete function, which will do the right thing.

Problems with the solution

The solution has two problems:

  1. The number of individual substreams (and thus products) is limited to Int.MaxValue.
  2. The groupBy operator keeps references to all these substreams in memory, opening up a possible out-of-memory issue.

As the first problem is simply related to the usage of groupBy we may say that we only have one problem: the usage of groupBy. ;-)
For a limited amount of data the proposed solution is perfectly fine, so we will leave it as is for now.
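For completeness, here is one possible groupBy-free sketch, not taken from the book: since the rows arrive sorted by product id, consecutive products can be merged with statefulMapConcat, with a trailing None marking the end of the stream so the last buffered product is emitted too.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

val rows: Source[Product, NotUsed] =
  Source.fromPublisher(repo.loadProducts())
    .map(cs => Product.fromDatabase(Seq(cs)))
    .collect { case Some(p) => p }

val products: Source[Product, NotUsed] = rows
  .map(Option.apply)
  .concat(Source.single(Option.empty[Product]))
  .statefulMapConcat { () =>
    var buffer: Option[Product] = None
    next =>
      (buffer, next) match {
        // Same product id: merge the translations, emit nothing yet.
        case (Some(c), Some(p)) if c.id == p.id =>
          buffer = Some(c.copy(names = c.names.union(p.names)))
          Nil
        // New product id (or end marker): emit the buffered product.
        case (old, p) =>
          buffer = p
          old.toList
      }
  }
```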

Regarding the state of our service: we have a working solution. So congratulations, and let’s move on to the pure implementation.