3

私はネイティブの英語のスピーカーではありませんが、私はできるだけ明確に質問を表明しています。 私は2日間私を混乱させるこの問題に遭遇しましたが、私はまだ解決策を見つけることができません。春の雲のデータフローでシンクコンポーネントがカフカで適切なデータを取得しない

Hadoop YARNのSpring Could Data Flowで実行されるストリームを構築しました。

ストリームは、HTTPソース、プロセッサ、およびファイルシンクで構成されています。

1.Httpソース
HTTPソースコンポーネントはapplication.propertiesで定義されたDEST1とDEST2 2つの異なる宛先との結合2つの出力チャネルを有しています。

spring.cloud.stream.bindings.output.destination = DEST1 spring.cloud.stream.bindings.output2.destination = DEST2以下

はあなたの参照のためのHTTPソースのコードsnipet ..です

@Autowired 
    private EssSource channels; //EssSource is the interface for multiple output channels 

##output channel 1: 
    @RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"}) 
    @ResponseStatus(HttpStatus.ACCEPTED) 
    public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) { 
     logger.info("enter ... handleRequest1..."); 
     channels.output().send(MessageBuilder.createMessage(body, 
       new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)))); 
    } 

##output channel 2: 
    @RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"}) 
    @ResponseStatus(HttpStatus.ACCEPTED) 
    public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) { 
     logger.info("enter ... handleRequest2..."); 
     channels.output2().send(MessageBuilder.createMessage(body, 
       new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)))); 
    } 

2.プロセッサー
プロセッサは、異なる宛先との結合2つの複数の入力チャネルと2つの出力チャネルを有しています。 宛先バインディングは、プロセッサコンポーネントプロジェクトのapplication.propertiesで定義されています。

//input channel binding 
spring.cloud.stream.bindings.input.destination=dest1 
spring.cloud.stream.bindings.input2.destination=dest2 

//output channel binding 
spring.cloud.stream.bindings.output.destination=hdfsSink 
spring.cloud.stream.bindings.output2.destination=fileSink 

以下は、プロセッサのコードスニペットです。

@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT) 
    public Object transform(Message<?> message) { 
     logger.info("enter ...transform..."); 

     return "processed by transform1";; 
    } 


    @Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2) 
    public Object transform2(Message<?> message) { 
     logger.info("enter ... transform2..."); 
     return "processed by transform2"; 
    } 

3.ファイルシンクコンポーネント。

私はSpringの公式のfil sinkコンポーネントを使用します。 maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT

そして、そのアプリケーションのapplicaiton.propertiesファイルにバインディングを追加するだけです。 spring.cloud.stream.bindings.input.destination = fileSink

4.Finding:

私はこれが好きでなければならないと予想データフロー:

Source.handleRequestを() - >プロセッサ.handleRequest()

Source.handleRequest2() - > Processor.handleRequest2() - > Sink.fileWritingMessageHandler();

"transform2によって処理された"という文字列のみがファイルに保存されます。

しかし、私のテストの後、データ・フローは、このような実際です:

Source.handleRequest() - > Processor.handleRequest() - > Sink.fileWritingMessageHandler(); 012.HandleRequest2() - > Processor.handleRequest2()Sink。

Source.handleRequest2() - > Processor.handleRequest2() - > Sink。fileWritingMessageHandler();

"トランスフォーム1で処理されました"と "トランスフォーム2で処理されました"の両方の文字列がファイルに保存されます。

5.Question:Processor.handleRequest(出力チャネルの宛先が、

)はhdfsSink代わりにfileSinkに結合し、データはまだシンクをファイルに流れます。私はこれを理解することができず、これは私が望むものではありません。 私は、Processor.handleRequest2()からのデータだけを、両方の代わりにファイルシンクに渡したいと思います。 もし私がそれを正しくしなければ、どのようにして解決策を教えてもらえますか? 2日間私を混乱させました。

ご協力いただきありがとうございます。

アレックス

答えて

2

は(「-2」のバージョンが複数のチャネルを持つものです)このようなあなたのストリーム定義の何かか?

http-source-2 | processor-2 | file-sink 

春クラウドデータフロープロセッサのためのspring.cloud.stream.bindings.output.destinationhdfs-sinkに設定されていても、それが実際にfile-sinkの入力にマッチします、なぜ、あるapplications.propertiesで定義された宛先を上書きすることに注意してください。

宛先がストリーム定義から構成されている方法は、(タップの文脈で)ここで説明されて:あなたは何ができるかhttp://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl

は単にチャンネル1と2の意味を交換することである - のためのサイドチャネルを使用しますhdfs。これはやや脆いですが、ストリームのinput/outputチャンネルは自動的に設定され、他のチャンネルはapplication.propertiesで設定されます。この場合、ストリーム定義または配備時にサイドチャンネルの送信先を設定する方がよい場合があります - http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_propertiesを参照してください。

これらは、通常のコンポーネントを使用して別々のエンドポイントをリッスンしている2つのストリームでもかまいません。つまり、データが並んで流れると仮定します。

+0

こんにちはマリウス、 お返事ありがとうございます。それは今働く。 ありがとうございます。それは本当に役立ちます。 –

+0

ところで、今日の午後にYARNでテストを行ったところ、YARNのSCDFは/ dataflow/artifacts/cacheフォルダのAppsだけでなく、データベースのような場所にもキャッシュされているようです。 DBにアプリをキャッシュするかどうか不安です。 私はストリームを破棄してAppsを登録解除し、/ dataflow/artifacts/cacheフォルダ内のすべてのファイルを削除するので、YDKのSCDFは引き続きキャッシュされたAppsを使用して展開することがわかりました。 最後に、私は選択肢がありませんが、SCDFサーバを再起動すると動作します。 サーバーを再起動せずにキャッシュデータを完全にクリアする方法を知っていますか? ありがとう、 –

+0

このセクション[docs](http://docs.spring.io/spring-cloud-dataflow-server-yarn/docs/1.0.1.RELEASE/reference/htmlsingle/#yarn-how-それは役に立つかもしれない)。キャッシュは '/ dataflow/artifacts/cache'と' hdfs'ディレクトリで行われるので、キャッシュをクリアするたびにサーバを再起動する必要があります。このワークフローへのフィードバックや改善をお寄せください。 –

関連する問題