私は「ウォルマート」を訪れたが「ベストバイ」を見た人のIDが必要ないように、スパーク(スカラ)を使ってソートする必要がある以下のデータを持っています。店舗を何度も訪れることができるので、店舗は繰り返している可能性があります。spark-shellのデータをscalaでフィルタリングする方法は?
入力データ:
ID、店舗
1、ウォルマート
1、ウォルマート
1はBestBuy
2、ターゲット
3、ウォルマート
4、はBestBuy
予想される出力: 3は、ウォルマート
は、私は、出力データフレームを使用して火花コンテキストにSQLクエリを実行しているを持っています。しかし、これを行うには、groupByKey
/reduceByKey
などのdataFramesを使用しないなどの方法があります。誰かがコードで私を助けることができます地図の後に - groupByKey
、ShuffleRDD
が形成されており、私はCompactBuffer
のフィルタリングに困難に直面している!
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
case class Person(id: Int, store: String)
val people = sc.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(p => Person(p(1)trim.toInt, p(1)))
people.registerTempTable("people")
val result = sqlContext.sql("select id, store from people left semi join (select id from people where store in('Walmart','Bestbuy') group by id having count(distinct store)=1) sample on people.id=sample.id and people.url='Walmart'")
が
私が今しようとしていますコードはこれですが、私は第三の工程の後に打たれています:
私はsqlContext
を使用してそれを得ているとのコードは以下の通りです
val data = sc.textFile("examples/src/main/resources/people.txt")
.map(x=> (x.split(",")(0),x.split(",")(1)))
.filter(!_.filter("id"))
val dataGroup = data.groupByKey()
val dataFiltered = dataGroup.map{case (x,y) =>
val url = y.flatMap(x=> x.split(",")).toList
if (!url.contains("Bestbuy") && url.contains("Walmart")){
x.map(x=> (x,y))}}
dataFiltered.collect()を実行している場合、 配列[任意] =配列(ベクター((3、Walmart))、()、())
ががちょうどRDD.filter
を使用し、RDDをフィルタリングするには、このステップ
を働きましたあなたが試したコードを表示した方が簡単です(最小限で完全で検証可能な例を参照)。 –
@TzachZohar:私は質問のコードを更新しました。それで。ありがとう! dataFiletered.collect()。foreachの(printlnのは)私に ベクトル((2、abc.com)) () を(与え – bigdataenthusiast
) が、私は=> 3のみ必要、ウォルマート – bigdataenthusiast