0

目標は、ローカルのダウンロードなしで、大きなjson.gzファイル(圧縮された4GB、圧縮されていない12GBの圧縮ファイル、1200万行程度)をWebサーバーからデータベースに直接流すことです。統合フローでは大規模なストリームを分割する際のスプリング統合バックプレッシャエラー

body = response.body().byteStream(); // thanks okhttp 
reader = new InputStreamReader(body, StandardCharsets.UTF_8); 
br = new BufferedReader(reader, bufferSize); 

Flux<String> flux = Flux.fromStream(br.lines()) 
    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!")) 
    .doAfterTerminate(() -> closeQuietly(closeables)) 
    .doOnError(t -> log.error(...)) 

.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...))) 
.split() 
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders) 
.channel(repositoryInputChannel()) 

しかし

2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun! 
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1]; 
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...), 
failedMessage=...}] 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) 
Spring統合アウトバウンドゲートウェイはgzip形式をサポートしていないので、私は自動的に応答を解凍することをokhttp使用して、それを自分でやっています

出力チャネルは、ブリッジによってポーリングされた無制限キューを使用して実行時に接続されます。これは、テストのためにキューをDirectChannelに置き換えることができるようにテストを容易にするためです。

@Bean(name = "${...}") 
public PollableChannel streamingOutputChannel() { 
    return new QueueChannel(); 
} 

@Bean 
public IntegrationFlow srcToSinkBridge() { 
    return IntegrationFlows.from(streamingOutputChannel()) 
     .bridge(e -> e.poller(Pollers.fixedDelay(500))) 
     .channel(repositoryInputChannel()) 
     .get(); 
} 

私はここにいくつかの疑問があります。

  1. 私は、Bean名でSPELを使用した動的バインディングが機能しているかどうかはわかりませんが、確認方法はわかりません。
  2. キューは無制限なので、私が考えているのは、ポーリングが十分に速くないということだけです。ただし、例外は、スプリッタが問題を抱えていることを示唆しています。

答えて

0

問題はlogです。これは、ドキュメントを引用AbstractMessageSplitter.

boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel; 

するためのロジックを台無しにDirectChannelにスプリッタの出力チャネルを変更するために盗聴を使用しています。バージョン5.0以降で

、...スプリッタの出力チャンネル場合ReactStreamsSubscribableChannelの インスタンスである場合、 AbstractMessageSplitterは、イテレータ の代わりにFlux結果を生成し、出力チャネルは、下流のフロー要求でバッ​​クプレッシャー に基づいて分割するためにこのFluxに登録されます。

作業コードは、以下の通りです - 単に最後までスプリッタは背圧の問題を修正した直後から、ログ文を移動:私は春の統合GitHubの上で問題2302を開いた

IntegrationFlows.from(inputChannel) 
.filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER)) 
.handle(new GzipToFluxTransformer(...)) 
.transform((Flux<String> payload) -> payload 
     .onBackpressureBuffer(getOnBackpressureBufferSize(), 
       s -> log.error("Buffer overrun!"))) 
.split() 
.channel(c -> c.flux(outputChannel)) 
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders) 
.get(); 

関連する問題