2016-08-01 8 views
0

Flipで処理されたワードカウントをkafkaトピックに書き込もうとしています。以下のようにコードのエラーです---Flink - Datastream型変換 - エラー

"方法addSink(SinkFunction>)型の中でDataStream>引数には適用されません(FlinkKafkaProducer09)"

-

DataStream<Tuple2<String, Integer>> stream1 = stream.flatMap(new LineSplitter()) 
    .keyBy(0).sum(1); 
stream1.print(); 
env.execute(); 

stream1.addSink(new FlinkKafkaProducer09<String>("WCFKTopic", new SimpleStringSchema(), 
    properties)); 
env.execute(); 

感謝あらかじめ!

+0

エラー - 「方法addSink(SinkFunction >)タイプでDataStream >引数には適用されません(FlinkKafkaProducer09 ) " – user6663136

+0

行番号を含む例外の完全なスタックトレースで最小限の作業サンプルコードを表示すると良いでしょう。 – AKSW

答えて

0

シンク関数の入力タイプがstream1のタイプと一致しません。

これを試してみてください:

new FlinkKafkaProducer09<Tuple2<String, Integer>>(XXX); 
+0

これはうまくいきましたが、引数2、つまり新しいSimpleStringSchema()をキャストする必要がありましたが、実行時エラーが発生しました。別の例外が発生しました。 - スレッド "main"の例外java.lang.ClassCastException:org.apache.flink.stream.serialization.SimpleStringSchema org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaにキャストすることはできません – user6663136

+0

モトはカフカのトピックに単語数を書きます。 – user6663136

+0

行番号を含む完全なスタックトレースを使って最小限の作業サンプルコードを表示すると、私は良いでしょう。 – AKSW

関連する問題