2017-06-30 10 views
0
Spark Version: 1.6.2. 

データソースがHDFSである一時テーブルが登録され、そのテーブルに対して2回クエリが実行されました。一時テーブルの複数のSQLが失敗しました

その後の仕事は、このエラーで失敗しました:

ERROR ApplicationMaster: User class threw exception:
java.io.IOException: Not a file: hdfs://my_server:8020/2017/01/01
java.io.IOException: Not a file: hdfs://my_server:8020/2017/01/01 at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:322) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:240)

トリッキーな部分は、唯一のクエリが実行された場合、そのジョブが成功しています。
Spark SQLを間違った方法で使用していますか?事前に

val rdd = sc.textFile("hdfs://my_server:8020/2017/*/*/*") 
val table = sqlc.read.json(rdd).cache() 

table.registerTempTable("my_table") 

sql(""" 
    | SELECT contentsId, 
    | SUM(CASE WHEN gender = 'M' then 1 else 0 end) 
    | FROM my_table 
    | GROUP BY contentsId 
    """.stripMargin) 
    .write.format("com.databricks.spark.csv") 
    .save("hdfs://my_server:8020/gender.csv") 

sql(""" 
    | SELECT contentsId, 
    | SUM(CASE WHEN age > 0 AND age < 20 then 1 else 0 end), 
    | SUM(CASE WHEN age >= 20 AND age < 30 then 1 else 0 end) 
    | FROM my_table 
    | GROUP BY contentsId 
    """.stripMargin) 
    .write.format("com.databricks.spark.csv") 
    .save("hdfs://my_server:8020/age.csv") 

ありがとう:

これは私のコードは次のようになります!

+0

なぜ2つの異なるデータフレームを1つの出力ファイルにgender.csvとして保存しようとしていますか? –

+0

エラーファイルを読み取れないことを示していますファイルではありません:hdfs:// my_server:8020/2017/01/01 < - ?それはファイルか空のディレクトリですか? – Bhavesh

+0

@ShankarKoirala私の間違い。それらを2つの異なるファイルに保存することは、私がしなければならないことです。私はクエリを編集しました。 – NaHeon

答えて

0

このようなファイルに対してのみフィルタを適用できます。

val filesRDD = rdd.filter{path => (new java.io.File(path).isFile)} 

これは、この

をRDD に含まれているすべてのディレクトリを削除しても、二回目のデータフレームの使用を節約します
sql(""" 
    | SELECT contentsId, 
    | SUM(CASE WHEN age > 0 AND age < 20 then 1 else 0 end), 
    | SUM(CASE WHEN age >= 20 AND age < 30 then 1 else 0 end) 
    | FROM my_table 
    | GROUP BY contentsId 
    """.stripMargin) 
    .write.format("com.databricks.spark.csv") 
    .mode("append") 
    .save("hdfs://my_server:8020/gender.csv") 

保存値が同じであるか、いくつかの異なるファイルに第二のデータフレームを格納しようとした場合

+0

フィルタリングパスは良いアイデアですが、2つのクエリが1つのジョブにあっても例外が発生するということです。 :-(時間を節約する宣言を追加できますか? – NaHeon