KafkaUtilsを使用してKafkaからデータを受信するスパークストリーミングアプリケーションを作成します。私がKafkaから受け取ったデータを出力することです。ここに私のコードです(私が使用して私のスパークストリーミングジョブを実行するために火花を提出):私はこれを実行すると、それはかなりうまく動作しますスパークストリーミングでコンソールにRDD出力
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
。入力はカフカのプロデューサーで、A、B、Cであれば、私は以下のようにスパークストリーミングから結果を得ることができます。
Time: 1476481700000 ms
-------------------------------------------
(null,a)
(null,b)
(null,c)
をしかし、私は行数をカウントする1行を追加した場合、messages.print()
が動作することはできません。コードは以下の通りである:私は以下の結果を得ています
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
messages.print()
messages.count().print()
:
-------------------------------------------
Time: 1476481800000 ms
-------------------------------------------
4
数のみがプリントアウト取得し、データをプリントアウトすることができません数えます。 messages.print()
は私がmessages.count.print()
を追加した後に実行されない理由です。
もう1つの疑問は、nullがタプル(null, a)(null, b)(null, c)
の中にあることです。
と最終目標を達成することができます。彼らはプリントアウトされていますが、私は前にそれらを見ませんでした。どうも – Frankie