2016-11-04 8 views
1

チェックポイントを有効にしたスパークストリーミングジョブがあります。最初に正しく実行されますが、チェックポイントから再起動すると例外をスローします。チェックポイントを有効にしてdstreamとJDBCRDDに参加するにはどうすればよいですか?

org.apache.spark.SparkException:RDD変換およびアクションは だけでなく他の変換の内側に、運転者が呼び出すことができます。 rdd1.map変換の 内で値の変換とカウントアクションを実行できないため、rdd1.map(x => rdd2.values.count()* x)は無効です(例: )。詳細は、SPARK-5063を参照してください。 at org.apache.spark.rdd.RDD.org $ apache $ spark $ rdd $ RDD $$ sc(RDD.scala:87) at org.apache.spark.rdd.RDD.withScope(RDD.scala:352) ) at org.apache.spark.rdd.RDD.union(RDD.scala:565) at org.apache.spark.streaming.Repo $$ anonfun $ createContext $ 1.apply(Repo.scala:23) at org .apache.spark.streaming.Repo $$ anonfun $ createContext $ 1.apply(Repo.scala:19) at org.apache.spark.streaming.dstream.DStream $$ anonfun $ foreachRDD $ 1 $$ anonfun $ apply $ mcV $ sp $ 3.apply(DStream.scala:627)

この問題の回避策を提案してください。以下 サンプルアプリ:

String URL = "jdbc:oracle:thin:" + USERNAME + "/" + PWD + "@//" + CONNECTION_STRING; 

Map<String, String> options = ImmutableMap.of(
    "driver", "oracle.jdbc.driver.OracleDriver", 
    "url", URL, 
    "dbtable", "READINGS_10K", 
    "fetchSize", "10000"); 

DataFrame OracleDB_DF = sqlContext.load("jdbc", options); 
JavaPairRDD<String, Row> OracleDB_RDD = OracleDB_DF.toJavaRDD() 
    .mapToPair(x -> new Tuple2(x.getString(0), x)); 

Dstream.transformToPair(rdd -> 
    rdd.mapToPair(record -> 
    new Tuple2<>(record.getKey().toString(), record)) 
    .join(OracleDB_RDD)) // <-- PairRDD.join inside DStream transformation 
.print(); 

スパークバージョン1.6、糸クラスタモードで実行されています。

答えて

0

私は質問から始めましょう。あなたは既に自分自身に依頼していたに違いないと確信しています。

OracleDB_RDDの値はどれくらいですか?

十分小さければ、ファクトテーブルとして機能し、最初にブロードキャストすることができます。それは、あなたの解決策を働かせるだけでなく効率的にするでしょう。

(これはSpark SQL 2.0を使用しているため、これと類似の質問は古くなりました。これはクエリオプティマイザの最適化のようなものです)。

それは大だ場合、あなたは(ConstantInputDStreamを参照)DataFrame and SQL Operationsで記載またはDStreams間の結合のためのRDDを返すために、独自のDSTREAMを作成すると(foreachアクション内のデータフレームを作成する必要があります。

+0

私はすでに放送を使って試してみました私はまだ再起動時に同じ例外が発生しています。 foreach doesnt内の大きなデータベースを読むことは私の解決策のようです。 –

関連する問題