2017-10-25 7 views
2

重複しているエントリのすべてのレコードを削除したいのですが、タイムスタンプの違いはオフセットの時間の長さになる可能性がありますが、簡単にするためには2分を使用します。閉じるタイムスタンプのエントリを削除する

+-------------------+-----+----+ 
|Date    |ColA |ColB| 
+-------------------+-----+----+ 
|2017-07-04 18:50:21|ABC |DEF | 
|2017-07-04 18:50:26|ABC |DEF | 
|2017-07-04 18:50:21|ABC |KLM | 
+-------------------+-----+----+ 

私はこのような何かを試してみましたが、これは重複を削除しない行のみ

+-------------------+-----+----+ 
|Date    |ColA |ColB| 
+-------------------+-----+----+ 
|2017-07-04 18:50:26|ABC |DEF | 
|2017-07-04 18:50:21|ABC |KLM | 
+-------------------+-----+----+ 

を持っている私のデータフレームをしたいと思います。今の

val joinedDfNoDuplicates = joinedDFTransformed.as("df1").join(joinedDFTransformed.as("df2"), col("df1.ColA") === col("df2.ColA") && 
     col("df1.ColB") === col("df2.ColB") && 
     && abs(unix_timestamp(col("Date")) - unix_timestamp(col("Date"))) > offset 
    ) 

、私はちょうど特定の列に基づいて、ここでFind minimum for a timestamp through Spark groupBy dataframeデータの分で明確なまたはグループを選択するのですが、私は、より堅牢なソリューションをしたいと思います。この理由は、その間隔の外のデータがあり得ることです有効なデータ。また、オフセットは要件に応じて5秒または5分以内に変更することができます。

誰かが、日付を比較するUDFの作成について言及しましたが、他のすべての列が同じである場合は、私は行をフィルタリングしたりフラグを追加してそれらの行を削除する助けていただければ幸いです。

ここに似たSQL質問Duplicate entries with different timestamp

ありがとう!

答えて

5

私はこのようにそれを行うだろう:

  1. はダミーカラムに日付に注文するウィンドウを定義します。
  2. ダミー列を追加し、それに定数値を追加します。
  3. 前のレコードの日付を含む新しい列を追加します。
  4. 日付と前日の差を計算します。
  5. 差異の値に基づいてレコードをフィルタリングします。あなたは時間が近いかもしれないレコードのペアを持っている場合、これ以上の解決策が有効である

    import org.apache.spark.sql.functions._ 
    import org.apache.spark.sql.expressions._ 
    
    val w = Window.partitionBy("dummy").orderBy("Date") // step 1 
    
    df.withColumn("dummy", lit(1)) // this is step 1 
        .withColumn("previousDate", lag($"Date", 1) over w) // step 2 
        .withColumn("difference", unix_timestamp($"Date") - unix_timestamp("previousDate")) // step 3 
    

コードは以下のようなものにすることができます。 3つ以上のレコードがある場合、各レコードをウィンドウ内の最初のレコード(前のレコードではない)と比較できるので、lag($"Date",1)の代わりにfirst($"Date")を使用します。この場合、「差異」列には、現在のレコードとウィンドウ内の最初のレコードとの間の時間差が含まれます。

関連する問題