L is Bエンジニアブログ

ビジネス用メッセンジャーdirectのエンジニアによるブログ

LisBエンジニアブログ

ビジネスチャットdirectのエンジニアブログ

Spring による Reactive WebSocket

f:id:mike_neck:20180221140954p:plain

こんにちは、directのサーバーエンジニアをしている持田(@mike_neck)です。 会社ブログに初めて記事を書くので、非常に緊張しておりま…せん。

Spring Framework 5 がリリースされて少し経ち、僕も素振りに余年がありませんが、今回も素振りがてら Spring の Reactive websocket を使って簡単な websocket サーバーを書いてみました。 direct でも websocket を利用していますが、 Spring の WebFlux framework でどのように扱うのか非常に気になります!


Spring の WebFlux フレームワークについて

モバイル端末の普及などに伴って低速なネットワークによる大量アクセスなどが増えた結果、 従来のServlet APIを用いたブロッキングなIOでは低速なネットワークからのリクエストによってスレッドが専有されてしまい、CPUなどが有効に活用できません。 そこでノンブロッキングIOを活用してサーバーのスレッド/CPUを有効活用することが求められてきました。 Spring Framework 5 からは Netty 等のノンブロッキングなネットワークフレームワークの上に、 ノンブロッキングなバックプレッシャー制御のある非同期ストリーム処理フレームワークである Reactor をベースとした 新しいWebフレームワークとして WebFlux が追加されました。 WebFlux フレームワークは websocket API を提供しており、非同期な websocket サーバーを http サーバーと同じようなモデルで記述できます。


サンプルプロジェクトについて

今回はエコーサーバーでもいいかと思ったのですが、サンプルのエコーサーバーのコードを見ると、メッセージの取り回しが必要なさそうでした。 メッセージを取り出すコードも書きたかったので、今回は送られてきたメッセージに文字列Hello,をつけて返すサーバーにします。

サーバー

まずはサーバー側の実装を紹介します。プロジェクトの準備には Spring Initializer を使います。

curl https://start.spring.io/starter.tgz \
  -d bootVersion=2.0.0.RC1 \
  -d dependencies=webflux,lombok \
  -d type=gradle-project \
  -d baseDir=app | tar -xzvf -

解凍されたディレクトリーにある build.gradle には次のような依存ライブラリーが追加されていると思います。

dependencies {
    compile('org.springframework.boot:spring-boot-starter-webflux')
    compileOnly('org.projectlombok:lombok')
    testCompile('org.springframework.boot:spring-boot-starter-test')
    testCompile('io.projectreactor:reactor-test')

    // eclipse-collections は僕が追加したライブラリーです
    compile group: 'org.eclipse.collections', name: 'eclipse-collections', version: '9.1.0'
}

まずは websocket のハンドラークラスを作ります。

package com.example.app;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Slf4j
public class HellowebsocketHandler implements WebSocketHandler {

  @Override
  public Mono<Void> handle(final WebSocketSession session) {
    log.info("new connection: {}", session.getId());
    final Flux<WebSocketMessage> result =
        session
            .receive()  // (1)
            .delaySubscription(Duration.ofSeconds(2L)) // (2)
            .map(WebSocketMessage::getPayloadAsText) //(3)
            .flatMap(text -> this.handle(text, session.bufferFactory())); // (4)
    return session.send(result); // (5)
  }

  private Mono<WebSocketMessage> handle(final String input, final DataBufferFactory factory) {
    final String message = "Hello, " + input;
    log.info("coming: {}, going: {}", input, message);
    final DataBuffer buffer = factory.wrap(message.getBytes(StandardCharsets.UTF_8));
    final WebSocketMessage webSocketMessage =
        new WebSocketMessage(WebSocketMessage.Type.TEXT, buffer); // (6)
    return Mono.just(webSocketMessage);
  }
}
  1. websocket のメッセージが到着したらメッセージを取り出します (Flux<WebSocketMessage> が返ってくる)
  2. websocket らしく(?)メッセージを 2 秒ほど遅らせて返します
  3. WebSocketMessage#getPayloadAsText メソッドにより、 String でメッセージを取り出します
  4. WebSocketMessage にするために DataBuffer の形にする必要があるので、 DataBufferFactory をもらって、 handle(String, DataBufferFactory) メソッドに渡します
  5. WebSocketSession#send メソッドによりメッセージを送信します(戻り値が Mono<Void>)
  6. レスポンスメッセージをテキストメッセージとして構築します

次にアプリケーションの main メソッドのあるクラスを組み立てます。

package com.example.app;

import java.util.Map;

import org.eclipse.collections.impl.factory.Maps;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.WebSocketService;
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;

@SpringBootApplication
@EnableWebFlux // (1)
public class WebApp {

  public static void main(String[] args) {
    SpringApplication.run(WebApp.class, args);
  }

  @Bean
  WebSocketService webSocketService() { // (2)
    return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy());
  }

  @Bean
  WebSocketHandlerAdapter handlerAdapter() { // (3)
    return new WebSocketHandlerAdapter(webSocketService());
  }

  @Bean
  HandlerMapping handlerMapping() {
    final Map<String, WebSocketHandler> handlerMappings =
        Maps.immutable
            .<String, WebSocketHandler>of("/ws/hello", new HelloWebSocketHandler()) // (4)
            .castToMap();
    final SimpleUrlHandlerMapping urlHandlerMapping = new SimpleUrlHandlerMapping();
    urlHandlerMapping.setUrlMap(handlerMappings);
    urlHandlerMapping.setOrder(-1);
    return urlHandlerMapping;
  }
}
  1. WebFlux を利用するため、 @EnableWebFlux をつけます
  2. http から websocket に切り替えるための RequestUpgradeStrategy の実装を指定します
  3. WebSocketHandlerDispatcherHandler に接続します
  4. URL /ws/hello にきた websocket のリクエストを HelloWebSocketHandler に割り振ります

これでサーバー側の実装は完了です


クライアント

クライアントですが、 WebFlux に付随しているクライアントを利用しようとしたのですが、Spring Webから切り離せそうになかったので、 普通に Tyrus を使いました。 Tyrus については他のサイト等にも掲載されていますので、そちらもあわせて参照して下さい。

build.gradle

plugins {
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.glassfish.tyrus', name: 'tyrus-container-grizzly-client', version: '1.13.1'
    compile group: 'org.glassfish.tyrus', name: 'tyrus-client', version: '1.13.1'
    compile group: 'javax.websocket', name: 'javax.websocket-client-api', version: '1.1'
    compile 'org.slf4j:slf4j-api:1.7.25'
    compile 'ch.qos.logback:logback-classic:1.2.3'
    compile group: 'io.projectreactor', name: 'reactor-core', version: '3.1.4.RELEASE'
}

クライアントのハンドラー

package com.example.tyrus;

import javax.websocket.ClientEndpoint;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ClientEndpoint
public class TyrusClient {

  private static final Logger log = LoggerFactory.getLogger(TyrusClient.class);

  @OnOpen
  public void onOpen(final Session session) {
    log.info("open session to server: {}", session.getId());
  }

  @OnMessage
  public void onMessage(final Session session, final String message) {
    log.info("message coming from server : {}", message);
  }
}

単にメッセージが飛んできたらログをとるだけです。

クライアントのアプリのmain

package com.example.tyrus;

import static com.example.tyrus.ExceptionalConsumer.consumer;
import static com.example.tyrus.ExceptionalSupplier.supplier;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import javax.websocket.DeploymentException;
import javax.websocket.Session;

import org.glassfish.tyrus.client.ClientManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Mono;

public class TyrusApp {

  private static final Logger log = LoggerFactory.getLogger(TyrusClient.class);

  private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

  public static void main(String[] args) throws DeploymentException {
    final ClientManager clientManager = ClientManager.createClient();
    final Future<Session> futureSession =
        clientManager.asyncConnectToServer(
            TyrusClient.class, URI.create("ws://localhost:8080/ws/hello"));

    log.info("start application");
    Mono.fromFuture(CompletableFuture.supplyAsync(supplier(futureSession::get), EXECUTOR))
        .delaySubscription(Duration.ofSeconds(2L)) // (1)
        .doOnNext(consumer(session -> session.getBasicRemote().sendText("tyrus"))) // (2)
        .delayElement(Duration.ofSeconds(4L)) // (3)
        .doOnNext(consumer(Session::close)) // (4)
        .doOnTerminate(() -> log.info("close connection"))
        .doOnTerminate(EXECUTOR::shutdown)
        .subscribe(); // (5)
  }
}

無駄に reactor を使ってみました。

  1. 接続したら 2 秒待つ
  2. サーバーに tyrus というメッセージを送る
  3. イベントの間隔を 4 秒にする
  4. 切断する
  5. イベントの購読を開始する

これでクライアントの実装は終わりです


では早速サーバー、クライアントの順に起動してみます。

その時のログです。

サーバーのログ(抜粋)

f:id:mike_neck:20180221104538p:plain

クライアントのログ

f:id:mike_neck:20180221104601p:plain

接続してから4秒後にメッセージが飛んでいる様子がわかると思います。また、送信メッセージ tyrus message を受け取って Hello, tyrus message が返ってきているのがわかると思います。

今回のコードはこちらから入手できます。(幻となった WebFlux 版のクライアント実装もあります)

github.com


まとめ

僕がまだ reactor の API(Mono とか Flux とか)に不慣れだったり、結構 low level な API なこともあり、実装での型合わせ(Future からの Mono の生成や、 WebSocketSession の求めるオブジェクト(DataBufferString などを使えない)の生成など)には結構手こずりました。 ただ、ノンブロッキングなサーバーを利用できると、 direct のサーバーリソースを抑えられる見込みがありますので、今のうちに素振りして慣れておきたいものです。


社員を絶賛募集中

L is B では新しい技術が大好きなエンジニアを募集しています。われこそは!と思う方、思わない方、是非一緒に働きませんか? こちらのURL よりご応募いただくか、ツイッターで @mike_neck にメンションください!

おことわり

この記事は Spring boot 2.0.0.RC1 での実行結果であり、 Spring Boot 2.0.0 以後での動作を保証するものではありません。