Saturday, December 26, 2015

Divided We Win: an event sourcing / CQRS perspective on write and read models separation. Queries.

Quite a while ago we started to explore the command query responsibility segregation (CQRS) architecture as an alternative way to develop distributed systems. Last time we covered only commands and events, but not queries. The goal of this blog post is to fill the gap and discuss the ways to handle queries following the CQRS architecture.

We will start where we left off last time, with the sample application able to handle commands and persist events in the journal. In order to support the read path, or queries, we are going to introduce a data store. For the sake of keeping things simple, let it be an in-memory H2 database. The data access layer is going to be handled by the awesome Slick library.
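
For reference, the db configuration block consumed by Database.forConfig("db") below could look like this in application.conf. This is a minimal sketch: the keys are standard Slick 3 settings, but the exact values are an assumption, not taken from the original project.

db {
  connectionPool = disabled
  driver = "org.h2.Driver"
  url = "jdbc:h2:mem:users;DB_CLOSE_DELAY=-1"
  keepAliveConnection = true
}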

To begin with, we have to come up with a simple data model for the User class, managed by the UserAggregate persistent actor. In this regard, the Users class is a typical mapping of a relational table:

class Users(tag: Tag) extends Table[User](tag, "USERS") {
  def id = column[String]("ID", O.PrimaryKey)
  def email = column[String]("EMAIL", O.Length(512))
  def uniqueEmail = index("USERS_EMAIL_IDX", email, unique = true)
  def * = (id, email) <> (User.tupled, User.unapply)
}
It is important to notice at this point that we enforce a uniqueness constraint on the User's email. We will come back to this subtle detail later, during the integration with the UserAggregate persistent actor. The next thing we need is a service to manage data store access, namely persisting and querying Users. As we are in the Akka universe, it is obviously going to be an actor as well. Here it is:
case object CreateSchema
case class FindUserByEmail(email: String)
case class UpdateUser(id: String, email: String)
case object FindAllUsers

trait Persistence {
  val users = TableQuery[Users]  
  val db = Database.forConfig("db")
}

class PersistenceService extends Actor with ActorLogging with Persistence {
  import scala.concurrent.ExecutionContext.Implicits.global
   
  def receive = {
    case CreateSchema => db.run(DBIO.seq(users.schema.create))
      
    case UpdateUser(id, email) =>
      db.run(users.insertOrUpdate(User(id, email)))
    
    case FindUserByEmail(email) => {
      val replyTo = sender
      db.run(users.filter( _.email === email.toLowerCase).result.headOption) 
        .onComplete { replyTo ! _ }
    }
    
    case FindAllUsers => {
      val replyTo = sender
      db.run(users.result) onComplete { replyTo ! _ }
    }
  }
}
Please notice that PersistenceService is a regular untyped Akka actor, not a persistent one. To keep things focused, we are going to support only four kinds of messages (a quick usage sketch follows the list):
  • CreateSchema to initialize database schema
  • UpdateUser to update user's email address
  • FindUserByEmail to query user by its email address
  • FindAllUsers to query all users in the data store
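
Putting the pieces together, a quick usage sketch might look like this (the actor system and actor names here are illustrative assumptions, not taken from the original project):

val system = ActorSystem("cqrs-example")
val persistence = system.actorOf(Props[PersistenceService], "persistence")
persistence ! CreateSchema                    // initialize the USERS table
persistence ! UpdateUser("123", "a@b.com")    // insert or update a read model row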

Good, the data store services are ready, but nothing really fills them with data yet. Moving on to the next step, we will refactor UserAggregate, more precisely the way it handles the UserEmailUpdate command. In its current implementation, the user's email update happens unconditionally. But remember, we imposed a uniqueness constraint on emails, so we are going to change the command logic to account for that: before actually performing the update we will run a query against the read model (data store) to make sure no user with such an email is already registered.

val receiveCommand: Receive = {
  case UserEmailUpdate(email) =>
    try {
      val future = (persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]]
      val result = Await.result(future, timeout.duration) match {
        case Failure(ex) => Error(id, ex.getMessage)
        case Success(Some(user)) if user.id != id => Error(id, s"Email '$email' already registered")
        case _ =>
          persist(UserEmailUpdated(id, email)) { event =>
            updateState(event)
            persistence ! UpdateUser(id, email)
          }
          Acknowledged(id)
      }

      sender ! result
    } catch {
      case NonFatal(ex) => sender ! Error(id, ex.getMessage)
    }
}
Pretty simple, but not really idiomatic: the Await.result does not look like it belongs in this code. My first attempt was to use a future / map / recover / pipeTo pipeline to keep the flow completely asynchronous. However, the side effect I observed is that the persist(UserEmailUpdated(id, email)) { event => ... } block then ends up being scheduled on another thread most of the time (whenever the result is not ready yet) and does not execute properly, very likely because of the thread context switch. So Await.result came to the rescue.
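
For the curious, here is roughly what that first fully asynchronous attempt looked like, reconstructed from the description above (a sketch, not the original code):

val replyTo = sender
(persistence ? FindUserByEmail(email)).mapTo[Try[Option[User]]] map {
  case Failure(ex) => Error(id, ex.getMessage)
  case Success(Some(user)) if user.id != id =>
    Error(id, s"Email '$email' already registered")
  case _ =>
    // Unsafe: this callback runs on one of the execution context's
    // threads, not on the actor's dispatcher thread
    persist(UserEmailUpdated(id, email)) { event =>
      updateState(event)
      persistence ! UpdateUser(id, email)
    }
    Acknowledged(id)
} pipeTo replyTo
Since the map callback executes outside of the actor's context once the future completes, persist is invoked from a foreign thread, which matches the misbehavior described above.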

Now, every time a user's email is updated, along with persisting the event we record this fact in the data store as well. Nice, we are getting one step closer.

The last thing we have to consider is how to populate the data store from the event journal. Another experimental module from the Akka Persistence portfolio is of great help here: Akka Persistence Query. Among other features, it provides the ability to query the event journal by persistence identifiers, and this is what we are going to do in order to populate the data store from the journal. Not a surprise, there is a UserJournal actor responsible for that.

case object InitSchema

class UserJournal(persistence: ActorRef) extends Actor with ActorLogging {
  def receive = {
    case InitSchema => {
      val journal = PersistenceQuery(context.system)
        .readJournalFor[LeveldbReadJournal](LeveldbReadJournal.Identifier)
      val source = journal.currentPersistenceIds()
      
      implicit val materializer = ActorMaterializer()
      source
        .runForeach { persistenceId => 
          journal.currentEventsByPersistenceId(persistenceId, 0, Long.MaxValue)
            .runForeach { event => 
              event.event match {
                case UserEmailUpdated(id, email) => persistence ! UpdateUser(id, email)
                case _ => // ignore any events we are not interested in
              }
            }
        }
    }
  }
}
Basically, the code is simple enough, but let us recap what it does. First, we ask the journal for all the persistence identifiers it has, using the currentPersistenceIds method. Second, for every persistence identifier we query all its events from the journal. Because there is only one event type in our case, UserEmailUpdated, we directly transform it into the data store service's UpdateUser message.
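
For completeness, a possible wiring at application startup could look like this (actor names are illustrative; the actual Boot class of the project may differ):

val persistence = system.actorOf(Props[PersistenceService], "persistence")
val journal = system.actorOf(Props(new UserJournal(persistence)), "journal")
persistence ! CreateSchema   // create an empty schema first
journal ! InitSchema         // then replay the journal into the read model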

Awesome, we are mostly done! The simplest remaining step is to add another endpoint to UserRoute which returns the list of existing users by querying the read model.

pathEnd {
  get {
    complete {
      (persistence ? FindAllUsers).mapTo[Try[Vector[User]]] map { 
        case Success(users) => 
          HttpResponse(status = OK, entity = users.toJson.compactPrint)
        case Failure(ex) => 
          HttpResponse(status = InternalServerError, entity = ex.getMessage)
      }
    }
  }
}
We are ready to give our revamped CQRS sample application a test drive! Once it is up and running, let us ensure that our journal and data store are empty.
$ curl -X GET http://localhost:38080/api/v1/users
[]
Makes sense, as we haven't created any users yet. Let us do that by updating two users with different email addresses and querying the read model again.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com

$ curl -X PUT http://localhost:38080/api/v1/users/124 -d email=a@c.com
Email updated: a@c.com

$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]
As expected, this time the result is different and we can see two users being returned. The moment of truth: let us try to update the email of user 123 to the one already registered by user 124.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@c.com
Email 'a@c.com' already registered
Terrific, this is what we wanted! The read (or query) model is very helpful and works just fine. Please notice that when we restart the application, the read data store is repopulated from the journal and returns the previously created users:
$ curl -X GET http://localhost:38080/api/v1/users
[
  {"id":"123","email":"a@b.com"},
  {"id":"124","email":"a@c.com"}
]

With this post coming to an end, we are wrapping up the introduction to the CQRS architecture. Although one may say many things were left out of scope, I hope the examples presented along our journey were useful to illustrate the idea, and that CQRS could be an interesting option to consider for your next projects.

As always, the complete source code is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Monday, October 5, 2015

Creating sample HTTPS server for fun and profit

Often during development and/or testing against real-life scenarios we, developers, face the need to run a full-fledged HTTPS server, possibly doing some mocking at the same time. On the JVM platform, this used to be not trivial at all, unless you knew the right tool for the job. In this post we are going to create the skeleton of a fully operational HTTPS server using the terrific Spray framework and the Scala language.

To begin with, we need to generate an X.509 certificate and a private key. Luckily, it is very easy to do using the openssl command line tool.

openssl req \
    -x509 \
    -sha256 \
    -newkey rsa:2048 \
    -keyout certificate.key \
    -out certificate.crt \
    -days 1024 \
    -nodes

As we are on the JVM platform, our essential goal is to have a Java keystore (JKS), a repository of security certificates. However, to import our newly generated certificate into a JKS, we have to export it in PKCS #12 format and then create the keystore out of it. Again, openssl to the rescue.

openssl pkcs12 \
    -export \
    -in certificate.crt \
    -inkey certificate.key \
    -out server.p12 \
    -name sample-https-server \
    -password pass:change-me-please
Please note that the archive server.p12 is protected by a password. Now, the final step involves keytool, a command line tool from the JDK distribution.
keytool -importkeystore \
    -srcstorepass change-me-please \
    -destkeystore sample-https-server.jks \
    -deststorepass change-me-please \
    -srckeystore server.p12 \
    -srcstoretype PKCS12 \
    -alias sample-https-server
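Optionally, we can sanity-check the import by listing the keystore content:
keytool -list \
    -keystore sample-https-server.jks \
    -storepass change-me-please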
The result is a password-protected sample-https-server.jks keystore which we can use in our HTTPS server application to configure the SSL context. Spray has very good documentation and plenty of examples available, one of those being the sample SslConfiguration trait, which we can use to configure the KeyManager, TrustManager and SSLContext.
trait SslConfiguration {
  // If there is no SSLContext in scope implicitly, the default SSLContext is 
  // going to be used. But we want non-default settings so we are making 
  // custom SSLContext available here.
  implicit def sslContext: SSLContext = {
    val keyStoreResource = "/sample-https-server.jks"
    val password = "change-me-please"

    val keyStore = KeyStore.getInstance("jks")
    keyStore.load(getClass.getResourceAsStream(keyStoreResource), password.toCharArray)
    val keyManagerFactory = KeyManagerFactory.getInstance("SunX509")
    keyManagerFactory.init(keyStore, password.toCharArray)
    val trustManagerFactory = TrustManagerFactory.getInstance("SunX509")
    trustManagerFactory.init(keyStore)
    val context = SSLContext.getInstance("TLS")
    context.init(keyManagerFactory.getKeyManagers, trustManagerFactory.getTrustManagers, new SecureRandom)
    context
  }

  // If there is no ServerSSLEngineProvider in scope implicitly, 
  // the default one is going to be used. But we would like to configure
  // cipher suites and protocols so we are making a custom ServerSSLEngineProvider
  // available here.
  implicit def sslEngineProvider: ServerSSLEngineProvider = {
    ServerSSLEngineProvider { engine =>
      engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
      engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))
      engine
    }
  }
}
There are a few points to highlight here. First of all, the usage of our own keystore created previously (which for convenience we are loading as a classpath resource):
val keyStoreResource = "/sample-https-server.jks"
val password = "change-me-please"
Also, we are configuring TLS only (TLS v1.0, TLS v1.1 and TLS v1.2), with no SSLv3 support. In addition to that, we are enabling only one cipher suite, TLS_RSA_WITH_AES_128_CBC_SHA. This is done mostly for illustration, as in most cases all supported cipher suites could be enabled.
engine.setEnabledCipherSuites(Array("TLS_RSA_WITH_AES_128_CBC_SHA"))
engine.setEnabledProtocols(Array( "TLSv1", "TLSv1.1", "TLSv1.2" ))
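
In case you wonder which protocols and cipher suites your particular JVM supports, a tiny standalone sketch can print them (the object name is illustrative):

object ListTlsCapabilities extends App {
  import javax.net.ssl.SSLContext

  // Ask the default SSLContext what the JVM could offer
  val params = SSLContext.getDefault.getSupportedSSLParameters
  println(params.getProtocols.mkString("Protocols: ", ", ", ""))
  params.getCipherSuites.sorted.foreach(println)
}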

With that, we are ready to create a real HTTPS server, which thanks to Spray framework is just a couple of lines long:

class HttpsServer(val route: Route = RestService.defaultRoute) extends SslConfiguration {
  implicit val system = ActorSystem()
  implicit val timeout: Timeout = 3 seconds 

  val settings = ServerSettings(system).copy(sslEncryption = true)
  val handler = system.actorOf(Props(new RestService(route)), name = "handler")

  def start(port: Int) = Await.ready(
    IO(Http) ? Http.Bind(handler, interface = "localhost", port = port, settings = Some(settings)), 
    timeout.duration)
      
  def stop() = {
    IO(Http) ? Http.CloseAll
    system.stop(handler)
  }
}

An HTTPS server which does nothing at all is not very useful. That is where the route property comes into play: using Spray routing extensions, we pass the mappings (or routes) to handle the requests straight to the HTTP service actor (RestService).

class RestService(val route: Route) extends HttpServiceActor with ActorLogging {
  def receive = runRoute {
    route
  }
}
With the default route being just that:
object RestService {
  val defaultRoute = path("") {
    get {
      complete {
        "OK!\n"
      }
    }
  }
}
Basically, that is all we need and our HTTPS server is ready for a test drive! The simplest way to run it is as a plain Scala application.
object HttpsServer extends App {
  val server = new HttpsServer
  server.start(10999)
}

Despite being written in Scala, we can easily embed it into any Java application (using naming conventions that may look a bit unusual to Java developers), for example:

public class HttpsServerRunner {
    public static void main(String[] args) {
        final HttpsServer server = new HttpsServer(RestService$.MODULE$.defaultRoute());
        server.start(10999);
    }
}

Once up and running (the easiest way to do that is sbt run), the exposed default route of our simple HTTPS server could be accessed either from the browser or using the curl command line client (the -k command line argument turns off SSL certificate verification):

$ curl -ki https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:25:47 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!
Alternatively, the certificate could be passed along to the curl command so that complete SSL certificate verification takes place, for example:
$  curl -i --cacert src/main/resources/certificate.crt  https://localhost:10999

HTTP/1.1 200 OK
Server: spray-can/1.3.3
Date: Sun, 04 Oct 2015 01:28:05 GMT
Content-Type: text/plain; charset=UTF-8
Content-Length: 4

OK!

All is looking great, but could we use the HTTPS server as part of an integration test suite to verify / stub / mock, for example, the interactions with third-party services? The answer is: yes, absolutely, thanks to JUnit rules. Let us take a look at the simplest possible implementation of HttpsServerRule:

class HttpsServerRule(@BeanProperty val port: Int, val route: Route) 
    extends ExternalResource {
  val server = new HttpsServer(route)
  override def before() = server.start(port)
  override def after() = server.stop()
}

object HttpsServerRule {
  def apply(port: Int) = new HttpsServerRule(port, RestService.defaultRoute)
  def apply(port: Int, route: Route) = new HttpsServerRule(port, route)
}

The JUnit test case for our default implementation uses the brilliant REST Assured library, which provides a Java DSL for easy testing of REST services.

public class DefaultRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65200);
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/")
            .then()
            .body(containsString("OK!"));
    }
}

For sure, there is not much you can do with the default implementation, so providing a custom one is a must-have option. Luckily, we sorted that out early on by accepting routes.

object CustomRestService {
  val route = 
    path("api" / "user" / IntNumber) { id =>
      get {
        complete {
          "a@b.com"
        }
      }
    }
}

And here is a test case for it:

public class CustomRestServiceTest {
    @Rule public HttpsServerRule server = 
        HttpsServerRule$.MODULE$.apply(65201, CustomRestService$.MODULE$.route());
 
    @Test
    public void testServerIsUpAndRunning() {
        given()
            .baseUri("https://localhost:" + server.getPort())
            .auth().certificate("/sample-https-server.jks", "change-me-please")
            .when()
            .get("/api/user/1")
            .then()
            .body(containsString("a@b.com"));
    }
}

As it turns out, creating a full-blown HTTPS server is not hard at all and could be really fun, once you know the right tool for the job. The Spray framework is one of those magic tools. As many of you are aware, Spray is going to be replaced by Akka HTTP, which has seen a 1.0 release recently but at the moment lacks a lot of features (including HTTPS support), keeping Spray a viable choice.

The complete project is available on GitHub.

Tuesday, August 25, 2015

Divided We Win: an event sourcing / CQRS perspective on write and read models separation. Commands and Events.

In today's post we are going to unveil some very interesting (in my opinion) architecture styles: event sourcing and command query responsibility segregation (CQRS). Essentially, in both of them events are at the heart of the system design and reflect every change of state that happens. It is quite different from the traditional CRUD architecture, where usually only the last known state is kept, with any historical background effectively lost.

Another appealing option which CQRS brings to the table is a natural separation of the write model (state modifications initiated by commands and resulting in events) and the read model (queries for, in most cases, the last known state). It gives you a lot of freedom in picking the right data storage (or storages) for serving different application use cases and demands. But, as always, there is no free lunch: the price to pay is the increased complexity of the system.

To keep the post reasonably short, the whole discussion is split into two parts: Commands and Events (this one), and Queries (upcoming). Using a very simple model of the user entity as an example, we are going to design an application which uses the CQRS architecture. There are a few libraries available on the JVM platform, but the one we are going to use is Akka, more precisely its two recently added components, Akka Persistence and Akka HTTP. Please do not worry if Akka is somewhat new to you, the examples are going to be really basic and (hopefully) easy to follow. So let us get started!

As we mentioned earlier, the changes in our system happen as the result of a command and may lead to zero or more events. Let us model that by defining two traits:

trait Event
trait Command

In turn, those events may trigger state changes within application entities, so let us model that with a generic trait as well:

trait State[T] {
  def updateState(event: Event): State[T]
}

Quite simple so far. Now, as we are modeling a user, UserAggregate is going to constitute our model and be responsible for applying updates to a particular User, identifiable by its id. Also, every user is going to have an email property, which could be changed upon receiving a UserEmailUpdate command, causing a UserEmailUpdated event to be created as the result. The following code snippet defines all that in terms of state, command and event within the UserAggregate companion object.

object UserAggregate {
  case class User(id: String, email: String = "") extends State[User] {
    override def updateState(event: Event): State[User] = event match {
      case UserEmailUpdated(id, email) => copy(email = email)
    }
  }

  case class UserEmailUpdate(email: String) extends Command
  case class UserEmailUpdated(id: String, email: String) extends Event
}
For readers familiar with CQRS and event sourcing this example may look very naïve, but I think it is good enough for grasping the basics.

Having our foundational blocks defined, it is time to look at the most interesting part: accepting commands, transforming them into persisted events and applying the state changes, all of which are responsibilities of UserAggregate. In the context of CQRS and event sourcing, persisting events is a crucial capability of the system. Events are the single source of truth, and the state of any single entity could be reconstructed to any point in time by replaying all the events relevant to it. This is the moment where Akka Persistence joins the stage.

To take one step back, Akka is a brilliant library (or even toolkit) for building distributed systems based on the actor model. On top of that, Akka Persistence enriches actors with persistence capabilities, letting them store their messages (or events) in a durable journal. One might say that the journal can grow very, very large and replaying all the events to reconstruct the state could take a lot of time. It is a valid point, so Akka Persistence also adds the capability to store persistent snapshots in durable storage. This speeds things up a lot, as only the events which happened since the last snapshot need to be replayed. By default, LevelDB is the durable storage engine used by Akka Persistence, but it is a pluggable feature with many alternatives available.

With that, let us take a look at the UserAggregate actor, which is going to be responsible for managing a User entity.

class UserAggregate(id: String) extends PersistentActor with ActorLogging {
  import UserAggregate._

  override def persistenceId = id
  var state: State[User] = User(id)

  def updateState(event: Event): Unit = {
    state = state.updateState(event)
  }
  
  val receiveCommand: Receive = {
    case UserEmailUpdate(email) => {
      persist(UserEmailUpdated(id, email)) { event =>
        updateState(event)
        sender ! Acknowledged(id)
      }
    }
  }

  override def receiveRecover: Receive = {
    case event: Event => updateState(event)
    case SnapshotOffer(_, snapshot: User) => state = snapshot
  }
}

So far it is the most complicated part, so let us dissect the key pieces. First of all, UserAggregate extends PersistentActor, which adds the persistence capabilities to it. Second, every persistent actor must have a unique persistenceId: it is used as an identifier inside the events journal and snapshot storage. And lastly, in contrast to regular Akka actors, persistent actors have two entry points: receiveCommand to process commands and receiveRecover to replay events.
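
As a side note, the example relies on event replay only and never calls saveSnapshot, even though receiveRecover already handles SnapshotOffer. Snapshots could be produced right in the command handler; here is a sketch (not part of the original project, and the snapshot frequency is an arbitrary assumption):

persist(UserEmailUpdated(id, email)) { event =>
  updateState(event)
  // saveSnapshot and lastSequenceNr are provided by PersistentActor
  if (lastSequenceNr % 100 == 0) saveSnapshot(state)
  sender ! Acknowledged(id)
}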

Getting back to our example, once the UserAggregate receives the UserEmailUpdate command, it first persists the UserEmailUpdated event in the journal using the persist(...) call and then updates the aggregate's state using the updateState(...) call, replying with an acknowledgement to the sender. To see the complete example in action, let us create a simple REST endpoint using another great project from Akka's family, Akka HTTP, which emerged from the terrific Spray framework.

For now, we are going to define a simple route to handle PUT requests at the /api/v1/users/{id} location, where {id} is essentially a placeholder for the user identifier. It will accept a single form-encoded parameter email to update the user's email address.

object UserRoute {
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.language.postfixOps
  
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val timeout: Timeout = 5 seconds

  val route = {
    logRequestResult("eventsourcing-example") {
      pathPrefix("api" / "v1" / "users") {
        path(LongNumber) { id =>
          (put & formFields('email.as[String])) { email =>
            complete {
              system
                .actorSelection(s"user/user-$id")
                .resolveOne
                .recover {
                  case _: ActorNotFound => 
                    system.actorOf(Props(new UserAggregate(id.toString)), s"user-$id")
                }
                .flatMap {
                  _ ? UserEmailUpdate(email) map {
                    case Acknowledged(_) => 
                      HttpResponse(status = OK, entity = "Email updated: " + email)
                    case Error(_, message) => 
                      HttpResponse(status = Conflict, entity = message)
                  }
                }
            }
          }
        }
      }
    }
  }
}
The only missing piece is a runnable class to plug in the handler for this REST endpoint, so let us define one; thanks to Akka HTTP it is trivial:
object Boot extends App with DefaultJsonProtocol {
  Http().bindAndHandle(route, "localhost", 38080)
}
Nothing gives more assurance that things are really working than easy and readable test cases, built using the ScalaTest framework complemented by the Akka HTTP TestKit.
class UserRouteSpec extends FlatSpec 
      with ScalatestRouteTest 
      with Matchers 
      with BeforeAndAfterAll {
  import com.example.domain.user.UserRoute
  
  implicit def executionContext = scala.concurrent.ExecutionContext.Implicits.global

  "UserRoute" should "return success on email update" in {
    Put("http://localhost:38080/api/v1/users/123", FormData("email" ->  "a@b.com")) ~> UserRoute.route ~> check {
      response.status shouldBe StatusCodes.OK
      responseAs[String] shouldBe "Email updated: a@b.com"
    }
  }
}
Lastly, let us run the complete example and use curl from the command line to perform a real call to the REST endpoint, forcing all parts of the application to work together.
$ curl -X PUT http://localhost:38080/api/v1/users/123 -d email=a@b.com
Email updated: a@b.com
Nice! The results match our expectations. Before we finish up with this part, there is one more thing: please notice that when the application is restarted, the state will be restored for the users whose aggregates already existed. To say it in Akka Persistence terms: if the event journal has events/snapshots associated with an actor's persistenceId (here is why it should be unique and permanent), the recovery process takes place by applying the most recent snapshot (if any) and replaying the events.

In this post we looked behind the curtain of event sourcing and CQRS, covering only the write (or command) part of the architecture. There are many things left to work on, for example email uniqueness validation or retrieval of the user's latest state, which represent the read (or query) flow of the application and are going to be discussed in the subsequent blog post. As for now, I hope that event sourcing and CQRS are a bit demystified and might become a part of your future or existing applications.

Final disclaimer: Akka HTTP is marked as an experimental component at the moment (nonetheless, the main concepts stand; the API may change slightly), while Akka Persistence just graduated out of experimental status with the recent Akka 2.4.0-RC1 release. Please be aware of that.

The complete project is available on GitHub. Many thanks to Regis Leray and Esfandiar Amirrahimi, two brilliant developers, for helping out a lot with this series of blog posts.

Thursday, April 30, 2015

ApacheCon North America 2015: where friends meet

I had the great pleasure of attending ApacheCon NA 2015 this year, which took place in the city of Austin, TX. By all means, it was an awesome conference, and there are some emerging trends and interesting ideas I would like to share in this post.

ApacheCon is somewhat different from any other conference out there, and the main difference is the diversity of projects and topics being presented. It is a strength and a weakness at the same time: companies prefer to send people to specialized conferences dedicated to a particular area / technology (good examples of those are Strata, JavaOne, SpringOne, ...). On the other side, such diversity encourages communication, where endless new ideas are often born. I personally had a whole bunch of terrific conversations and met a lot of great people whom I can now proudly refer to as my friends.

Getting closer to the matter, this time the conference had 7 parallel tracks squeezed into 3 full days. The vast majority of the talks were about big/fast data and containerization / cloud deployment. It was clear that people are very interested in those topics, as almost every big data talk gathered quite a crowd. Among the "big names" I have seen guys from Cloudera, Red Hat, Google, eBay, Pivotal and LinkedIn.

There were a lot of brilliant talks, including a couple I was able to attend and would like to mention in particular.

I was very lucky to give a talk as well, Apache CXF, Tika and Lucene: The Power of Search the JAX-RS Way, where I presented a very interesting combination of Apache CXF, Apache Tika, Apache Lucene and Apache Olingo working together to quickly integrate search capabilities into new or existing web APIs.

And last but not least, I have discovered that the Luke project, a very well-known tool in the Apache Lucene community, is looking for contributors to help support it and move it forward. If you are interested, please join.

A huge thank you to the Apache Software Foundation for organizing such a great conference this year and giving me the chance to attend it. Keep up the terrific work, guys!

Saturday, February 28, 2015

A fresh look on accessing database on JVM platform: Slick from Typesafe

In today's post we are going to open our minds, step away from the traditional Java EE / Java SE JPA-based stack (which I think is great) and take a refreshing look at how to access a database in your Java applications using the new kid on the block: Slick 2.1 from Typesafe. So if JPA is so great, why bother? Well, sometimes you need to do very simple things and there is no need to bring in a complete, well-modeled persistence layer for that. This is where Slick shines.

In essence, Slick is a database access library, not an ORM. Though it is written in Scala, the examples we are going to look at do not require any particular knowledge of this excellent language (although it is Scala that makes Slick possible). Our relational database schema will have only two tables, customers and addresses, linked by a one-to-many relationship. For simplicity, H2 has been picked as an in-memory database engine.

The first question which comes up is how to define the database tables (schema), and, naturally, database-specific DDL is the standard way of doing that. Can we do something about it and try another approach? If you are using Slick 2.1, the answer is yes, absolutely. Let us just describe our tables as Scala classes:

// The 'customers' relation table definition
class Customers( tag: Tag ) extends Table[ Customer ]( tag, "customers" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def email = column[ String ]( "email", O.Length( 512, true ), O.NotNull )
  def firstName = column[ String ]( "first_name", O.Length( 256, true ), O.Nullable )
  def lastName = column[ String ]( "last_name", O.Length( 256, true ), O.Nullable )

  // Unique index for customer's email
  def emailIndex = index( "idx_email", email, unique = true )
}

Very easy and straightforward, closely resembling the typical CREATE TABLE construct. The addresses table is defined the same way and references the customers table by a foreign key.

// The 'addresses' relation table definition
class Addresses( tag: Tag ) extends Table[ Address ]( tag, "addresses" ) {
  def id = column[ Int ]( "id", O.PrimaryKey, O.AutoInc )

  def street = column[ String ]( "street", O.Length( 100, true ), O.NotNull )
  def city = column[ String ]( "city", O.Length( 50, true ), O.NotNull )
  def country = column[ String ]( "country", O.Length( 50, true ), O.NotNull )

  // Foreign key to 'customers' table
  def customerId = column[Int]( "customer_id", O.NotNull )
  def customer = foreignKey( "customer_fk", customerId, Customers )( _.id )
}

Great, leaving off some details, that is it: we have defined two database tables in pure Scala. But details are important, and we are going to look closely at the following two declarations: Table[ Customer ] and Table[ Address ]. Essentially, each table row could be represented as a tuple with as many elements as there are columns. For example, a customers row is a tuple of (Int, String, String, String), while an addresses row is a tuple of (Int, String, String, String, Int). Tuples in Scala are great, but they are not very convenient to work with. Luckily, Slick allows us to use case classes instead of tuples by providing the so-called Lifted Embedding technique. Here are our Customer and Address case classes:

case class Customer( id: Option[Int] = None, email: String, 
  firstName: Option[ String ] = None, lastName: Option[ String ] = None)

case class Address( id: Option[Int] = None,  street: String, city: String, 
  country: String, customer: Customer )

The last but not least question is: how is Slick going to convert from tuples to case classes and vice versa? It would be awesome to have such a conversion out of the box, but at this stage Slick needs a bit of help. Using Slick terminology, we are going to shape the * table projection (which corresponds to the SELECT * FROM ... SQL construct). Let us see how it looks for customers:

// Converts from Customer domain instance to table model and vice-versa
def * = ( id.?, email, firstName.?, lastName.? ).shaped <> ( 
  Customer.tupled, Customer.unapply )

For the addresses table, shaping looks a little more verbose due to the fact that Slick does not have a way to refer to a Customer case class instance by foreign key. Still, it is pretty straightforward: we just construct a temporary Customer from its identifier.

// Converts from Address domain instance to table model and vice-versa
def * = ( id.?, street, city, country, customerId ).shaped <> ( 
  tuple => {
    Address.apply(
      id = tuple._1,
      street = tuple._2,
      city = tuple._3,
      country = tuple._4,
      customer = Customer( Some( tuple._5 ), "" )
    )
  }, {
    (address: Address) =>
      Some { (
        address.id,
        address.street,
        address.city,
        address.country,
        address.customer.id getOrElse 0 
      )
    }
  }
)

Now, with all the details explained, how can we materialize our Scala table definitions into real database tables? Thanks to Slick, it is as easy as that:

implicit lazy val DB = Database.forURL( "jdbc:h2:mem:test", driver = "org.h2.Driver" )
  
DB withSession { implicit session =>
  ( Customers.ddl ++ Addresses.ddl ).create
}
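
If you are curious about the actual SQL, the generated statements can be printed before (or instead of) executing them. A small sketch, assuming the same driver imports as in the project:

// Print the CREATE TABLE / CREATE INDEX statements Slick generated
( Customers.ddl ++ Addresses.ddl ).createStatements.foreach( println )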

Slick has many ways to query and update data in the database. The most beautiful and powerful one is just using pure functional constructs of the Scala language. The easiest way of doing that is by defining a companion object and implementing typical CRUD operations in it. For example, here is a method which inserts a new customer record into the customers table:

object Customers extends TableQuery[ Customers ]( new Customers( _ ) ) {
  def create( customer: Customer )( implicit db: Database ): Customer = 
    db.withSession { implicit session =>
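      // Note: autoIncrement is assumed to be a helper defined on this
      // TableQuery, e.g. 'def autoIncrement = this returning this.map( _.id )',
      // so that insert( ... ) returns the generated primary key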
      val id = this.autoIncrement.insert( customer )
      customer.copy( id = Some( id ) )
    } 
}

And it could be used like this:

Customers.create( Customer( None, "tom@b.com",  Some( "Tom" ), Some( "Tommyknocker" ) ) )
Customers.create( Customer( None, "bob@b.com",  Some( "Bob" ), Some( "Bobbyknocker" ) ) )

Similarly, the family of find functions could be implemented using regular Scala for comprehensions:

def findByEmail( email: String )( implicit db: Database ) : Option[ Customer ] = 
  db.withSession { implicit session =>
    ( for { customer <- this if ( customer.email === email.toLowerCase ) } 
      yield customer ) firstOption
  }
   
def findAll( implicit db: Database ): Seq[ Customer ] = 
  db.withSession { implicit session =>      
    ( for { customer <- this } yield customer ) list
  }

And here are usage examples:

val customers = Customers.findAll
val customer = Customers.findByEmail( "bob@b.com" )

Updates and deletes are a bit different, though very simple as well. Let us take a look at those:

def update( customer: Customer )( implicit db: Database ): Boolean = 
  db.withSession { implicit session =>
    val query = for { c <- this if ( c.id === customer.id ) } 
      yield (c.email, c.firstName.?, c.lastName.?)
    query.update(customer.email, customer.firstName, customer.lastName) > 0
  }
  
def remove( customer: Customer )( implicit db: Database ) : Boolean = 
  db.withSession { implicit session =>
    ( for { c <- this if ( c.id === customer.id ) } yield c ).delete > 0
  }

Let us see those two methods in action:

Customers.findByEmail( "bob@b.com" ) map { customer =>
  Customers.update( customer.copy( firstName = Some( "Tommy" ) ) )
  Customers.remove( customer )
}

Looks very neat. I am personally still learning Slick, however I am pretty excited about it. It helps me get things done much faster, while enjoying the beauty of the Scala language and functional programming. No doubt, the upcoming version 3.0 is going to bring even more interesting features; I am looking forward to it.

This post is just an introduction into the world of Slick; a lot of implementation details and use cases have been left aside to keep it short and simple. However, Slick's documentation is pretty good, so please do not hesitate to consult it.

The complete project is available on GitHub.

Sunday, December 7, 2014

Beyond the JAX-RS spec: Apache CXF search extension

In today's post we are going to look beyond the JAX-RS 2.0 specification and explore the useful extensions which Apache CXF, one of the popular JAX-RS 2.0 implementations, offers to developers of REST services and APIs. In particular, we are going to talk about the search extension, which uses a subset of the OData 2.0 query filters.

In a nutshell, the search extension just maps some kind of filter expression to a set of matching typed entities (instances of Java classes). OData 2.0 query filters may be very complex, however at the moment Apache CXF supports only a subset of them:

Operator | Description           | Example
---------+-----------------------+-------------------------------
eq       | Equal                 | city eq 'Redmond'
ne       | Not equal             | city ne 'London'
gt       | Greater than          | price gt 20
ge       | Greater than or equal | price ge 10
lt       | Less than             | price lt 20
le       | Less than or equal    | price le 100
and      | Logical and           | price le 200 and price gt 3.5
or       | Logical or            | price le 3.5 or price gt 200

Basically, to configure and activate the search extension for your JAX-RS services it is enough to define two properties, search.query.parameter.name and search.parser, plus one additional provider, SearchContextProvider:

@Configuration
public class AppConfig {    
    @Bean( destroyMethod = "shutdown" )
    public SpringBus cxf() {
        return new SpringBus();
    }
    
    @Bean @DependsOn( "cxf" )
    public Server jaxRsServer() {
        final Map< String, Object > properties = new HashMap< String, Object >();        
        properties.put( "search.query.parameter.name", "$filter" );
        properties.put( "search.parser", new ODataParser< Person >( Person.class ) );

        final JAXRSServerFactoryBean factory = 
            RuntimeDelegate.getInstance().createEndpoint( 
                jaxRsApiApplication(), 
                JAXRSServerFactoryBean.class 
            );
        factory.setProvider( new SearchContextProvider() );
        factory.setProvider( new JacksonJsonProvider() );
        factory.setServiceBeans( Arrays.< Object >asList( peopleRestService() ) );
        factory.setAddress( factory.getAddress() );      
        factory.setProperties( properties );

        return factory.create();
    }
    
    @Bean 
    public JaxRsApiApplication jaxRsApiApplication() {
        return new JaxRsApiApplication();
    }
    
    @Bean 
    public PeopleRestService peopleRestService() {
        return new PeopleRestService();
    }   
}

The search.query.parameter.name property defines the name of the query string parameter used as a filter (we set it to $filter), while search.parser defines the parser to be used to parse the filter expression (we set it to ODataParser, parametrized with the Person class). The ODataParser is built on top of the excellent Apache Olingo project, which currently implements the OData 2.0 protocol (support for OData 4.0 is on the way).

Once the configuration is done, any JAX-RS 2.0 service is able to benefit from the search capabilities by injecting the contextual parameter SearchContext. Let us take a look at that in action by defining a REST service to manage people, represented by the following Person class:

public class Person {
    private String firstName;
    private String lastName;
    private int age;

    // Setters and getters here
}

The PeopleRestService allows creating new persons using HTTP POST and performing searches using HTTP GET, exposed under the /search endpoint:

package com.example.rs;

import java.util.ArrayList;
import java.util.Collection;

import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;

import org.apache.cxf.jaxrs.ext.search.SearchCondition;
import org.apache.cxf.jaxrs.ext.search.SearchContext;

import com.example.model.Person;

@Path( "/people" ) 
public class PeopleRestService {
    private final Collection< Person > people = new ArrayList<>();
       
    @Produces( { MediaType.APPLICATION_JSON  } )
    @POST
    public Response addPerson( @Context final UriInfo uriInfo,
            @FormParam( "firstName" ) final String firstName, 
            @FormParam( "lastName" ) final String lastName,
            @FormParam( "age" ) final int age ) {      
        
        final Person person = new Person( firstName, lastName, age );
        people.add( person );
        
        return Response
            .created( uriInfo.getRequestUriBuilder().path( "/search" )
            .queryParam( "$filter=firstName eq '{firstName}' and lastName eq '{lastName}' and age eq {age}" )
            .build( firstName, lastName, age ) )
            .entity( person ).build();
    }
    
    @GET
    @Path("/search")
    @Produces( { MediaType.APPLICATION_JSON  } )
    public Collection< Person > findPeople( @Context SearchContext searchContext ) {        
        final SearchCondition< Person > filter = searchContext.getCondition( Person.class );
        return filter.findAll( people );
    }
}

The findPeople method is the one we are looking for. Thanks to all the heavy lifting which Apache CXF does, the method looks very simple: the SearchContext is injected and the filter expression is automatically picked up from the $filter query string parameter. The last part is to apply the filter to the data, which in our case is just a collection named people. Very clean and straightforward.

Let us build the project and run it:

mvn clean package
java -jar target/cxf-search-extension-0.0.1-SNAPSHOT.jar

Using the awesome curl tool, let us issue a couple of HTTP POST requests to generate some data to run filter queries against:

> curl http://localhost:8080/rest/api/people -X POST -d "firstName=Tom&lastName=Knocker&age=16"
{
    "firstName": "Tom",
    "lastName": "Knocker",
    "age": 16
}

> curl http://localhost:8080/rest/api/people -X POST -d "firstName=Bob&lastName=Bobber&age=23"
{
    "firstName": "Bob",
    "lastName": "Bobber",
    "age": 23
}

> curl http://localhost:8080/rest/api/people -X POST -d "firstName=Tim&lastName=Smith&age=50"
{
    "firstName": "Tim",
    "lastName": "Smith",
    "age": 50
}

With sample data in place, let us go ahead and come up with a couple of different search criteria, complicated enough to show off the power of OData 2.0 query filters:

  • find all persons whose first name is Bob ($filter="firstName eq 'Bob'")
    > curl -G -X GET http://localhost:8080/rest/api/people/search --data-urlencode 
      $filter="firstName eq 'Bob'" 
    [
        {
            "firstName": "Bob",
            "lastName": "Bobber",
            "age": 23
        }
    ]
    
  • find all persons whose last name is Bobber or last name is Smith and firstName is not Bob ($filter="lastName eq 'Bobber' or (lastName eq 'Smith' and firstName ne 'Bob')")
    > curl -G -X GET http://localhost:8080/rest/api/people/search --data-urlencode 
      $filter="lastName eq 'Bobber' or (lastName eq 'Smith' and firstName ne 'Bob')" 
    [
        {
            "firstName": "Bob",
            "lastName": "Bobber",
            "age": 23
        },
        {    
            "firstName": "Tim",
            "lastName": "Smith",
            "age": 50
        }
    ]
    
  • find all persons whose first name starts with the letter T and who are 16 or older ($filter="firstName eq 'T*' and age ge 16")
    > curl -G -X GET http://localhost:8080/rest/api/people/search --data-urlencode 
      $filter="firstName eq 'T*' and age ge 16"
    [
        {
            "firstName": "Tom",
            "lastName": "Knocker",
            "age": 16
        },
        {
            "firstName": "Tim",
            "lastName": "Smith",
            "age": 50
        }
    ]
    
Note: if you run these commands in a Linux-like environment, you may need to escape the $ sign using \$ instead, for example: curl -X GET -G http://localhost:8080/rest/api/people/search --data-urlencode \$filter="firstName eq 'Bob'"

At the moment, Apache CXF offers just basic support of the OData 2.0 query filters, with many powerful expressions left aside. However, there is a commitment to push it forward once the community expresses enough interest in using this feature.

It is worth mentioning that OData 2.0 query filters are not the only option available. The search extension also supports FIQL (the Feed Item Query Language), and this great article from one of the core Apache CXF developers is a great introduction to it.

I think this quite useful feature of Apache CXF can save you a lot of time and effort by providing simple (and not so simple) search capabilities to your JAX-RS 2.0 services. Please give it a try if it fits your application's needs.

The complete project source code is available on GitHub.

Sunday, September 28, 2014

Embedded Jetty and Apache CXF: secure REST services with Spring Security

Recently I ran into a very interesting problem which I thought would take me just a couple of minutes to solve: protecting Apache CXF (current release 3.0.1) / JAX-RS REST services with Spring Security (current stable version 3.2.5) in an application running inside an embedded Jetty container (current release 9.2). In the end, it turned out to be very easy, once you understand how things work together and know the subtle intrinsic details. This blog post will try to reveal that.

Our example application is going to expose a simple JAX-RS / REST service to manage people. However, we do not want everyone to be allowed to do that, so HTTP basic authentication will be required in order to access our endpoint, deployed at http://localhost:8080/rest/api/people. Let us take a look at the PeopleRestService class:

package com.example.rs;

import javax.json.Json;
import javax.json.JsonArray;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;

@Path( "/people" ) 
public class PeopleRestService {
    @Produces( { "application/json" } )
    @GET
    public JsonArray getPeople() {
        return Json.createArrayBuilder()
            .add( Json.createObjectBuilder()
                .add( "firstName", "Tom" )
                .add( "lastName", "Tommyknocker" )
                .add( "email", "a@b.com" ) )
            .build();
    }
}

As you can see in the snippet above, nothing points to the fact that this REST service is secured, just a couple of familiar JAX-RS annotations.

Now, let us declare the desired security configuration following the excellent Spring Security documentation. There are many ways to configure Spring Security, but we are going to show off two of them: using in-memory authentication and using a user details service, both built on top of WebSecurityConfigurerAdapter. Let us start with in-memory authentication, as it is the simplest one:

package com.example.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.config.http.SessionCreationPolicy;

@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity( securedEnabled = true )
public class InMemorySecurityConfig extends WebSecurityConfigurerAdapter {
    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
        auth.inMemoryAuthentication()
            .withUser( "user" ).password( "password" ).roles( "USER" ).and()
            .withUser( "admin" ).password( "password" ).roles( "USER", "ADMIN" );
    }

    @Override
    protected void configure( HttpSecurity http ) throws Exception {
        http.httpBasic().and()
            .sessionManagement().sessionCreationPolicy( SessionCreationPolicy.STATELESS ).and()
            .authorizeRequests().antMatchers("/**").hasRole( "USER" );
    }
}

In the snippet above there are two users defined: user with the role USER, and admin with the roles USER and ADMIN. We are also protecting all URLs (/**) by setting the authorization policy to allow access only to users with the role USER. Being just a part of the application configuration, let us plug it into the AppConfig class using the @Import annotation.

package com.example.config;

import java.util.Arrays;

import javax.ws.rs.ext.RuntimeDelegate;

import org.apache.cxf.bus.spring.SpringBus;
import org.apache.cxf.endpoint.Server;
import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
import org.apache.cxf.jaxrs.provider.jsrjsonp.JsrJsonpProvider;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Import;

import com.example.rs.JaxRsApiApplication;
import com.example.rs.PeopleRestService;

@Configuration
@Import( InMemorySecurityConfig.class )
public class AppConfig { 
    @Bean( destroyMethod = "shutdown" )
    public SpringBus cxf() {
        return new SpringBus();
    }
 
    @Bean @DependsOn ( "cxf" )
    public Server jaxRsServer() {
        JAXRSServerFactoryBean factory = RuntimeDelegate.getInstance().createEndpoint( jaxRsApiApplication(), JAXRSServerFactoryBean.class );
        factory.setServiceBeans( Arrays.< Object >asList( peopleRestService() ) );
        factory.setAddress( factory.getAddress() );
        factory.setProviders( Arrays.< Object >asList( new JsrJsonpProvider() ) );
        return factory.create();
    }
 
    @Bean 
    public JaxRsApiApplication jaxRsApiApplication() {
        return new JaxRsApiApplication();
    }
 
    @Bean 
    public PeopleRestService peopleRestService() {
        return new PeopleRestService();
    }  
}

At this point we have all the pieces except the most interesting one: the code which runs the embedded Jetty instance and creates the proper servlet mappings and listeners, passing down the configuration we have created.

package com.example;

import java.util.EnumSet;

import javax.servlet.DispatcherType;

import org.apache.cxf.transport.servlet.CXFServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.springframework.web.context.ContextLoaderListener;
import org.springframework.web.context.support.AnnotationConfigWebApplicationContext;
import org.springframework.web.filter.DelegatingFilterProxy;

import com.example.config.AppConfig;

public class Starter {
    public static void main( final String[] args ) throws Exception {
        Server server = new Server( 8080 );
          
        // Register and map the dispatcher servlet
        final ServletHolder servletHolder = new ServletHolder( new CXFServlet() );
        final ServletContextHandler context = new ServletContextHandler();   
        context.setContextPath( "/" );
        context.addServlet( servletHolder, "/rest/*" );  
        context.addEventListener( new ContextLoaderListener() );
   
        context.setInitParameter( "contextClass", AnnotationConfigWebApplicationContext.class.getName() );
        context.setInitParameter( "contextConfigLocation", AppConfig.class.getName() );
   
        // Add Spring Security Filter by the name
        context.addFilter(
            new FilterHolder( new DelegatingFilterProxy( "springSecurityFilterChain" ) ), 
                "/*", EnumSet.allOf( DispatcherType.class )
        );
         
        server.setHandler( context );
        server.start();
        server.join(); 
    }
}

Most of the code does not require any explanation except the filter part. This is what I meant by a subtle intrinsic detail: the DelegatingFilterProxy should be configured with the filter name, which must be exactly springSecurityFilterChain, as Spring Security names it. With that, the security rules we have configured are going to apply to any JAX-RS service call (the security filter is executed before the Apache CXF servlet), requiring full authentication. Let us quickly check that by building and running the project:

mvn clean package   
java -jar target/jax-rs-2.0-spring-security-0.0.1-SNAPSHOT.jar

Issuing the HTTP GET call without providing username and password does not succeed and returns HTTP status code 401.

> curl -i http://localhost:8080/rest/api/people

HTTP/1.1 401 Full authentication is required to access this resource
WWW-Authenticate: Basic realm="Realm"
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: text/html; charset=ISO-8859-1
Content-Length: 339
Server: Jetty(9.2.2.v20140723)

The same HTTP GET call with username and password provided returns successful response (with some JSON generated by the server).

> curl -i -u user:password http://localhost:8080/rest/api/people

HTTP/1.1 200 OK
Date: Sun, 28 Sep 2014 20:07:35 GMT
Content-Type: application/json
Content-Length: 65
Server: Jetty(9.2.2.v20140723)

[{"firstName":"Tom","lastName":"Tommyknocker","email":"a@b.com"}]

Excellent, it works like a charm! It turns out it really is very easy. Also, as mentioned before, the in-memory authentication could be replaced with a user details service; here is an example of how it could be done:

package com.example.config;

import java.util.Arrays;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.userdetails.User;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;

@Configuration
@EnableWebSecurity
@EnableGlobalMethodSecurity(securedEnabled = true)
public class UserDetailsSecurityConfig extends WebSecurityConfigurerAdapter {
    @Autowired
    public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
        auth.userDetailsService( userDetailsService() );
    }
    
    @Bean
    public UserDetailsService userDetailsService() {
        return new UserDetailsService() {
            @Override
            public UserDetails loadUserByUsername( final String username ) 
                    throws UsernameNotFoundException {
                if( username.equals( "admin" ) ) {
                    return new User( username, "password", true, true, true, true,
                        Arrays.asList(
                            new SimpleGrantedAuthority( "ROLE_USER" ),
                            new SimpleGrantedAuthority( "ROLE_ADMIN" )
                        )
                    );
                } else if ( username.equals( "user" ) ) {
                    return new User( username, "password", true, true, true, true,
                        Arrays.asList(
                            new SimpleGrantedAuthority( "ROLE_USER" )
                        )
                    );
                } 
                    
                return null;
            }
        };
    }

    @Override
    protected void configure( HttpSecurity http ) throws Exception {
        http
           .httpBasic().and()
           .sessionManagement().sessionCreationPolicy( SessionCreationPolicy.STATELESS ).and()
           .authorizeRequests().antMatchers("/**").hasRole( "USER" );
    }
}

Replacing @Import( InMemorySecurityConfig.class ) with @Import( UserDetailsSecurityConfig.class ) in the AppConfig class leads to the same results, as both security configurations define identical sets of users and their roles.

I hope this blog post will save you some time and give you a good starting point, as Apache CXF and Spring Security get along very well under the Jetty umbrella!

The complete source code is available on GitHub.