私はネイティブの英語のスピーカーではありませんが、私はできるだけ明確に質問を表明しています。 私は2日間私を混乱させるこの問題に遭遇しましたが、私はまだ解決策を見つけることができません。春の雲のデータフローでシンクコンポーネントがカフカで適切なデータを取得しない
Hadoop YARNのSpring Could Data Flowで実行されるストリームを構築しました。
ストリームは、HTTPソース、プロセッサ、およびファイルシンクで構成されています。
1.Httpソース
HTTPソースコンポーネントはapplication.propertiesで定義されたDEST1とDEST2 2つの異なる宛先との結合2つの出力チャネルを有しています。
spring.cloud.stream.bindings.output.destination = DEST1 spring.cloud.stream.bindings.output2.destination = DEST2以下
はあなたの参照のためのHTTPソースのコードsnipet ..です
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
##output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
##output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
2.プロセッサー
プロセッサは、異なる宛先との結合2つの複数の入力チャネルと2つの出力チャネルを有しています。 宛先バインディングは、プロセッサコンポーネントプロジェクトのapplication.propertiesで定義されています。
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
以下は、プロセッサのコードスニペットです。
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";;
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
3.ファイルシンクコンポーネント。
私はSpringの公式のfil sinkコンポーネントを使用します。 maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
そして、そのアプリケーションのapplicaiton.propertiesファイルにバインディングを追加するだけです。 spring.cloud.stream.bindings.input.destination = fileSink
4.Finding:
私はこれが好きでなければならないと予想データフロー:
Source.handleRequestを() - >プロセッサ.handleRequest()
Source.handleRequest2() - > Processor.handleRequest2() - > Sink.fileWritingMessageHandler();
"transform2によって処理された"という文字列のみがファイルに保存されます。
しかし、私のテストの後、データ・フローは、このような実際です:
Source.handleRequest() - > Processor.handleRequest() - > Sink.fileWritingMessageHandler(); 012.HandleRequest2() - > Processor.handleRequest2()Sink。
Source.handleRequest2() - > Processor.handleRequest2() - > Sink。fileWritingMessageHandler();
"トランスフォーム1で処理されました"と "トランスフォーム2で処理されました"の両方の文字列がファイルに保存されます。
5.Question:Processor.handleRequest(出力チャネルの宛先が、
)はhdfsSink代わりにfileSinkに結合し、データはまだシンクをファイルに流れます。私はこれを理解することができず、これは私が望むものではありません。 私は、Processor.handleRequest2()からのデータだけを、両方の代わりにファイルシンクに渡したいと思います。 もし私がそれを正しくしなければ、どのようにして解決策を教えてもらえますか? 2日間私を混乱させました。
ご協力いただきありがとうございます。
アレックス
こんにちはマリウス、 お返事ありがとうございます。それは今働く。 ありがとうございます。それは本当に役立ちます。 –
ところで、今日の午後にYARNでテストを行ったところ、YARNのSCDFは/ dataflow/artifacts/cacheフォルダのAppsだけでなく、データベースのような場所にもキャッシュされているようです。 DBにアプリをキャッシュするかどうか不安です。 私はストリームを破棄してAppsを登録解除し、/ dataflow/artifacts/cacheフォルダ内のすべてのファイルを削除するので、YDKのSCDFは引き続きキャッシュされたAppsを使用して展開することがわかりました。 最後に、私は選択肢がありませんが、SCDFサーバを再起動すると動作します。 サーバーを再起動せずにキャッシュデータを完全にクリアする方法を知っていますか? ありがとう、 –
このセクション[docs](http://docs.spring.io/spring-cloud-dataflow-server-yarn/docs/1.0.1.RELEASE/reference/htmlsingle/#yarn-how-それは役に立つかもしれない)。キャッシュは '/ dataflow/artifacts/cache'と' hdfs'ディレクトリで行われるので、キャッシュをクリアするたびにサーバを再起動する必要があります。このワークフローへのフィードバックや改善をお寄せください。 –