2016-10-11 31 views
0

私には数日間気になる問題があります。SparkAppをDockerコンテナから別のコンテナで実行しているSparkに対して実行しているときにエラーが発生しました

私は、Sparkがスタンドアロンモードで動作する場所にSparkドッカーコンテナを作成しました。マスターとワーカーの両方がそこから始まります。これはAzureで実行されているマシンです。

私はSparkマスタURLとSparkに接続するのに必要なものを渡す別のコンテナ(同じマシン)にSpark Scala Appを配備しようとしました。接続はシームレスです。私が遭遇した

まず問題があった。

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaRDDPartition 

は、その後、私は私のアプリのJARファイルと一緒にフォルダに入れて、スパークを除いて私の依存関係のフォルダを作り、SparkConf.setJarsを使用してSparkConfに追加し、

今、奇妙なことが起こります。これより

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.1.0.4): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD 

より、私はちょうどjava -cp <dependencies(including spark jars) cp> myApp.jarを使用して、私のローカルマシンからScalaのアプリを実行する場合それは完全に動作し、ジョブは正常に実行されます。

私はSPARK_HOMEをローカルに持っていません。setJarsは、基本的に空のリストを取ります。私はそれを使用せず、それでも動作します。 私はアプリケーションを実行するときにclasspathで提供されるjarファイルを使用していると思います。何も提供する必要はありません。

皆さんに感謝したいと思っている人がいれば、これがうまくいかない理由を自分自身で説明することはできません。今まではSparkの配備は行っていませんでした。私は主に埋め込まれたスパークで走った。

Sparkは、ドッカーコンテナで動作しているものと同じバージョン(2.0.0)です。私が使用し :
Scalaの2.11.7を私のアプリのために
のJava 1.8両容器(アプリ、スパーク)

にここに要求されたように私のアプリのコード

val jars = Option(new File(Properties.spark_jars_path).listFiles()).toList.flatten.map(_.getAbsolutePath) 
    val conf = new SparkConf() 
    .setMaster(RunUtils.determineMasterUrl(Properties.mode)) 
    .setAppName(RunUtils.SPARK_APP_NAME) 
    .setJars(jars) 
    .set("spark.cassandra.connection.host", Properties.cassandra_connection_host) 

    val ssc = new StreamingContext(conf, Seconds(1)) 

    case class Result(buyDate: Timestamp, endDate: Timestamp, maxDate: Timestamp, buyAmount: Double, buyRate: Double) 

    def main(args: Array[String]): Unit = { 

    val DateFormatter = new java.text.SimpleDateFormat("yyyy-MM-dd") 

    val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset) 

    // 
    // BITSTAMP 
    // 
    val bitstampTopic = Set("bitstamp_trades") 
    val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic) 
    val bitstampTradeStream = bitstampStream.map(_._2).map { trade => 
     val jsonNode = JsonMapper.readTree(trade) 
     Trade(
     "BITSTAMP", 
     "BTC_USD", 
     if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY", 
     DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)), 
     new Date(jsonNode.get("timestamp").asLong() * 1000), 
     jsonNode.get("amount").asDouble(), 
     jsonNode.get("price").asDouble() 
    ) 
    } 

    bitstampTradeStream.saveToCassandra("coin_master", "trades", SomeColumns(
     "exchange_house", 
     "exchange_currencies", 
     "exchange_type", 
     "date", 
     "trade_time", 
     "amount", 
     "price") 
    ) 
    ssc.start() 
    ssc.awaitTermination() 
    } 
+0

あなたは、あなたがやっているコードを提供することができmapparttitonsまたはコードどこにこのエラーが発生したと思われる? – VladoDemcak

+0

もう一度質問を確認し、コードを追加しました。 –

+0

okだから、ちょうど問題が私のマップ機能の中にあることに気づいた。私が解決するとき、私は答えを投稿するだろう。 –

答えて

0

オクラホマので、問題あり私の地図機能でした。

具体的には、この行が問題だった:

val jsonNode = JsonMapper.readTree(trade) 

JsonMapperは私が原因で、各エグゼキュータに呼ばれなければならなかったいくつかの方法のsparkContext.broadcastを使用している必要があり、実際にジャクソンライブラリから構成されObjectMapperであり、それが動作するように。

あなたはそれがここで働いていなかった理由についての詳細を読むことができます、だから、 Spark: broadcasting jackson ObjectMapper

それが働いていたこのようなものに自分のコードを変更した後:

val broadcastValue = ssc.sparkContext.broadcast(JsonMapper) 

val kafkaParams = Map("metadata.broker.list" -> Properties.kafka_brokers, "auto.offset.reset" -> Properties.kafka_auto_reset_offset) 

// 
// BITSTAMP 
// 
val bitstampTopic = Set("bitstamp_trades") 
val bitstampStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, bitstampTopic) 
val bitstampTradeStream = bitstampStream.map(_._2).map { trade => 
    broadcastValue.value.registerModule(new DefaultScalaModule with RequiredPropertiesSchemaModule) 
    val jsonNode = broadcastValue.value.readTree(trade) 
    Trade(
    "BITSTAMP", 
    "BTC_USD", 
    if(jsonNode.get("type").asInt() == 1) "SELL" else "BUY", 
    DateFormatter.format(new Date(jsonNode.get("timestamp").asLong() * 1000)), 
    new Date(jsonNode.get("timestamp").asLong() * 1000), 
    jsonNode.get("amount").asDouble(), 
    jsonNode.get("price").asDouble() 
) 
} 
関連する問題