2016-10-18 4 views
0

スパークストリーミングでは、各バッチが処理される前にdbを照会し、その結果をシリアル化してネットワーク経由でエグゼキュータに送ることができます。spachストリーミングでforeachRDD内のDB接続を使用する

class ExecutingClass implements Serializable { 
init(DB db) { 

    try(JavaStreamingContext jsc = new JavaStreamingContext(...)) { 

    JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc); 

    kafkaStream.foreachRDD(rdd -> { 
    // this part is supposed to execute in the driver 
    Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map 

    JavaRDD<String> results = processRDD(rdd, indexMap); 

    ... 

} 


    } 
    JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd,  Map<String,String> indexMap) { 
... 
    } 
    } 

上記のコードでは、ドライバでindexMapを初期化し、結果のマップをrddの処理に使用しています。 foreachRDDクロージャの外側でindexMapを宣言しても問題はありませんが、内部で行うとシリアル化エラーが発生します。これの理由は何ですか?

私がこのようなことをしたい理由は、バッチごとにデータベースから最新の値を取得するためです。私はこれがforeachRDDの閉鎖のため閉鎖外のすべてを直列化しようとしていることが原因だと考えています。

+0

なぜ、この目的のためにアキュムレータ(読み取り書き込み)/ブロードキャスト(読み取り専用)を使用できないのですか?この場合、読み書きアキュムレータは意味がありませんから? –

+0

クロージャ内のコードはシリアライズされ、エグゼキュータに送られます。ですから、 'db.getIndexMap()'はこの目的のためにシリアライズ可能ではないと思います。 – LiMuBei

+0

@LiMuBeiそれはキャッチだ。データの各バッチに対して、最初にデータベースにクエリを実行してindexMapを取得し、処理のためにindexMapだけを渡します。 –

答えて

0

あなたは火花がデシベルをシリアル化しようとするので、これを回避するために、我々はあなたがオブジェクトを利用することができますforEachRdd内部のDB接続を作成する必要があります(または)forEachRdd内、(DBのインスタンスである)デシベルオブジェクトを使用しています以下の記事で説明するプール http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

関連する問題