2017-06-26 13 views
0

Spark StreamingとKafkaを使用して、Webサーバーから受信したメッセージを取り込み、処理しようとしています。Scalaのカフカメッセージバイト配列の処理

https://github.com/dibbhatt/kafka-spark-consumer/blob/master/README.mdに記載されている消費者をテストして、追加機能を利用しています。

最初のステップとして、どのように再生されているかを確認するために提供されているサンプルを使用しようとしています。しかし、実際にペイロード内のデータを見るのは難しいです。

次関数の結果を見てみると

:私はそれがRDDSのコレクションを返す見ることができます

ReceiverLauncher.launch 

、タイプの各:

MessageAndMetadata[Array[Byte]] 

私はこの時点で立ち往生午前ドン」これを解析して実際のデータを見る方法を知っています。 Sparkに同梱されているコンシューマを使用するWeb上のすべてのサンプルは、イテレータオブジェクトを作成し、イテレータオブジェクトを作成し、データを処理します。しかし、このカスタムコンシューマから返されたオブジェクトは、どのイテレータインターフェイスでも開始できません。

RDDにはgetPayload()メソッドがありますが、そこからデータを取得する方法はわかりません。私が持っている

質問は以下のとおりです。

  1. は、この消費者が実際に本番環境に適していますか?その外観から、それが提供する機能とそれが提供する抽象化は非常に有望なようです。

  2. 誰もそれを試したことがありますか?誰もデータへのアクセス方法を知っていますか?予め

おかげで、

+0

最終的にはうまくいったようです。 getPayload()関数の結果をStringに変換して、実際のデータを印刷できるようになりました。私が思ったより簡単です:) – Moe

+0

代わりに受信ベースの実装を調べる代わりに、DirectKafkaConsumerを調べる必要があります。https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html – maasg

答えて

-1

getPayload()は、例えば文字列に変換する必要があります

new String(line.getPayload())