2017-12-07 11 views
0

私はHDFSに保存されているScalaとSparkの処理ファイルを使ってプロジェクトを進めています。それらのファイルは毎朝HDFSに上陸しています。私は毎日HDFSからそのファイルを読み取り、処理してHDFSに結果を書き込むという仕事をしています。ファイルをDataframeに変換した後、このジョブはフィルタを実行して、最後のファイル内で処理された最も高いタイムスタンプよりも高いタイムスタンプを含む行のみを取得します。このフィルタの動作は数日しか不明です。新しいファイルにそのフィルタと一致する行が含まれているにもかかわらず、ある日は期待通りに動作し、その他の日はフィルタ結果が空です。これは、同じファイルがTEST環境で実行されているが、同じHDFS接続を持つ同じファイルを使用してローカルで動作しているときに、同じファイルに対して常に発生します。スパークフィルタデータフレームが空の結果を返す

は、私はさまざまな方法でフィルタリングするために試みたが、その後はいずれも、いくつかの特定のファイルのためにその環境で動作していないが、その後のすべては私の地元では正常に動作: 1)スパークSQL

val diff = fp.spark.sql("select * from curr " + 
s"where TO_DATE(CAST(UNIX_TIMESTAMP(substring(${updtDtCol}, 
${substrStart},${substrEnd}),'${dateFormat}') as TIMESTAMP))" + 
s" > TO_DATE(CAST(UNIX_TIMESTAMP('${prevDate.substring(0,10)}' 
,'${dateFormat}') as TIMESTAMP))") 

2)スパークフィルタ関数

val diff = df.filter(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat))) 

3)フィルタの結果を用いて、余分な列を追加し、この新しい列でフィルタリング

val test2 = df.withColumn("PrevDate", lit(prevDate.substring(0,10))) 
     .withColumn("DatePre", date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)) 
     .withColumn("Result", date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat))) 
     .withColumn("x", when(date_format(unix_timestamp(substring(col(updtDtCol),0,10),dateFormat).cast("timestamp"),dateFormat).gt(date_format(unix_timestamp(substring(col("PrevDate"),0,10),dateFormat).cast("timestamp"),dateFormat)), lit(1)).otherwise(lit(0))) 

val diff = test2.filter("x == 1") 

私はこの問題がフィルタそのものによっても、おそらくファイルによっても引き起こされないと思いますが、私は何を確認すべきか、誰かがこれに直面したかどうかについてのフィードバックを受けたいと思います。

フィードバックを受け取るには、ここに投稿するのに役立つ情報をお知らせください。

ファイルの例の一部には、以下のようになります。

|TIMESTAMP     |Result|x| 
|2017-11-30-06.46.41.288395|true |1| 
|2017-11-28-08.29.36.188395|false |0| 

TIMESTAMP値は(例えば:2017年11月29日)previousDateと比較され、私は「結果」と呼ばれる列を作成しますその比較の結果は常に同じ環境で 'x'と呼ばれる別の列でも動作します。

前述のように、両方の日付の間にコンパレータ関数を使用するか、結果 'または' x 'の結果をデータフレームにフィルタリングすると、結果は空のデータフレームになりますが、ファイルには、結果が含まれています。

答えて

0

私はそれがデータ/日付形式の問題であると思われます。変換された日付が期待どおりであるかどうか確認する機会を得ましたか?

両方の列の日付文字列にタイムゾーンが含まれる場合、その動作は予測可能です。

タイムゾーンが1つしか含まれていない場合は、ローカルとリモートで実行すると結果が異なります。それは完全にクラスタのタイムゾーンに依存します。

問題をデバッグするには、それぞれの日付文字列のunix_timestamp(..)/ millisを取得する列を追加し、2つの列の差分を取得する列を追加することをお勧めします。差分の列は、コンバージョンがどこに、なぜ間違っているのかを調べるのに役立ちます。お役に立てれば。

+0

良いアプローチですが、同じクエリや関数を使用して新しいカラムを作成し、結果が常に正常に機能するブール値として使用できることを確認しましたが、同じクエリや関数、さらにはデータフレームをフィルタリングする新しい列の値が動作しないことがあります。 –

関連する問題