目標は、ローカルのダウンロードなしで、大きなjson.gzファイル(圧縮された4GB、圧縮されていない12GBの圧縮ファイル、1200万行程度)をWebサーバーからデータベースに直接流すことです。統合フローでは大規模なストリームを分割する際のスプリング統合バックプレッシャエラー
body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);
Flux<String> flux = Flux.fromStream(br.lines())
.onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
.doAfterTerminate(() -> closeQuietly(closeables))
.doOnError(t -> log.error(...))
:
.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())
しかし
2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException:
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1];
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...),
failedMessage=...}]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
Spring統合アウトバウンドゲートウェイはgzip形式をサポートしていないので、私は自動的に応答を解凍することをokhttp使用して、それを自分でやっています
出力チャネルは、ブリッジによってポーリングされた無制限キューを使用して実行時に接続されます。これは、テストのためにキューをDirectChannel
に置き換えることができるようにテストを容易にするためです。
@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow srcToSinkBridge() {
return IntegrationFlows.from(streamingOutputChannel())
.bridge(e -> e.poller(Pollers.fixedDelay(500)))
.channel(repositoryInputChannel())
.get();
}
私はここにいくつかの疑問があります。
- 私は、Bean名でSPELを使用した動的バインディングが機能しているかどうかはわかりませんが、確認方法はわかりません。
- キューは無制限なので、私が考えているのは、ポーリングが十分に速くないということだけです。ただし、例外は、スプリッタが問題を抱えていることを示唆しています。