2016-08-22 14 views
-2

を使用してApache Saprkにチェック履歴から2つのデータフレームを比較私は次のような構造でadataframeを持ってScalaの

EmployeeDF

id name date  code 
    1 John 2015-4-14 C11 
    2 Roy 2011-5-20 C11 
    3 John 2010-5-20 C11 
    4 John 2012-5-20 C10 

私がもし同じコードが2年前に同じ従業員に適用されていることを履歴を確認したいん。どうやってやるの。私はデータフレーム内に何百万ものデータを持っているだけのサンプルデータです。私はパフォーマンスを達成したいのです。行が繰り返されるので、データフレームに参加するとパフォーマンスが低下します。デカルトを作成し、自己結合中に行を複製します。私はマップのようなもので達成したい。

EDIT:現在のコード(OPのコメントから追加)

我々は歴史をチェックして、いくつかの従業員のみが存在する場合されているので、私は複数回繰り返され、それらの従業員を取得しています最初のステップで一度それはこの従業員のための歴史がないことを意味します。したがって、このステップのためのコードは次のとおりです。今、主なステップは以下の通りです

val repeatedEmpDF = SparkConfig 
    .sc 
    .sqlContext 
    .sql("SELECT *, '2' as level FROM cpeFirstStep WHERE e_id IN(SELECT e_id FROM cpeFirstStep where code = 'C11' " + " GROUP BY e_id HAVING COUNT(e_id)>1)") 
    .cache() 

val uniqueEmpDF = SparkConfig 
    .sc 
    .sqlContext 
    .sql("SELECT *, '1' as level FROM cpeFirstStep WHERE e_id IN(SELECT e_id FROM cpeFirstStep where code = 'C11' " + " GROUP BY e_id HAVING COUNT(e_id)=1)") 
    .cache() 

第二工程を繰り返し、コードはこれですされている従業員を取得することです

val historyJoin = SparkConfig 
    .sc 
    .sql("SELECT x.*, CASE WHEN y.code = x.code THEN '3' ELSE '4' END level FROM repeatedEmptDF X " + "LEFT JOIN repeatedEmptDF Y ON y.e_id = x.e_id AND y.code = x.code " + "AND y.date < x.data - INTERVAL 2 YEAR") 
+0

正確に2年前ですか? 2年前の意味は? '' code ''と '' id ''をグループ化し、日付の条件をチェックすることができます。 –

+0

あなたが書いたコードを提供してください。それは私たちがあなたを助けやすくするでしょう。 –

+0

@ Sarvesh Kumar Singh 2年以上前に同じ従業員に同じコードが適用された場合、この行にレベル1をマークし、それ以外の場合はレベル2としてマークします。 –

答えて

1

ですから、これを書くにはさまざまな方法がありますが、私があなたの例を正しく理解していると仮定すると、下のスパークコードはそのトリックを行います。あなたが与えたサンプルにいくつか余分なデータを追加しました。また、従業員のJohnが同じIDを持つべきだと仮定しています。だから私のテスト入力は次のようになります。

import org.joda.time.LocalDate 
val df = sc.parallelize(List((1, "John", new LocalDate(2015,4,14), "C11"),(2, "Roy", new LocalDate(2011,5,20), "C11"),(1, "John", new LocalDate(2010,5,20), "C11"),(1, "John", new LocalDate(2012,5,20), "C10"),(1, "John", new LocalDate(2013,1,14), "C11"))) 

そして、少なくとも2年間code同じを持っていた従業員の実際の識別のために:

df.map{case (id: Int, name: String, date: LocalDate, code: String) => ((id, name), List((date, code)))} 
    .reduceByKey(_++_) 
    .filter{case(_, listOfCodes) => listOfCodes.length >= 2} // Not interested in employees with only one code registered 
    .flatMapValues(list => { 
    def sameCodeForTwoYears(list: List[(LocalDate, String)]): List[(LocalDate, String)] = { 
     list match { 
     case x :: Nil => List.empty 
     case x :: xs => if (xs.head._1.minusYears(2).isAfter(x._1) && x._2 == xs.head._2) { 
      List(x, xs.head) 
     } else sameCodeForTwoYears(xs) 
     case Nil => List.empty 
     } 
    } 
    sameCodeForTwoYears(list.sortWith((left, right) => left._1.isBefore(right._1)))}) 
    .map{case((id, name),(date, code)) => (id, name, date, code)} 

この意志出力:

(1,John,2013-01-14,C11)               
(1,John,2015-04-14,C11) 

これはあなたが探していたものですか?

データセットにどのようなパフォーマンスが得られるかわかりませんが、これをSparkでどのように書くことができるのでしょうか。

+0

thanxあなたは正しい方法で私を導いてくれました。今私はRDD [(Int、String、LocalDate、String)]を持っていて、もし私がRDD [Row]を持っていればそれをケースクラスにマップします。私はそれが非常に基本的だが、私はそれが初めてです。 –

+0

喜んで助けてください。答えがあなたが探していたものなら、それを受け入れてください。そうすれば、私は助けを得ることができ、質問は「答えられていない質問」のリストから消えるでしょう:-) –

関連する問題