2017-09-29 10 views
0

に基づいてスパークデータフレームを作成します以下のような:はI 2つのデータフレームを有する条件

id,location 
1,Canada 
10,Paris 
4,Berlin 
3,London 

私の問題は、私はlookupからLOCATION_IDとして新しいIDを取る必要があるとlocation_id場合correの古いIDを保持し、その後、idよりも異なっています(古いデータを維持するために)フラグ名を持つ場所をspondingし、フラグ名を持つ新しいIDを各場所のためにアクティブにします。したがって、ハイブの出力表は次のようになります。

location_id,location,flag 
1,Canada,active 
2,Paris,inactive 
10,Paris,active 
3,London,active 
4,Berlin,active 

最初に両方のフレームに参加しようとしました。これは、異なるIDを持っている位置の値を出力しますが、データフレームとしてこれらの値を節約しながら、私は

val joinedFrame = dataframe1.join(lookup, "location") 
val df_temp = joinedFrame.withColumn("flag1", when($"tag_id" === $"tag_number", "active").otherwise("inactive")) 
var count = 1 
df_temp.foreach(x => { 
    val flag1 = x.getAs[String]("flag1").toString 
    val flag = x.getAs[String]("flag").toString 
    val location_id = x.getAs[String]("location_id").toString 
    val location = x.getAs[String]("location").toString 
    val id = x.getAs[String]("id").toString 
    if ((count != 1)&&(flag1 != flag)){ 
    println("------not equal-------",flag1,"-------",flag,"---------",id,"---------",location,"--------",location_id) 
    val df_main = sc.parallelize(Seq((location_id, location,flag1), (id, location, flag))).toDF("location_id", "location", "flag") 
    df_main.show 
    df_main.write.insertInto("location_coords") 
    } 
    count += 1 
}) 

:として次に入社DFに、私はアクションを行っており、hive.I内のすべてのレコードを保存することが操作を試してみました例外取得:あなたのコメントに基づいて

not equal------inactive------active---10---------Paris---------2  
17/09/29 03:43:29 ERROR Executor: Exception in task 0.0 in stage 25.0 (TID 45) 
    java.lang.NullPointerException 
      at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75) 
      at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
      at org.apache.spark.scheduler.Task.run(Task.scala:99) 
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
      at java.lang.Thread.run(Thread.java:748) 
    17/09/29 03:43:29 WARN TaskSetManager: Lost task 0.0 in stage 25.0 (TID 45, localhost, executor driver): java.lang.NullPointerException 
      at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:75) 
      at $line83.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:65) 
      at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
      at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:918) 
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1951) 
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
      at org.apache.spark.scheduler.Task.run(Task.scala:99) 
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
      at java.lang.Thread.run(Thread.java:748) 
+1

'foreach'を使ってすでにデータフレームをループしているときに' sc.parallelize'を使うことはできません。 – Shaido

+0

@Shaidoそれから可能な代替策は何でしょうか。ありがとう。 – Swati

+0

IDが変更された行のみを保存しますか? – Shaido

答えて

1

を、私は最も簡単な方法ではなく、IDにjoinを使用することだと思います。外部結合を実行すると、欠落している列はnullになります。これらの行は更新され、興味があります。

それ以降は、空の場合に位置列を更新するだけです同様にフラグ列として、(私は多少の列名を変更したことに注意してください)以下の私のコードを参照してください。この後

val spark = SparkSession.builder.getOrCreate() 
import spark.implicits._ 

val df = Seq((1,"Canada","active"),(2,"Paris","active"),(3,"London","active"),(4,"Berlin","active")) 
    .toDF("id", "location", "flag") 
val df2 = Seq((1,"Canada"),(10,"Paris"),(4,"Berlin"),(3,"London")) 
    .toDF("id", "location_new") 

val df3 = df.join(df2, Seq("id"), "outer") 
    .filter($"location".isNull or $"location_new".isNull) 
    .withColumn("location", when($"location_new".isNull, $"location").otherwise($"location_new")) 
    .withColumn("flag", when($"location" === $"location_new", "active").otherwise("inactive")) 
    .drop("location_new") 

> df3.show() 
+---+--------+--------+ 
| id|location| flag| 
+---+--------+--------+ 
| 10| Paris| active| 
| 2| Paris|inactive| 
+---+--------+--------+ 

をあなたはハイブテーブルを更新するには、この新しいデータフレームを使用することができます。

関連する問題