Monday, October 30, 2017

Better late than never: SSE, or Server-Sent Events, are now in JAX-RS

Server-Sent Events (or just SSE) is quite a useful protocol which allows servers to push data to clients over HTTP. Our web browsers have supported it for ages but, surprisingly, the JAX-RS specification neglected it for quite a long time. Although Jersey had an extension available for the SSE media type, the API was never formalized and, as such, was not portable to other JAX-RS implementations.

Luckily, JAX-RS 2.1, also known as JSR-370, has changed that by making SSE support, both client-side and server-side, a part of the official specification. In today's post we are going to look at how to integrate SSE support into existing Java REST(ful) web services, using the recently released version 3.2.0 of the terrific Apache CXF framework. In fact, besides the bootstrapping, there is nothing really CXF-specific: all the examples should work in any other framework which implements the JAX-RS 2.1 specification.

Without further ado, let us get started. As a significant number of Java projects these days are built on top of the awesome Spring Framework, our sample application will use Spring Boot and the Apache CXF Spring Boot Integration to get us off the ground quickly. Our good old buddy Apache Maven will help us as well by managing the project dependencies.


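<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>1.5.8.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-frontend-jaxrs</artifactId>
    <version>3.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-spring-boot-starter-jaxrs</artifactId>
    <version>3.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-client</artifactId>
    <version>3.2.0</version>
</dependency>

<dependency>
    <groupId>org.apache.cxf</groupId>
    <artifactId>cxf-rt-rs-sse</artifactId>
    <version>3.2.0</version>
</dependency>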

Under the hood, Apache CXF uses the Atmosphere framework to implement the SSE transport, so this is another dependency we have to include.


<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-runtime</artifactId>
    <version>2.4.14</version>
</dependency>

Relying on the Atmosphere framework requires one additional configuration setting, namely transportId, to ensure that the SSE-capable transport is picked up at runtime. The relevant details can be added to the application.yml file:

cxf:
  servlet:
    init:
      transportId: http://cxf.apache.org/transports/http/sse

Great, so the foundation is there, moving on. The REST(ful) web service we are going to build will expose imaginary CPU load averages (randomly generated, for simplicity) as SSE streams. The Stats class constitutes our data model.

public class Stats {
    private long timestamp;
    private int load;

    public Stats() {
    }

    public Stats(long timestamp, int load) {
        this.timestamp = timestamp;
        this.load = load;
    }

    // Getters and setters are omitted
    ...
}

Speaking of streams, the Reactive Streams specification made its way into Java 9 and, hopefully, we are going to see accelerated adoption of reactive programming models by the Java community. Moreover, developing SSE-enabled REST(ful) web services is so much easier and more straightforward when backed by Reactive Streams. To make the case, let us bring RxJava 2 into our sample application.


<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.1.6</version>
</dependency>

This is a good moment to start with our StatsRestService class, a typical JAX-RS resource implementation. The key SSE capabilities in JAX-RS 2.1 are centered around the Sse contextual object, which can be injected like this.

@Service
@Path("/api/stats")
public class StatsRestService {
    @Context 
    public void setSse(Sse sse) {
        // Access Sse context here
    }

Out of the Sse context we can get access to two very useful abstractions: SseBroadcaster and OutboundSseEvent.Builder (imported directly as Builder in the snippets that follow), for example:

private SseBroadcaster broadcaster;
private Builder builder;
    
@Context 
public void setSse(Sse sse) {
    this.broadcaster = sse.newBroadcaster();
    this.builder = sse.newEventBuilder();
}

As you may have already guessed, OutboundSseEvent.Builder constructs instances of the OutboundSseEvent class which can be sent over the wire, while SseBroadcaster broadcasts the same SSE stream to all connected clients. With that being said, we can generate the stream of OutboundSseEvents and distribute it to everyone who is interested:

private static void subscribe(final SseBroadcaster broadcaster, final Builder builder) {
    Flowable
        .interval(1, TimeUnit.SECONDS)
        .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id))
        .subscribeOn(Schedulers.single())
        .subscribe(broadcaster::broadcast);
}

private static Flowable<OutboundSseEvent.Builder> eventsStream(final Builder builder) {
    return Flowable.generate(emitter -> emitter.onNext(builder.name("stats")));
}

If you are not familiar with RxJava 2, no worries, this is what is happening here. The eventsStream method returns an effectively infinite stream of OutboundSseEvent.Builder instances for SSE events of type stats. The subscribe method is a little bit more complicated. We start off by creating a stream which emits a sequential number every second, for example 0, 1, 2, 3, 4, 5, 6 and so on. Then we zip this stream with the one returned by the eventsStream method, pairing the elements of both streams into a single stream which emits a tuple of (number, OutboundSseEvent.Builder) every second. To be fair, this tuple is not very useful to us as is, so we transform it into an instance of the OutboundSseEvent class, treating the number as the SSE event identifier:

private static final Random RANDOM = new Random();

private static OutboundSseEvent createSseEvent(OutboundSseEvent.Builder builder, long id) {
    return builder
        .id(Long.toString(id))
        .data(Stats.class, new Stats(new Date().getTime(), RANDOM.nextInt(100)))
        .mediaType(MediaType.APPLICATION_JSON_TYPE)
        .build();
}
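If the zipWith step still feels a bit abstract, its pairing semantics can be imitated in plain Java without any reactive library. The sketch below is only an illustration under that assumption: ZipSketch and its zip method are hypothetical names, timing is left out entirely, and a plain string stands in for the event builder.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

/** Hypothetical helper that imitates zip semantics; not part of RxJava or JAX-RS. */
final class ZipSketch {
    private ZipSketch() {
    }

    /** Pairs the i-th elements of both sources and stops as soon as either one runs out. */
    static <A, B, R> List<R> zip(Iterator<A> left, Iterator<B> right, BiFunction<A, B, R> combiner) {
        List<R> result = new ArrayList<>();
        while (left.hasNext() && right.hasNext()) {
            result.add(combiner.apply(left.next(), right.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for Flowable.interval: a finite run of sequential numbers
        Iterator<Long> ticks = Stream.iterate(0L, n -> n + 1).limit(3).iterator();
        // Stand-in for eventsStream: an endless supply of the same value
        Iterator<String> names = Stream.generate(() -> "stats").iterator();
        // prints [stats#0, stats#1, stats#2]
        System.out.println(zip(ticks, names, (id, name) -> name + "#" + id));
    }
}
```

The endless source never has to be bounded explicitly: the result is only as long as the shorter of the two inputs, which is why an infinite generator can safely be zipped with the once-per-second ticks.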

The OutboundSseEvent may carry any payload in the data property, which will be serialized with respect to the mediaType specified, using the usual MessageBodyWriter resolution strategy. Once we get our OutboundSseEvent instance, we send it off using the SseBroadcaster::broadcast method. Please notice that we handed off the control flow to another thread using the subscribeOn operator, which is usually what you want to do.
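If you are curious what such an event looks like once it leaves the server, here is a minimal sketch of the text/event-stream framing defined by the SSE specification. The SseWireFormat helper is hypothetical (it is not part of JAX-RS or Apache CXF), the JSON payload is written by hand under the assumption that Stats is serialized with timestamp and load properties, and the field order produced by a real implementation may differ.

```java
/** Hypothetical helper that frames one SSE event; not part of JAX-RS or Apache CXF. */
final class SseWireFormat {
    private SseWireFormat() {
    }

    /** Builds a single event using the "field: value" line syntax of text/event-stream. */
    static String format(String name, String id, String data) {
        StringBuilder frame = new StringBuilder();
        if (name != null) {
            frame.append("event: ").append(name).append('\n');
        }
        if (id != null) {
            frame.append("id: ").append(id).append('\n');
        }
        // A multi-line payload is sent as one "data:" line per line of text
        for (String line : data.split("\n", -1)) {
            frame.append("data: ").append(line).append('\n');
        }
        // An empty line terminates the event
        return frame.append('\n').toString();
    }

    public static void main(String[] args) {
        // Payload shape is an assumption, values are taken from the sample run in this post
        System.out.print(format("stats", "0", "{\"timestamp\":1509376080027,\"load\":82}"));
    }
}
```

Running the main method prints an event line, an id line and a data line followed by an empty line, which is what a client such as a web browser uses to tell where one event ends and the next one begins.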

Good, hopefully the stream part is clear now, but how can we actually subscribe to the SSE events emitted by SseBroadcaster? That is easier than you might think:

@GET
@Path("broadcast")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void broadcast(@Context SseEventSink sink) {
    broadcaster.register(sink);
}

And we are all set. The most important piece here is the content type being produced, which should be set to MediaType.SERVER_SENT_EVENTS. In this case, the contextual instance of SseEventSink becomes available and can be registered with the SseBroadcaster instance.
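Conceptually, registering a sink with a broadcaster is a classic fan-out: every registered consumer receives every event. The plain Java sketch below illustrates only that idea. MiniBroadcaster is a hypothetical class and makes no claim about how Apache CXF (or any other implementation) realizes SseBroadcaster internally; dropping a sink on failure is likewise just an assumption made for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical fan-out helper illustrating the broadcaster idea; not the JAX-RS SseBroadcaster. */
final class MiniBroadcaster<T> {
    // CopyOnWriteArrayList allows sinks to be added or removed while a broadcast is iterating
    private final List<Consumer<T>> sinks = new CopyOnWriteArrayList<>();

    /** Rough counterpart of registering a sink for a newly connected client. */
    void register(Consumer<T> sink) {
        sinks.add(sink);
    }

    /** Delivers the same event to every registered sink and drops the sinks that fail. */
    void broadcast(T event) {
        for (Consumer<T> sink : sinks) {
            try {
                sink.accept(event);
            } catch (RuntimeException ex) {
                sinks.remove(sink); // for example, the client has gone away
            }
        }
    }

    /** Number of sinks still registered. */
    int size() {
        return sinks.size();
    }

    public static void main(String[] args) {
        MiniBroadcaster<String> broadcaster = new MiniBroadcaster<>();
        broadcaster.register(event -> System.out.println("tab 1: " + event));
        broadcaster.register(event -> System.out.println("tab 2: " + event));
        broadcaster.broadcast("load=82");
    }
}
```

Both consumers print the same event, which mirrors what we expect from several browser tabs connected to the broadcast endpoint.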

To see our JAX-RS resource in action, we need to bootstrap the server instance using, for example, JAXRSServerFactoryBean, configuring all the necessary providers along the way. Please note that we are also explicitly specifying the transport to be used, in this case SseHttpTransportFactory.TRANSPORT_ID.

@Configuration
@EnableWebMvc
public class AppConfig extends WebMvcConfigurerAdapter {
    @Bean
    public Server rsServer(Bus bus, StatsRestService service) {
        JAXRSServerFactoryBean endpoint = new JAXRSServerFactoryBean();
        endpoint.setBus(bus);
        endpoint.setAddress("/");
        endpoint.setServiceBean(service);
        endpoint.setTransportId(SseHttpTransportFactory.TRANSPORT_ID);
        endpoint.setProvider(new JacksonJsonProvider());
        return endpoint.create();
    }
    
    @Override
    public void addResourceHandlers(ResourceHandlerRegistry registry) {
        registry
          .addResourceHandler("/static/**")
          .addResourceLocations("classpath:/web-ui/"); 
    }
}

To close the loop, we just need to supply the runner for our Spring Boot application:

@SpringBootApplication
public class SseServerStarter {    
    public static void main(String[] args) {
        SpringApplication.run(SseServerStarter.class, args);
    }
}

Now, if we run the application and navigate to http://localhost:8080/static/broadcast.html using multiple web browsers or different tabs within the same browser, we will observe the identical stream of events charted in all of them.

Nice, broadcasting is certainly a valid use case, but what about returning an independent SSE stream on each endpoint invocation? Easy, just use the SseEventSink methods, like send and close, to manipulate the SSE stream directly.

@GET
@Path("sse")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void stats(@Context SseEventSink sink) {
    Flowable
        .interval(1, TimeUnit.SECONDS)
        .zipWith(eventsStream(builder), (id, bldr) -> createSseEvent(bldr, id))
        .subscribeOn(Schedulers.single())
        .subscribe(sink::send, ex -> {}, sink::close);
}

This time, if we run the application and navigate to http://localhost:8080/static/index.html using multiple web browsers or different tabs within the same browser, we will observe completely different charts.

Excellent, the server-side APIs are indeed very concise and easy to use. But what about the client side: can we consume SSE streams from Java applications? The answer is yes, absolutely. JAX-RS 2.1 outlines the client-side API as well, with SseEventSource at the heart of it.


final WebTarget target = ClientBuilder
    .newClient()
    .register(JacksonJsonProvider.class)
    .target("http://localhost:8080/services/api/stats/sse");
        
try (final SseEventSource eventSource =
            SseEventSource
                .target(target)
                .reconnectingEvery(5, TimeUnit.SECONDS)
                .build()) {

    eventSource.register(event -> {
        final Stats stats = event.readData(Stats.class, MediaType.APPLICATION_JSON_TYPE);
        System.out.println("name: " + event.getName());
        System.out.println("id: " + event.getId());
        System.out.println("comment: " + event.getComment());
        System.out.println("data: " + stats.getLoad() + ", " + stats.getTimestamp());
        System.out.println("---------------");
    });
    eventSource.open();

    // Just consume SSE events for 10 seconds
    Thread.sleep(10000); 
}     

If we run this code snippet (assuming the server is up and running as well), we will see something like this in the console (as you may recall, the data is generated randomly).

name: stats
id: 0
comment: null
data: 82, 1509376080027
---------------
name: stats
id: 1
comment: null
data: 68, 1509376081033
---------------
name: stats
id: 2
comment: null
data: 12, 1509376082028
---------------
name: stats
id: 3
comment: null
data: 5, 1509376083028
---------------

...

As we can see, the OutboundSseEvent from the server side becomes an InboundSseEvent on the client side. The client may consume any payload from the data property, which is deserialized by specifying the expected media type, using the usual MessageBodyReader resolution strategy.
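To connect the console output with what travels over the wire, here is a simplified sketch of how a single text/event-stream frame breaks down into the name, id, comment and data values. SseFrameParser is a hypothetical class, not the JAX-RS client implementation, and it deliberately ignores details such as the retry field, carriage returns and streaming input.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for one text/event-stream frame; not the JAX-RS client implementation. */
final class SseFrameParser {
    private SseFrameParser() {
    }

    /** Returns the fields of a single frame, keyed by field name ("event", "id", "data", "comment"). */
    static Map<String, String> parse(String frame) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String line : frame.split("\n")) {
            if (line.isEmpty()) {
                continue; // an empty line only marks the end of the event
            }
            int colon = line.indexOf(':');
            String field;
            String value;
            if (colon == 0) {
                field = "comment"; // a line starting with a colon is a comment
                value = line.substring(1);
            } else if (colon < 0) {
                field = line; // a field name without a value
                value = "";
            } else {
                field = line.substring(0, colon);
                value = line.substring(colon + 1);
            }
            if (value.startsWith(" ")) {
                value = value.substring(1); // one leading space is not part of the value
            }
            if (field.equals("data") || field.equals("comment")) {
                // Repeated data or comment lines are joined with a newline
                fields.merge(field, value, (previous, next) -> previous + "\n" + next);
            } else {
                fields.put(field, value); // for other fields the last occurrence wins
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse("event: stats\nid: 3\ndata: {\"load\":5}\n\n"));
    }
}
```

Since the frames in our example carry no line starting with a colon, there is no comment entry at all, which matches the comment: null lines in the console output.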

There is a lot of material squeezed into a single post. And still, there are a few more things regarding SSE and JAX-RS 2.1 which we have not covered here, for example using HttpHeaders.LAST_EVENT_ID_HEADER or configuring reconnect delays. Those could be a great topic for an upcoming post if there is interest in learning more.

To conclude, SSE support in JAX-RS is what many of us have been awaiting for so long. Finally, it is there, so please give it a try!

The complete project sources are available on GitHub.