In this blog post I’m going to write about combining Dropwizard and Atmosphere. Atmosphere is a framework for handling websockets, with fallbacks such as long-polling for clients that can’t do websockets. If you take a look at the samples, it seems like the integration is a no-brainer. I thought so too. Fool.
Atmosphere provides a Jersey integration and Dropwizard also uses Jersey, so everything will be alright. Hah.
First, add org.atmosphere:atmosphere-jersey and org.eclipse.jetty.websocket:websocket-server:9.0.7.v20131107 (use the same Jetty version as in Dropwizard) to your dependencies. Then register the Atmosphere servlet in Dropwizard:
    @Override
    public void run(RestwarsConfiguration configuration, Environment environment) throws Exception {
        AtmosphereServlet servlet = new AtmosphereServlet();
        // Package that Atmosphere scans for annotated classes
        servlet.framework().addInitParameter(ApplicationConfig.ANNOTATION_PACKAGE, WebsocketResource.class.getPackage().getName());
        servlet.framework().addInitParameter(ApplicationConfig.WEBSOCKET_SUPPORT, "true");

        // Everything below /websocket/ is handled by Atmosphere
        ServletRegistration.Dynamic registration = environment.servlets().addServlet("atmosphere", servlet);
        registration.addMapping("/websocket/*");
    }
Now we can implement our websocket handler:
    @Path("/")
    @Produces(MediaType.APPLICATION_JSON)
    public class WebsocketResource {
        /**
         * Name of the round broadcaster.
         */
        private static final String ROUND_BROADCASTER_NAME = "round";

        @Context
        private BroadcasterFactory broadcasterFactory;

        @Suspend
        @GET
        @Path("/round")
        public SuspendResponse<String> registerForRoundEvent(@Context AtmosphereResource resource) {
            return new SuspendResponse.SuspendResponseBuilder<String>()
                    .broadcaster(broadcasterFactory.lookup(DefaultBroadcaster.class, ROUND_BROADCASTER_NAME, true))
                    .build();
        }
    }
In this code, we let Jersey inject the BroadcasterFactory through the @Context annotation, so we can create a named broadcaster. When a client opens a websocket to http://localhost:8080/websocket/round, registerForRoundEvent is called and the connection is held open (that is what the @Suspend annotation is for).
Now we can use the named broadcaster somewhere in the application to send events to the listening client:
    public void broadcastRound(BroadcasterFactory broadcasterFactory, long round) {
        broadcasterFactory.lookup(DefaultBroadcaster.class, ROUND_BROADCASTER_NAME, true).broadcast(Long.toString(round));
    }
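If the lookup-then-broadcast dance is still a bit abstract, here is a toy model of it in plain Java, without Atmosphere on the classpath. ToyBroadcaster, ToyBroadcasterFactory and ToyBroadcastDemo are made-up names for this sketch and not Atmosphere classes. The only thing it tries to mirror is the idea that looking up the same name twice (with the create flag set) hands you the same broadcaster, so the resource and the rest of the application meet at the name.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for a named broadcaster: fans a message out to all listeners.
class ToyBroadcaster {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    void broadcast(String message) {
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
    }
}

// Hypothetical stand-in for the factory: at most one broadcaster per name.
class ToyBroadcasterFactory {
    private final Map<String, ToyBroadcaster> broadcasters = new ConcurrentHashMap<>();

    // Returns the broadcaster registered under the name. If there is none,
    // creates one when createIfNull is true and returns null otherwise.
    ToyBroadcaster lookup(String name, boolean createIfNull) {
        if (createIfNull) {
            return broadcasters.computeIfAbsent(name, n -> new ToyBroadcaster());
        }
        return broadcasters.get(name);
    }
}

class ToyBroadcastDemo {
    public static void main(String[] args) {
        ToyBroadcasterFactory factory = new ToyBroadcasterFactory();
        // "Client" side: subscribe to the round broadcaster.
        factory.lookup("round", true).addListener(msg -> System.out.println("client got: " + msg));
        // "Application" side: look up the same name and broadcast the round.
        factory.lookup("round", true).broadcast(Long.toString(42L)); // prints "client got: 42"
    }
}
```

The real factory does a lot more (lifecycle, broadcaster classes, caching), so treat this as an illustration of the naming idea only.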
An instance of the BroadcasterFactory can be obtained as follows:
    @Override
    public void run(RestwarsConfiguration configuration, Environment environment) throws Exception {
        AtmosphereServlet servlet = new AtmosphereServlet();
        // ...
        BroadcasterFactory broadcasterFactory = servlet.framework().getBroadcasterFactory();
    }
Now, everything works. Or so I thought. I pushed the code to GitHub, and Travis complained that tests were failing. As it turns out, Atmosphere is not compatible with Dropwizard Testing. Or Dropwizard is not compatible with Atmosphere, whatever.
If you use the ResourceTestRule from Dropwizard, which starts an in-memory Jersey, strange errors pop up:
ERROR [2015-02-20 16:08:06,098] com.sun.jersey.spi.inject.Errors: The following errors and warnings have been detected with resource and/or provider classes:
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.AtmosphereProviders$BroadcasterProvider.req
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
SEVERE: Missing dependency for field: protected javax.servlet.http.HttpServletRequest org.atmosphere.jersey.AtmosphereFilter.servletReq
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
SEVERE: Missing dependency for field: javax.servlet.http.HttpServletRequest org.atmosphere.jersey.BaseInjectableProvider.req
com.sun.jersey.spi.inject.Errors$ErrorMessagesException
at com.sun.jersey.spi.inject.Errors.processErrorMessages(Errors.java:170)
at com.sun.jersey.spi.inject.Errors.postProcess(Errors.java:136)
at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:199)
at com.sun.jersey.server.impl.application.WebApplicationImpl.initiate(WebApplicationImpl.java:795)
at com.sun.jersey.server.impl.application.WebApplicationImpl.initiate(WebApplicationImpl.java:790)
at com.sun.jersey.test.framework.spi.container.inmemory.InMemoryTestContainerFactory$InMemoryTestContainer.start(InMemoryTestContainerFactory.java:165)
at com.sun.jersey.test.framework.JerseyTest.setUp(JerseyTest.java:310)
at io.dropwizard.testing.junit.ResourceTestRule$1.evaluate(ResourceTestRule.java:149)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:211)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
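I assume Jersey tries to load the Atmosphere providers through classpath scanning and fails, presumably because the in-memory test container has no servlet environment that could supply the HttpServletRequest those providers ask for. This error appears even if you don’t use any code from Atmosphere, just by having atmosphere-jersey on your classpath. Very nice.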
My workaround: don’t use the atmosphere-jersey module, use atmosphere-runtime instead. The drawback is that you have to write a little more code. Let’s take a look at our new WebsocketResource:
    @AtmosphereHandlerService
    public class WebsocketResource implements AtmosphereHandler {
        /**
         * Name of the round broadcaster.
         */
        private static final String ROUND_BROADCASTER_NAME = "round";

        @Override
        public void onRequest(AtmosphereResource atmosphereResource) throws IOException {
            BroadcasterFactory broadcasterFactory = atmosphereResource.getAtmosphereConfig().getBroadcasterFactory();
            Broadcaster broadcaster = getBroadcaster(broadcasterFactory);

            // Attach the connection to the named broadcaster and keep it open
            atmosphereResource.setBroadcaster(broadcaster);
            atmosphereResource.suspend();
        }

        // A broadcast carries a message and is not a cancel or close event
        private boolean isBroadcast(AtmosphereResourceEvent event) {
            return event.getMessage() != null && !event.isCancelled() && !event.isClosedByClient() && !event.isClosedByApplication();
        }

        @Override
        public void onStateChange(AtmosphereResourceEvent event) throws IOException {
            AtmosphereResource resource = event.getResource();

            if (isBroadcast(event)) {
                resource.write(event.getMessage().toString());

                switch (resource.transport()) {
                    case WEBSOCKET:
                    case STREAMING:
                        // Persistent transports: push the data out, keep the connection
                        resource.getResponse().flushBuffer();
                        break;
                    default:
                        // Long-polling and the like: finish this response
                        resource.resume();
                        break;
                }
            }
        }

        @Override
        public void destroy() {
        }

        private Broadcaster getBroadcaster(BroadcasterFactory broadcasterFactory) {
            return broadcasterFactory.lookup(ROUND_BROADCASTER_NAME, true);
        }
    }
First, we annotate our WebsocketResource with @AtmosphereHandlerService and implement Atmosphere’s AtmosphereHandler.
onRequest is called when a client opens a websocket. Here we assign the named broadcaster and suspend the connection. onStateChange is called when a broadcast needs to be sent (WTF?) or, as the name suggests, when any other state change (disconnect, timeout, …) happens. If the event is a broadcast, then some logic is triggered: we write the broadcast message and, depending on the transport protocol (websocket, long-polling, etc.), either flush the buffer or, in the case of long-polling, resume the connection, which ends the response.
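To see what that transport switch means for a client, here is a small simulation in plain Java. ToyTransport, ToyConnection, ToyStateChangeHandler and ToyStateChangeDemo are hypothetical names invented for this sketch; none of them are Atmosphere types, and "completing" a long-polling response is reduced to flipping a flag. It is a rough model of the behaviour, not how Atmosphere implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical transports for this sketch (not Atmosphere's own enum).
enum ToyTransport { WEBSOCKET, STREAMING, LONG_POLLING }

// Hypothetical stand-in for a suspended connection.
class ToyConnection {
    final ToyTransport transport;
    final List<String> written = new ArrayList<>();
    boolean open = true;

    ToyConnection(ToyTransport transport) {
        this.transport = transport;
    }
}

class ToyStateChangeHandler {
    // Mimics the broadcast branch of onStateChange: write the message, then
    // either keep the connection (persistent transports) or complete it.
    static void onBroadcast(ToyConnection connection, String message) {
        if (message == null || !connection.open) {
            return; // nothing to deliver, or the connection is already gone
        }
        connection.written.add(message);
        switch (connection.transport) {
            case WEBSOCKET:
            case STREAMING:
                // The real handler flushes the buffer here; the connection stays open.
                break;
            default:
                // Long-polling: one message per request, then the response is completed.
                connection.open = false;
                break;
        }
    }
}

class ToyStateChangeDemo {
    public static void main(String[] args) {
        ToyConnection websocket = new ToyConnection(ToyTransport.WEBSOCKET);
        ToyConnection longPolling = new ToyConnection(ToyTransport.LONG_POLLING);
        for (String round : new String[] {"1", "2"}) {
            ToyStateChangeHandler.onBroadcast(websocket, round);
            ToyStateChangeHandler.onBroadcast(longPolling, round);
        }
        System.out.println(websocket.written + " open=" + websocket.open);
        System.out.println(longPolling.written + " open=" + longPolling.open);
    }
}
```

In this model the websocket connection receives both rounds and stays open, while the long-polling connection gets the first round and is then done; a real long-polling client would issue a new request to pick up the next one.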
Now the code works the same way as if we used the Atmosphere-Jersey combination but our tests are still working. Great!
You can see the whole setup in RESTwars.