私には数日間気になる問題があります。SparkAppをDockerコンテナから別のコンテナで実行しているSparkに対して実行しているときにエラーが発生しました
私は、Sparkがスタンドアロンモードで動作する場所にSparkドッカーコンテナを作成しました。マスターとワーカーの両方がそこから始まります。これはAzureで実行されているマシンです。
私はSparkマスタURLとSparkに接続するのに必要なものを渡す別のコンテナ(同じマシン)にSpark Scala Appを配備しようとしました。接続はシームレスです。私が遭遇した
まず問題があった。
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition
は、その後、私は私のアプリのJARファイルと一緒にフォルダに入れて、スパークを除いて私の依存関係のフォルダを作り、SparkConf.setJars
を使用してSparkConfに追加し、
今、奇妙なことが起こります。これより
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
より、私はちょうどjava -cp <dependencies(including spark jars) cp> myApp.jar
を使用して、私のローカルマシンからScalaのアプリを実行する場合それは完全に動作し、ジョブは正常に実行されます。
私はSPARK_HOME
をローカルに持っていません。setJars
は、基本的に空のリストを取ります。私はそれを使用せず、それでも動作します。 私はアプリケーションを実行するときにclasspathで提供されるjarファイルを使用していると思います。何も提供する必要はありません。
皆さんに感謝したいと思っている人がいれば、これがうまくいかない理由を自分自身で説明することはできません。今まではSparkの配備は行っていませんでした。私は主に埋め込まれたスパークで走った。
Sparkは、ドッカーコンテナで動作しているものと同じバージョン(2.0.0)です。私が使用し :
Scalaの2.11.7を私のアプリのために
のJava 1.8両容器(アプリ、スパーク)
にここに要求されたように私のアプリのコード
val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath)
val conf = new SparkConf()
.setMaster(RunUtils.determineMasterUrl(Properties.mode))
.setAppName(RunUtils.SPARK_APP_NAME)
.setJars(jars)
.set("spark.cassandra.connection.host", Properties.cassandra_connection_host)
val ssc = new StreamingContext(conf, Seconds(1))
case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double)
def main(args: Array[String]): Unit = {
val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd")
val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset)
//
// BITSTAMP
//
val bitstampTopic = Set("bitstamp_trades")
val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic)
val bitstampTradeStream = bitstampStream.map(_._2).map { trade =>
val jsonNode = JsonMapper.readTree(trade)
Trade(
"BITSTAMP",
"BTC_USD",
if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY",
DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)),
new Date(jsonNode.get("timestamp").asLong() * 1000),
jsonNode.get("amount").asDouble(),
jsonNode.get("price").asDouble()
)
}
bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
"exchange_house",
"exchange_currencies",
"exchange_type",
"date",
"trade_time",
"amount",
"price")
)
ssc.start()
ssc.awaitTermination()
}
あなたは、あなたがやっているコードを提供することができmapparttitonsまたはコードどこにこのエラーが発生したと思われる? – VladoDemcak
もう一度質問を確認し、コードを追加しました。 –
okだから、ちょうど問題が私のマップ機能の中にあることに気づいた。私が解決するとき、私は答えを投稿するだろう。 –