2017-01-30 9 views
0

私はSpark Streamingでグリップを取得しようとしていますが、難しかったです。ドキュメンテーションを読んで例を分析しても、私はドキュメントから理解できる唯一のものであるテキストファイル/ストリーム/ Kafkaキューで単語数以上のことをしたいと思います。Spark Streamingはワード数以外の何かを行うことができますか?

私は受信したカフカのメッセージストリームを聴き、メッセージをキーでグループ化して処理したいと考えています。以下のコードはプロセスの簡略版です。カフカからのメッセージのストリームを取得し、キーでメッセージをグループ化してメッセージキーでグループ化し、それを処理します。

JavaPairDStream<String, byte[]> groupByKeyList = kafkaStream.reduceByKey((bytes, bytes2) -> bytes); 

    groupByKeyList.foreachRDD(rdd -> { 
     List<MyThing> myThingsList = new ArrayList<>(); 
     MyCalculationCode myCalc = new MyCalculationCode(); 

     rdd.foreachPartition(partition -> { 
      while (partition.hasNext()) { 
       Tuple2<String, byte[]> keyAndMessage = partition.next(); 
       MyThing aSingleMyThing = MyThing.parseFrom(keyAndMessage._2); //parse from protobuffer format 
       myThingsList.add(aSingleMyThing); 
      } 
     }); 

     List<MyResult> results = myCalc.doTheStuff(myThingsList); 

     //other code here to write results to file 

    }); 

デバッグするとき、私はしばらく(partition.hasNext())myThingsListは、外forEachRDDで宣言List<MyThing> myThingsListとは異なるメモリアドレスを持っていることがわかります。

がリストの別のインスタンスであるため、List<MyResult> results = myCalc.doTheStuff(myThingsList);が呼び出されると、結果はありません。

私はこの問題を解決したいと思いますが、なぜこれが動作しないのか(予想通り)、どのように自分自身で解決できるのかを理解するのに役立つドキュメントを参考にしたいと思います。 Sparkドキュメンテーションの単一ページだけでなく、セクション/パラグラフ、または好ましくはまだ機能していないコメント付きコードでScalaの例を提供しない 'JavaDoc'へのリンク)。

答えて

1

異なるリストアドレスが表示される理由は、SparkがドライバでローカルにforeachPartitionを実行しないため、関数をシリアル化して、パーティションの処理を処理するExecutorで送信する必要があるためです。コードを使って作業しても、すべてが単一の場所で実行されるようなと感じるのですが、計算は実際に分散されています。

最初の問題は、コードに表示されるreduceByKeyと、2バイトの配列をとり、最初のコードを返すことです。これは本当にやりたいことですか?つまり、データの一部を効果的に削除していることを意味します。おそらくcombineByKeyを探しており、JavaPairDStream<String, List<byte[]>を返す可能性があります。あなたのいるProtobufの解析について

、あなたがforeachRDDを望んでいないように私には見えます、あなたがデータを解析するために、追加のmapを必要とする:

kafkaStream 
.combineByKey(/* implement logic */) 
.flatMap(x -> x._2) 
.map(proto -> MyThing.parseFrom(proto)) 
.map(myThing -> myCalc.doStuff(myThing)) 
.foreachRDD(/* After all the processing, do stuff with result */) 
関連する問題