2017-05-09 6 views
0

私のデータは次のようになります。スパーク利用reduceByKeyは

Customer1 | item1:x1、x2、x3; item2:x1、x4、x5; item1:x1、x3、x6 | time1 | url
カスタマー1 | item1:x1、x7、x3; item2:x1、x4、x5; item3:x5 | time2 | url2
カスタマー2 | item1:x1、x7、x3; ITEM3:X5 | TIME3 | URL3

私はそれぞれのcustomerIdのための個別の項目の和集合を取得するために同じのCustomerIDsとmapValuesをReduceByKeyたい:

Customer1 | item1:x1、x2、x3; item2:x1、x4、x5; item1:x1、x3、x6; item1:x1、x7、x3; ITEM3:X5

私はによって達成することができていますこの:

ヴァル・ライン= spark.sparkContext.textFile(引数(0))
ヴァルレコード= line.map(L => L reduceByKey((x、y)=> x.union(y))。mapValues(x => .split( "\ |")) x.distinct)

さて、私は2列目の各項目も同様ユニークになりたいと同じキーにあるすべての値のようなものを得るために、労働組合と明確なを使用して結合する必要があります。

カスタマー1 | item1:x1、x2、x3、x6、x7; item2:x1、x4、x5; ITEM3:はcustomerIdのために1 .... とX(1-10)のベクトルを更新:×1:2、×2、これは私が各xのすべての周波数、例えばを選択したい行われた後X5

私が得る周波数と一緒に。

これはスパークで達成できますか?

答えて

0

はい、確かにSparkでこれを行うことができます!あなたが問題に近づいたところで、それは実際にはもっと難しいように見えました。あなたが呼び出す

だから私はのは、あなたのデータを文字列に格納されて(ない引数(0)ファイル)を想定してみましょう、REPL例にフルコピーpastableを表示することができます

val data = """Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url 
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2 
Customer2| item1:x1,x7,x3; item3:x5|time3|url3""" 

とRDD「ライン」することができますRDDの "rdd"には、

val rdd = sc.parallelize(data.split("\n")) 

と書いてあります。次のステップは重要なものです。階層化して数えたり集約したりするのではなく、すべてのデータを一度に処理できるようにデータを準備することができます。これは、1つのマップに続いて1回の削減が行われるため、はるかに読みやすく、効率的です。

val mapped= rdd.flatMap(line => { 
    val arr = line.split("\\|") 
    val customer = arr(0) 
    val items = arr(1) 
    val time = arr(2) 
    val url = arr(3) 

    items.split(";").flatMap(item => { 
     val itemKey = item.split(":")(0) 
     val itemValues = item.split(":")(1).split(",") 

     itemValues.map(value => (customer, itemKey, value, time, url)) 
    }) 
}) 

私たちは、この中に何があるか、我々は最後に、我々は数えると、あなたが必要とするベクターに減らすことができますmapped.toDF("customer", "itemId", "itemValue", "time", "url").show

+---------+------+---------+-----+----+ 
| customer|itemId|itemValue| time| url| 
+---------+------+---------+-----+----+ 
|Customer1| item1|  x1|time1| url| 
|Customer1| item1|  x2|time1| url| 
|Customer1| item1|  x3|time1| url| 
|Customer1| item2|  x1|time1| url| 
|Customer1| item2|  x4|time1| url| 
|Customer1| item2|  x5|time1| url| 
|Customer1| item1|  x1|time1| url| 
|Customer1| item1|  x3|time1| url| 
|Customer1| item1|  x6|time1| url| 
|Customer1| item1|  x1|time2|url2| 
|Customer1| item1|  x7|time2|url2| 
|Customer1| item1|  x3|time2|url2| 
|Customer1| item2|  x1|time2|url2| 
|Customer1| item2|  x4|time2|url2| 
|Customer1| item2|  x5|time2|url2| 
|Customer1| item3|  x5|time2|url2| 
|Customer2| item1|  x1|time3|url3| 
|Customer2| item1|  x7|time3|url3| 
|Customer2| item1|  x3|time3|url3| 
|Customer2| item3|  x5|time3|url3| 
+---------+------+---------+-----+----+ 

とうまくそれを印刷することができます見ることができます:

val reduced = mapped.map{case (customer, itemKey, itemValue, time, url) => ((customer, itemKey, itemValue), 1)}. 
    reduceByKey(_+_). 
    map{case ((customer, itemKey, itemValue), count) => (customer, itemKey, itemValue, count)} 

し、それを表示:reduced.toDF("customer", "itemKey", "itemValue", "count").show

+---------+-------+---------+-----+            
| customer|itemKey|itemValue|count| 
+---------+-------+---------+-----+ 
|Customer1| item1|  x2| 1| 
|Customer1| item1|  x1| 3| 
|Customer2| item1|  x7| 1| 
|Customer1| item1|  x6| 1| 
|Customer1| item1|  x7| 1| 
|Customer2| item1|  x3| 1| 
|Customer2| item3|  x5| 1| 
|Customer1| item2|  x5| 2| 
|Customer1| item2|  x4| 2| 
|Customer1| item2|  x1| 2| 
|Customer1| item3|  x5| 1| 
|Customer1| item1|  x3| 3| 
|Customer2| item1|  x1| 1| 
+---------+-------+---------+-----+ 

すべてをベクトルのArray/Seq表現にグループ化する必要がある場合は、データをさらに集約することでこれを行うことができます。お役に立てれば!

+0

timeとurlが存在しない値もあります。この場合、Arr(2)とarr(3)はArrayIndexOutOfBoundsExceptionで失敗します。行を4列でフィルタリングすることが可能です。 filter(l => l.length == 4) URLと時間のないデータは無視できます。 –

+0

これらの列をタプルから削除し、不要な列を削除するだけです。代わりに、 'scala.util.Try'をインポートして、これらの行を ' val time = Try(Some(arr(2)))。getOrElse(None) 'と ' val url = Try(Some(arr(3) ))))。getOrElse(None) ' –

+0

これらの行に値が必要かどうかによって異なります。そうしないと、提案したようにフィルタリングすることができます。あなたがしている場合は、前のコメントを参照してください:) –

関連する問題