スパークストリーミングでは、各バッチが処理される前にdbを照会し、その結果をシリアル化してネットワーク経由でエグゼキュータに送ることができます。spachストリーミングでforeachRDD内のDB接続を使用する
class ExecutingClass implements Serializable {
init(DB db) {
try(JavaStreamingContext jsc = new JavaStreamingContext(...)) {
JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc);
kafkaStream.foreachRDD(rdd -> {
// this part is supposed to execute in the driver
Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map
JavaRDD<String> results = processRDD(rdd, indexMap);
...
}
}
JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd, Map<String,String> indexMap) {
...
}
}
上記のコードでは、ドライバでindexMapを初期化し、結果のマップをrddの処理に使用しています。 foreachRDDクロージャの外側でindexMapを宣言しても問題はありませんが、内部で行うとシリアル化エラーが発生します。これの理由は何ですか?
私がこのようなことをしたい理由は、バッチごとにデータベースから最新の値を取得するためです。私はこれがforeachRDDの閉鎖のため閉鎖外のすべてを直列化しようとしていることが原因だと考えています。
なぜ、この目的のためにアキュムレータ(読み取り書き込み)/ブロードキャスト(読み取り専用)を使用できないのですか?この場合、読み書きアキュムレータは意味がありませんから? –
クロージャ内のコードはシリアライズされ、エグゼキュータに送られます。ですから、 'db.getIndexMap()'はこの目的のためにシリアライズ可能ではないと思います。 – LiMuBei
@LiMuBeiそれはキャッチだ。データの各バッチに対して、最初にデータベースにクエリを実行してindexMapを取得し、処理のためにindexMapだけを渡します。 –