2016-08-14 18 views
0

私は「ウォルマート」を訪れたが「ベストバイ」を見た人のIDが必要ないように、スパーク(スカラ)を使ってソートする必要がある以下のデータを持っています。店舗を何度も訪れることができるので、店舗は繰り返している可能性があります。spark-shellのデータをscalaでフィルタリングする方法は?

入力データ:

ID、店舗

1、ウォルマート

1、ウォルマート

1はBestBuy

2、ターゲット

3、ウォルマート

4、はBestBuy

予想される出力: 3は、ウォルマート

は、私は、出力データフレームを使用して火花コンテキストにSQLクエリを実行しているを持っています。しかし、これを行うには、groupByKey/reduceByKeyなどのdataFramesを使用しないなどの方法があります。誰かがコードで私を助けることができます地図の後に - groupByKeyShuffleRDDが形成されており、私はCompactBufferのフィルタリングに困難に直面している!

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
import sqlContext.createSchemaRDD 

case class Person(id: Int, store: String) 

val people = sc.textFile("examples/src/main/resources/people.txt") 
       .map(_.split(",")) 
       .map(p => Person(p(1)trim.toInt, p(1))) 
people.registerTempTable("people") 

val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id=sample.id and people.url='Walmart'") 

私が今しようとしていますコードはこれですが、私は第三の工程の後に打たれています:

私はsqlContextを使用してそれを得ているとのコードは以下の通りです

val data = sc.textFile("examples/src/main/resources/people.txt") 
      .map(x=> (x.split(",")(0),x.split(",")(1))) 
      .filter(!_.filter("id")) 
val dataGroup = data.groupByKey() 
val dataFiltered = dataGroup.map{case (x,y) => 
    val url = y.flatMap(x=> x.split(",")).toList 
    if (!url.contains("Bestbuy") && url.contains("Walmart")){ 
     x.map(x=> (x,y))}} 

dataFiltered.collect()を実行している場合、 配列[任意] =配列(ベクター((3、Walmart))、()、())

がちょうどRDD.filterを使用し、RDDをフィルタリングするには、このステップ

+2

を働きましたあなたが試したコードを表示した方が簡単です(最小限で完全で検証可能な例を参照)。 –

+0

@TzachZohar:私は質問のコードを更新しました。それで。ありがとう! dataFiletered.collect()。foreachの(printlnのは)私に ベクトル((2、abc.com)) () を(与え – bigdataenthusiast

+0

) が、私は=> 3のみ必要、ウォルマート – bigdataenthusiast

答えて

0

後に出力を抽出する方法を私を助けてください:

val dataGroup = data.groupByKey() 

val dataFiltered = dataGroup.filter { 
    // keep only lists that contain Walmart but do not contain Bestbuy: 
    case (x, y) => val l = y.toList; l.contains("Walmart") && !l.contains("Bestbuy") 
} 

dataFiltered.foreach(println) // prints: (3,CompactBuffer(Walmart)) 

// if you want to flatten this back to tuples of (id, store): 
val result = dataFiltered.flatMap { case (id, stores) => stores.map(store => (id, store)) } 

result.foreach(println) // prints: (3, Walmart) 
+0

はありがとうございました@TzachZohar 、それは完全に働いた。私は不必要に複雑なことがありますが、RDD – bigdataenthusiast

0

を、私もそれを別の方法を試してみましたが、それは

val data = sc.textFile("examples/src/main/resources/people.txt") 
    .filter(!_.filter("id")) 
     .map(x=> (x.split(",")(0),x.split(",")(1))) 
data.cache() 
val dataWalmart = data.filter{case (x,y) => y.contains("Walmart")}.distinct() 
val dataBestbuy = data.filter{case (x,y) => y.contains("Bestbuy")}.distinct() 
val result = dataWalmart.subtractByKey(dataBestbuy) 
data.uncache() 
関連する問題