チェックポイントを有効にしたスパークストリーミングジョブがあります。最初に正しく実行されますが、チェックポイントから再起動すると例外をスローします。チェックポイントを有効にしてdstreamとJDBCRDDに参加するにはどうすればよいですか?
org.apache.spark.SparkException:RDD変換およびアクションは だけでなく他の変換の内側に、運転者が呼び出すことができます。 rdd1.map変換の 内で値の変換とカウントアクションを実行できないため、rdd1.map(x => rdd2.values.count()* x)は無効です(例: )。詳細は、SPARK-5063を参照してください。 at org.apache.spark.rdd.RDD.org $ apache $ spark $ rdd $ RDD $$ sc(RDD.scala:87) at org.apache.spark.rdd.RDD.withScope(RDD.scala:352) ) at org.apache.spark.rdd.RDD.union(RDD.scala:565) at org.apache.spark.streaming.Repo $$ anonfun $ createContext $ 1.apply(Repo.scala:23) at org .apache.spark.streaming.Repo $$ anonfun $ createContext $ 1.apply(Repo.scala:19) at org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply(DStream.scala:627)
この問題の回避策を提案してください。以下 サンプルアプリ:
String URL = "jdbc:oracle:thin:" + USERNAME + "/" + PWD + "@//" + CONNECTION_STRING;
Map<String, String> options = ImmutableMap.of(
"driver", "oracle.jdbc.driver.OracleDriver",
"url", URL,
"dbtable", "READINGS_10K",
"fetchSize", "10000");
DataFrame OracleDB_DF = sqlContext.load("jdbc", options);
JavaPairRDD<String, Row> OracleDB_RDD = OracleDB_DF.toJavaRDD()
.mapToPair(x -> new Tuple2(x.getString(0), x));
Dstream.transformToPair(rdd ->
rdd.mapToPair(record ->
new Tuple2<>(record.getKey().toString(), record))
.join(OracleDB_RDD)) // <-- PairRDD.join inside DStream transformation
.print();
スパークバージョン1.6、糸クラスタモードで実行されています。
私はすでに放送を使って試してみました私はまだ再起動時に同じ例外が発生しています。 foreach doesnt内の大きなデータベースを読むことは私の解決策のようです。 –