私はSpark Streamingでグリップを取得しようとしていますが、難しかったです。ドキュメンテーションを読んで例を分析しても、私はドキュメントから理解できる唯一のものであるテキストファイル/ストリーム/ Kafkaキューで単語数以上のことをしたいと思います。Spark Streamingはワード数以外の何かを行うことができますか?
私は受信したカフカのメッセージストリームを聴き、メッセージをキーでグループ化して処理したいと考えています。以下のコードはプロセスの簡略版です。カフカからのメッセージのストリームを取得し、キーでメッセージをグループ化してメッセージキーでグループ化し、それを処理します。
JavaPairDStream<String, byte[]> groupByKeyList = kafkaStream.reduceByKey((bytes, bytes2) -> bytes);
groupByKeyList.foreachRDD(rdd -> {
List<MyThing> myThingsList = new ArrayList<>();
MyCalculationCode myCalc = new MyCalculationCode();
rdd.foreachPartition(partition -> {
while (partition.hasNext()) {
Tuple2<String, byte[]> keyAndMessage = partition.next();
MyThing aSingleMyThing = MyThing.parseFrom(keyAndMessage._2); //parse from protobuffer format
myThingsList.add(aSingleMyThing);
}
});
List<MyResult> results = myCalc.doTheStuff(myThingsList);
//other code here to write results to file
});
デバッグするとき、私はしばらく(partition.hasNext())
myThingsList
は、外forEachRDD
で宣言List<MyThing> myThingsList
とは異なるメモリアドレスを持っていることがわかります。
がリストの別のインスタンスであるため、List<MyResult> results = myCalc.doTheStuff(myThingsList);
が呼び出されると、結果はありません。
私はこの問題を解決したいと思いますが、なぜこれが動作しないのか(予想通り)、どのように自分自身で解決できるのかを理解するのに役立つドキュメントを参考にしたいと思います。 Sparkドキュメンテーションの単一ページだけでなく、セクション/パラグラフ、または好ましくはまだ機能していないコメント付きコードでScalaの例を提供しない 'JavaDoc'へのリンク)。