Saturday, December 26, 2015

Divided We Win: an event sourcing / CQRS prospective on write and read models separation. Queries.

Quite a while ago we have started to explore command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we have covered only commands and events but not queries. The goal of this blog post is to fill the gap and discuss the ways to handle queries following CQRS architecture.

We will start where we left off last time, with the sample application which was able to handle commands and persist events in the journal. In order to support read path, or queries, we are going to introduce data store. For the sake of keeping things simple, let it be in-memory H2 database. The data access layer is going to be handled by awesome Slick library.

To begin with, we have to come up with a simple data model for the User class, managed by UserAggregate persistent actor. In this regards, the Users class is a typical mapping of the relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, true)
  def * = (id, email) <> (User.tupled, User.unapply)
}
It is important to notice at this point that we enforce uniqueness constraint on User's email. We will come back to this subtle detail later, during the integration with UserAggregate persistent actor. Next thing we need is a service to manage data store access, namely persisting and querying Users. As we are in the Akka universe, obviously it is going to be an actor as well. Here it is:
case class CreateSchema()
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case class FindAllUsers()

trait Persistence {
  val users = TableQuery[Users]  
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
   
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
      
    case UpdateUser(id, email) => {
      val query = for { user <- users if user.id === id } yield user.email
      db.run(users.insertOrUpdate(User(id, email)))
    }
    
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption) 
        .onComplete { replyTo ! _ }
    }
    
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}
Please notice that PersistenceService is regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kind of messages:
  • CreateSchema to initialize database schema
  • UpdateUser to update user's email address
  • FindUserByEmail to query user by its email address
  • FindAllUsers to query all users in the data store

Good, the data store services are ready but nothing really fills them with data. Moving on to the next step, we will refactor UserAggregate, more precisely the way it handles UserEmailUpdate command. At its current implementation, user's email update happens unconditionally. But remember, we imposed uniqueness constraints on emails so we are going to change command logic to account for that: before actually performing the update we will run the query against read model (data store) to make sure no user which such email is already registered.

val receiveCommand: Receive = {
  case UserEmailUpdate(email) => 
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ => persist(UserEmailUpdated(id, email)) { event =>
          updateState(event)
          persistence ! UpdateUser(id, email)
        }
        Acknowledged(id)
      }
        
      sender ! result
  } catch {
    case ex: Exception if NonFatal(ex) => sender ! Error(id, ex.getMessage) 
  }
}
Pretty simple, but not really idiomatic: the Await.result does not look as it belongs to this code. My first attempt was to use future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, the side effect I have observed is that in this case the persist(UserEmailUpdated(id, email)) { event => ... } block is supposed to be executed in another thread most of the time (if result is not ready) but it didn't, very likely because of thread context switch. So the Await.result was here to the rescue.

Now every time user's email update happens, along with persisting the event we are going to record this fact in the data store as well. Nice, we are getting one step closer.

The last thing we have to consider is how to populate the data store from the event journal? Another experimental module from Akka Persistence portfolio is of great help here, Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers and this is what we are going to do in order to populate data store from the journal. Not a surprise, there would be UserJournal actor responsible for that.

case class InitSchema()

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
      
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId => 
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event => 
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
              }
            }
        }
    }
  }
}
Basically, the code is simple enough but let us reiterate a bit on what it does. First thing, we ask the journal about all persistence identifiers it has using currentPersistenceIds method. Secondly, for every persistence identifier we query all the events from the journal. Because there is only one event in our case, UserEmailUpdated, we just directly transform it into data store services UpdateUser message.

Awesome, we are mostly done! The simplest thing at the end is to add another endpoint to UserRoute which returns the list of the existing users by querying the read model.

pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map { 
        case Success(users) => 
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) => 
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}
We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let ensure that our journal and data store are empty.
$ curl -X GET http://localhost:38080/api/v1/users
[]
Make sense, as we haven't created any users yet. Let us do that by updating two users with different email addresses and querying the read model again.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d email=a@c.com
Email updated: a@c.com

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]
As expected, this time the result is different and we can see two users are being returned. The moment of truth, let us try to update the email of user with 124 with the one of user with 123.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@c.com
Email 'a@c.com' already registered
Teriffic, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice when we restart the application, the read data store should be repopulated from the journal and return the previously created users:
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]

With this post coming to the end, we are wrapping up the introduction into CQRS architecture. Although one may say many things are left out of scope, I hope the examples presented along our journey were useful to illustrate the idea and CQRS could be an interesting option to consider for your next projects.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Monday, October 5, 2015

Creating sample HTTPS server for fun and profit

Often during development or/and testing against real-life scenarios we, developers, are facing a need to run a full-fledged HTTPS server, possibly doing some mocking at the same time. On JVM platform, it used to be not trivial at all unless you know the right tool for the job. In this post we are going to create a skeleton of fully operational HTTPS server using terrific Spray framework and Scala language.

To begin with, we need to generate x509 certificate and private key respectively. Luckily, it is very easy to do using openssl command line tool.

openssl req 
    -x509 
    -sha256 
    -newkey rsa:2048 
    -keyout certificate.key 
    -out certificate.crt 
    -days 1024 
    -nodes

As we are on JVM platform, our essential goal is to have a Java keystore (JKS), a repository of security certificates. However, to import our newly generated certificate into JKS, we have to export it in PKCS #12 format and then create keystore out of it. Again, openssl on the resque.

openssl pkcs12 
    -export 
    -in certificate.crt 
    -inkey certificate.key 
    -out server.p12 
    -name sample-https-server 
    -password pass:change-me-please
Please note that the archive server.p12 is protected by a password. Now, the final step involves the command line tool from JDK distribution called keytool.
keytool -importkeystore 
    -srcstorepass change-me-please 
    -destkeystore sample-https-server.jks 
    -deststorepass change-me-please 
    -srckeystore server.p12 
    -srcstoretype PKCS12 
    -alias sample-https-server
The result is password-protected sample-https-server.jks keystore which we can use in our HTTPS server application to configure SSL context. Spray has very good documentation and plenty of examples available, one of those is sample SslConfiguration which we can use to configure KeyManager, TrustManager and SSLContext.
trait SslConfiguration {
  // If there is no SSLContext in scope implicitly, the default SSLContext is 
  // going to be used. But we want non-default settings so we are making 
  // custom SSLContext available here.
  implicit def sslContext: SSLContext = {
    val keyStoreResource = "/sample-https-server.jks"
    val password = "change-me-please"

    val keyStore = KeyStore.getInstance("jks")
    keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray)
    val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(keyStore, password.toCharArray)
    val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    trustManagerFactory.init(keyStore)
    val context = SSLContext.getInstance("TLS")
    context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
    context
  }

  // If there is no ServerSSLEngineProvider in scope implicitly, 
  // the default one is going to be used. But we would like to configure
  // cipher suites and protocols  so we are making a custom ServerSSLEngineProvider
  // available here.
  implicit def sslEngineProvider: ServerSSLEngineProvider = {
    ServerSSLEngineProvider { engine =>
      engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
      engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))
      engine
    }
  }
}
There are a few points to highlight here. First of all, usage of our own keystore created previously (which for convenience we are loading as classpath resource):
val keyStoreResource = "/sample-https-server.jks"
val password = "change-me-please"
Also, we are configuring TLS only (TLS v1.0, TLS v1.1 and TLS v1.2), no SSLv3 support. In addition to that, we are enabling only one cipher: TLS_RSA_WITH_AES_128_CBC_SHA. It has been done mostly for illustration, as in most cases all supported ciphers could be enabled.
engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))

With that, we are ready to create a real HTTPS server, which thanks to Spray framework is just a couple of lines long:

class HttpsServer(val route: Route = RestService.defaultRoute) extends SslConfiguration {
  implicit val system = ActorSystem()
  implicit val timeout: Timeout = 3 seconds 

  val settings = ServerSettings(system).copy(sslEncryption = true)
  val handler = system.actorOf(Props(new RestService(route)), name = "handler")

  def start(port: Int) = Await.ready(
    IO(Http) ? Http.Bind(handler, interface = "localhost", port = port, settings = Some(settings)), 
    timeout.duration)
      
  def stop() = {
    IO(Http) ? Http.CloseAll
    system.stop(handler)
  }
}

Any HTTPS server which does nothing at all is not very useful. That is where route property comes into play: using Spray routing extensions, we are passing the mappings (or routes) to handle the requests straight to HTTP service actor (RestService).

class RestService(val route: Route) extends HttpServiceActor with ActorLogging {
  def receive = runRoute {
    route
  }
}
With default route being just that:
object RestService {
  val defaultRoute = path("") {
    get {
      complete {
        "OK!\n"
      }
    }
  }
}
Basically, that is all we need and our HTTPS server is ready to take a test drive! The simplest way to run it is by using Scala application.
object HttpsServer extends App {
  val server = new HttpsServer
  server.start(10999)
}

Despite being written in Scala, we can easily embed it into any Java application (using a bit non-standard for Java developers naming conventions), for example:

public class HttpsServerRunner {
    public static void main(String[] args) {
        final HttpsServer server = new HttpsServer(RestService$.MODULE$.defaultRoute());
        server.start(10999);
    }
}

Once up and running (the easiest way to do that is sbt run), the exposed default route of our simple HTTPS server could be accessed either from the browser or using curl command line client (-k command line argument turns off SSL certificate verification):

$ curl -ki https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:25:47 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!
Alternatively, the certificate could be passed along with the curl command so a complete SSL certificate verification takes place, for example:
$  curl -i --cacert src/main/resources/certificate.crt  https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:28:05 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!

All is looking great but could we use HTTPS server as part of integration test suite to verify / stub / mock, for example, the interactions with third-party services? The answer is, yes, absolutely, thanks to JUnit rules. Let us take a look on the simplest implementation possible of HttpsServerRule:

class HttpsServerRule(@BeanProperty val port: Int, val route: Route) 
    extends ExternalResource {
  val server = new HttpsServer(route)
  override def before() = server.start(port)
  override def after() = server.stop()
}

object HttpsServerRule {
  def apply(port: Int) = new HttpsServerRule(port, RestService.defaultRoute);
  def apply(port: Int, route: Route) = new HttpsServerRule(port, route);
}

The JUnit test case for our default implementation uses brilliant RestAssured library which provides a Java DSL for easy testing of REST services.

public class DefaultRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65200);
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/")
            .then()
            .body(containsString("OK!"));
    }
}

For sure, not much you can do with default implementation so providing the custom one is a must-have option. Luckily, we sorted that out early on by accepting the routes.

object CustomRestService {
  val route = 
    path("api" / "user" / IntNumber) { id =>
      get {
        complete {
          "a@b.com"
        }
      }
    }
}

And here is a test case for it:

public class CustomRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65201, CustomRestService$.MODULE$.route());
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/api/user/1")
            .then()
            .body(containsString("a@b.com"));
    }
}

As it turns out, creating a full-blown HTTPS server is not hard at all and could be really fun, once you know the right tool to do that. Spray framework is one of those magic tools. As many of you are aware, Spray is going to be replaced by Akka HTTP which had seen a 1.0 release recently but at the moment lacks a lot of features (including HTTPS support), keeping Spray as a viable choice.

The complete project is available on Github.

Tuesday, August 25, 2015

Divided We Win: an event sourcing / CQRS prospective on write and read models separation. Commands and Events.

In today's post we are going to unveil some very interesting (in my opinion) architecture styles: event sourcing and command query responsibility segregation (CQRS). Essentially, in both of them events are in the heart of the system design and reflect any changes of the state which are happening. It is quite different from the traditional CRUD architecture where usually only the last known state is kept, with any historical background effectively lost.

Another appealing option which CQRS brings on the table is a natural separation of the write model (state modification initiated by commands and resulted in events) and read model (query for, in most case, last know state). It gives you a lot of freedom in picking the right data storage (or storages) for serving a different application use cases and demands. But, as always, there is no free lunch: the price to pay is the increased complexity of the system.

To keep the post reasonably short, the whole discussion is split into two parts: Commands and Events (this one), and Queries (upcoming). Using very simple model of the user entity as an example, we are going to design the application which uses CQRS architecture. There are few libraries available on the JVM platform, but the one we are going to use is Akka, more precisely its two recently added components Akka Persistence and Akka HTTP. Please do not worry if Akka is somewhat new to you, the examples are going to be really basic and (hopefully) easy to follow. So let us get started!

As we mentioned earlier, the changes in our system are happening as the result of a command and may lead to zero or more events. Let us model that by defining two traits:

trait Event
trait Command

In turn, those events may trigger state changes within application entities so let us model that with a generic trait as well:

trait State[T] {
  def updateState(event: Event): State[T]
}

Quite simple so far. Now, as we are modeling a user, UserAggregate is going to constitute our model and be responsible for applying updates to a particular User, identifiable by its id. Also, every user is going to have an email property which could be changed upon receiving UserEmailUpdate command and could cause UserEmailUpdated event to be created as the result. The following code snippet defines all that in terms of state, command and event within UserAggregate companion object.

object UserAggregate {
  case class User(id: String, email: String = "") extends State[User] {
    override def updateState(event: Event): State[User] = event match {
      case UserEmailUpdated(id, email) => copy(email = email)
    }
  }

  case class UserEmailUpdate(email: String) extends Command
  case class UserEmailUpdated(id: String, email: String) extends Event
}
For the readers familiar with CQRS and event sourcing, this example may look very naïve but I think it is good enough for grasping the basics.

Having our foundational blocks defined, it is a time to look on the most interesting part: accepting commands, transforming them into persisted events and applying the state changes, all of that are responsibilities of UserAggregate. In the context of CQRS and event sourcing, persisting events is a crucial capability of the system. Events are the single source of truth and the state of any single entity could be reconstructed to any point in time by replaying all the events relevant to it. This is a moment were Akka Persistence is joining the stage.

To take one step back, Akka is a brilliant library (or even toolkit) to build distributed systems based on actor model. On top of that, Akka Persistence enriches actors with persistence capabilities, letting them to store their messages (or events) in the durable journal. One might say that journal can grow very, very large and replaying all the events to reconstruct the state could take a lot of time. It is a valid point so Akka Persistence also adds the capability to store persistent snapshots in the durable storage as well. It speeds up the things a lot as only the events happening since the last snapshot should be replayed. By default, LevelDB is a durable storage engine used by Akka Persistence but it is pluggable feature with many alternatives available.

With that, let us take a look on UserAggregate actor which is going to be responsible for managing a User entity.

class UserAggregate(id: String) extends PersistentActor with ActorLogging {
  import UserAggregate._

  override def persistenceId = id
  var state: State[User] = User(id)

  def updateState(event: Event): Unit = {
    state = state.updateState(event)
  }
  
  val receiveCommand: Receive = {
    case UserEmailUpdate(email) => {
      persist(UserEmailUpdated(id, email)) { event =>
        updateState(event)
        sender ! Acknowledged(id)
      }
    }
  }

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: User) => state = snapshot
  }
}

So far it is the most complicated part so let us dissect the key pieces. First of all, UserAggregate extends PersistentActor, which adds the persistent capabilities to it. Second, every persistent actor must have an unique persistenceId: it is used as an identifier inside events journal and snapshot storage. And lastly, in contrast to regular Akka actors, persistent actors do have two entry points: receiveCommand to process commands and receiveRecover to replay events.

Getting back to our example, once the UserAggregate receives UserEmailUpdate command, first of all it persists the UserEmailUpdated event in the journal using persist(...) call and then updates aggregate's state using updateState(...) call, replying with acknowledgement to the sender. To see the complete example in action, let us create a simple REST endpoint using another great project from Akka's family, Akka HTTP, emerged from terrific Spray framework.

For now, we are going to define a simple route to handle a PUT request at /api/v1/users/{id} location, where {id} essentially is a placeholder for user identifier. It will accept a single form-encoded parameter email to update user's email address.

object UserRoute {
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.language.postfixOps
  
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val timeout: Timeout = 5 seconds

  val route = {
    logRequestResult("eventsourcing-example") {
      pathPrefix("api" / "v1" / "users") {
        path(LongNumber) { id =>
          (put & formFields('email.as[String])) { email =>
            complete {
              system
                .actorSelection(s"user/user-$id")
                .resolveOne
                .recover {
                  case _: ActorNotFound => 
                    system.actorOf(Props(new UserAggregate(id.toString)), s"user-$id")
                }
                .map {                 
                  _ ? UserEmailUpdate(email) map {
                    case Acknowledged(_) => 
                      HttpResponse(status = OK, entity = "Email updated: " + email)
                    case Error(_, message) => 
                      HttpResponse(status = Conflict, entity = message)
                  }
                }
            }
          }
        }
      }
    }
  }
}
The only missed piece is the runnable class to plug the handler for this REST endpoint so let us define one, thanks to Akka HTTP it is trivial:
object Boot extends App with DefaultJsonProtocol {
  Http().bindAndHandle(route, "localhost", 38080)
}
Nothing gives more assurance that the things are really working except an easy and readable test cases, built using ScalaTest framework complemented by Akka HTTP TestKit.
class UserRouteSpec extends FlatSpec 
      with ScalatestRouteTest 
      with Matchers 
      with BeforeAndAfterAll {
  import com.example.domain.user.UserRoute
  
  implicit def executionContext = scala.concurrent.ExecutionContext.Implicits.global

  "UserRoute" should "return success on email update" in {
    Put("http://localhost:38080/api/v1/users/123", FormData("email" ->  "a@b.com")) ~> UserRoute.route ~> check {
      response.status shouldBe StatusCodes.OK
      responseAs[String] shouldBe "Email updated: a@b.com"
    }
  }
}
Lastly, let us run the complete example and use curl from command line to perform a real call to the REST endpoint, forcing all the parts of the application to work together.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com
Nice! The results are matching our expectations. Before we finish up with this part, there is one more thing: please notice that when application is restarted, the state will be restored for the users for which aggregates did already exist. Another way to say it using Akka Persistence specifics, if event journal has events/snapshots associated with actor's persistenceId (here is why it should be unique and permanent), the recovery process takes place by applying more recent snapshot (if any) and replaying events.

In this post we looked behind the curtain of event sourcing and CQRS, covering only the write (or command) part of the architecture. There are many things to work on like for example, email uniqueness validation, user latest state retrieval, ... which represent the read (or query) flow of the application and are going to be discussed in the consequent blog post. As for now, I hope that event sourcing and CQRS are a bit demystified and might become a part of your future or existing applications.

Final disclaimer: Akka HTTP is marked as experimental component at the moment (nonetheless the main goals stand still, the API may change slightly) while Akka Persistence just graduated out of experimental status with recent Akka's 2.4.0-RC1 release. Please be aware of that.

The complete project is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Thursday, April 30, 2015

ApacheCon North America 2015: where friends meet

I have had a great pleasure to attend ApacheCon NA 2015 this year, which took place in the city of Austin, TX. By all means, it was an awesome conference and there are some emerging trends and interesting ideas I would like to share in this post.

ApacheCon is somewhat different from any other conference out there. And the main difference is the diversity of projects and topics being presented. It is a strength and a weakness in the same time: companies prefer to send people to specialized conferences which are dedicated to particular area / technology (good examples of those are Strata, JavaOne, SpringOne, ...). From other side, such a diversity encourages communication where often a new endless ideas are born. I personally have had a whole bunch of terrific conversations and met a lot of great people whom I can proudly refer as my friends now.

Getting closer to the matter, this time the conference had a 7 parallel tracks squeezed into 3 full days. The vast majority of the talks were about big/fast data and containerization / cloud deployment. It was clear that people are very interested in those topics as mostly every big data talk gathered quite a numerous crowd. Among the "big names" I have seen guys from Cloudera, Red Hat, Google, eBay, Pivotal and Linkedin.

There were a lot of brilliant talks but there are a couple of them which I was able to attend and would like to mention in particular:

I was very lucky to give a talk Apache CXF, Tika and Lucene: The Power of Search the JAX-RS Way as well where I presented a very interesting combination of Apache CXF, Apache Tika, Apache Lucene and Apache Olingo working together to quickly integrate search capabilities into new or existing web APIs.

And last but not least, I have discovered that Luke project, very well-known tool in the Apache Lucene community, is looking for contributors to help to support it and move it forward. If you are interested, please join.

Huge thank you to Apache Software Foundation for organizing such a great conference this year and giving me the chance to attend it. Keep up doing this terrific work guys!

Saturday, February 28, 2015

A fresh look on accessing database on JVM platform: Slick from Typesafe

In today's post we are going to open our mind, step away from traditional Java EE / Java SE JPA-based stack (which I think is great) and take a refreshing look on how to access database in your Java applications using the new kid on the block: Slick 2.1 from Typesafe. So if JPA is so great, why bother? Well, sometimes you need to do very simple things and there is no need to bring the complete, well modeled persistence layer for that. In here Slick shines.

In the essence, Slick is database access library, not an ORM. Though it is written in Scala, the examples we are going to look at do not require any particular knowledge of this excellent language (although, it is just Scala that made Slick possible to exist). Our relational database schema will have only two tables, customers and addresses, linked by one-to-many relationships. For simplicity, H2 has been picked as an in-memory database engine.

The first question which comes up is defining database tables (schema) and, naturally, database specific DDLs are the standard way of doing that. Can we do something about it and try another approach? If you are using Slick 2.1, the answer is yes, absolutely. Let us just describe our tables as Scala classes:

// The 'customers' relation table definition
class Customers( tag: Tag ) extends Table[ Customer ]( tag, "customers" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def email = column[ String ]( "email", O.Length( 512, true ), O.NotNull )
  def firstName = column[ String ]( "first_name", O.Length( 256, true ), O.Nullable )
  def lastName = column[ String ]( "last_name", O.Length( 256, true ), O.Nullable )

  // Unique index for customer's email
  def emailIndex = index( "idx_email", email, unique = true )
}

Very easy and straightforward, resembling a lot typical CREATE TABLE construct. The addresses table is going to be defined the same way and reference users table by foreign key.

// The 'customers' relation table definition
class Addresses( tag: Tag ) extends Table[ Address ]( tag, "addresses" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def street = column[ String ]( "street", O.Length( 100, true ), O.NotNull )
  def city = column[ String ]( "city", O.Length( 50, true ), O.NotNull )
  def country = column[ String ]( "country", O.Length( 50, true ), O.NotNull )

  // Foreign key to 'customers' table
  def customerId = column[Int]( "customer_id", O.NotNull )
  def customer = foreignKey( "customer_fk", customerId, Customers )( _.id )
}

Great, leaving off some details, that is it: we have defined two database tables in pure Scala. But details are important and we are going to look closely on following two declarations: Table[ Customer ] and Table[ Address ]. Essentially, each table could be represented as a tuple with as many elements as column it has defined. For example, customers is a tuple of (Int, String, String, String), while addresses table is a tuple of (Int, String, String, String, Int). Tuples in Scala are great but they are not very convenient to work with. Luckily, Slick allows to use case classes instead of tuples by providing so called Lifted Embedding technique. Here are our Customer and Address case classes:

case class Customer( id: Option[Int] = None, email: String, 
  firstName: Option[ String ] = None, lastName: Option[ String ] = None)

case class Address( id: Option[Int] = None,  street: String, city: String, 
  country: String, customer: Customer )

The last but not least question is how Slick is going to convert from tuples to case classes and vice-versa? It would be awesome to have such conversion out-of-the box but at this stage Slick needs a bit of help. Using Slick terminology, we are going to shape * table projection (which corresponds to SELECT * FROM ... SQL construct). Let see how it looks like for customers:

// Converts from Customer domain instance to table model and vice-versa
def * = ( id.?, email, firstName.?, lastName.? ).shaped <> ( 
  Customer.tupled, Customer.unapply )

For addresses table, shaping looks a little bit more verbose due to the fact that Slick does not have a way to refer to Customer case class instance by foreign key. Still, it is pretty straightforward, we just construct temporary Customer from its identifier.

// Converts from Customer domain instance to table model and vice-versa
def * = ( id.?, street, city, country, customerId ).shaped <> ( 
  tuple => {
    Address.apply(
      id = tuple._1,
      street = tuple._2,
      city = tuple._3,
      country = tuple._4,
      customer = Customer( Some( tuple._5 ), "" )
    )
  }, {
    (address: Address) =>
      Some { (
        address.id,
        address.street,
        address.city,
        address.country,
        address.customer.id getOrElse 0 
      )
    }
  }
)

Now, when all details have been explained, how can we materialize our Scala table definitions into the real database tables? Thankfully to Slick, it is a easy as that:

implicit lazy val DB = Database.forURL( "jdbc:h2:mem:test", driver = "org.h2.Driver" )
  
DB withSession { implicit session =>
  ( Customers.ddl ++ Addresses.ddl ).create
}

Slick has many ways to query and update data in database. The most beautiful and powerful one is just using pure functional constructs of the Scala language. The easiest way of doing that is by defining companion object and implement typical CRUD operations in it. For example, here is the method which inserts new customer record into customers table:

object Customers extends TableQuery[ Customers ]( new Customers( _ ) ) {
  def create( customer: Customer )( implicit db: Database ): Customer = 
    db.withSession { implicit session =>
      val id = this.autoIncrement.insert( customer )
      customer.copy( id = Some( id ) )
    } 
}

And it could be used like this:

Customers.create( Customer( None, "tom@b.com",  Some( "Tom" ), Some( "Tommyknocker" ) ) )
Customers.create( Customer( None, "bob@b.com",  Some( "Bob" ), Some( "Bobbyknocker" ) ) )

Similarly, the family of find functions could be implemented using regular Scala for comprehension:

def findByEmail( email: String )( implicit db: Database ) : Option[ Customer ] = 
  db.withSession { implicit session =>
    ( for { customer <- this if ( customer.email === email.toLowerCase ) } 
      yield customer ) firstOption
  }
   
def findAll( implicit db: Database ): Seq[ Customer ] = 
  db.withSession { implicit session =>      
    ( for { customer <- this } yield customer ) list
  }

And here are usage examples:

val customers = Customers.findAll
val customer = Customers.findByEmail( "bob@b.com" )

Updates and deletes are a bit different though very simple as well, let us take a look on those:

def update( customer: Customer )( implicit db: Database ): Boolean = 
  db.withSession { implicit session =>
    val query = for { c <- this if ( c.id === customer.id ) } 
      yield (c.email, c.firstName.?, c.lastName.?)
    query.update(customer.email, customer.firstName, customer.lastName) > 0
  }
  
def remove( customer: Customer )( implicit db: Database ) : Boolean = 
  db.withSession { implicit session =>
    ( for { c <- this if ( c.id === customer.id ) } yield c ).delete > 0
  }

Let us see those two methods in action:

Customers.findByEmail( "bob@b.com" ) map { customer =>
  Customers.update( customer.copy( firstName = Some( "Tommy" ) ) )
  Customers.remove( customer )
}

Looks very neat. I am personally still learning Slick however I am pretty excited about it. It helps me to have things done much faster, enjoying the beauty of Scala language and functional programming. No doubts, the upcoming version 3.0 is going to bring even more interesting features, I am looking forward to it.

This post is just an introduction into world of Slick, a lot of implementation details and use cases have been left aside to keep it short and simple. However Slick's documentation is pretty good and please do not hesitate to consult it.

The complete project is available on GitHub.