はい、確かにSparkでこれを行うことができます!あなたが問題に近づいたところで、それは実際にはもっと難しいように見えました。あなたが呼び出す
だから私はのは、あなたのデータを文字列に格納されて(ない引数(0)ファイル)を想定してみましょう、REPL例にフルコピーpastableを表示することができます
val data = """Customer1| item1:x1,x2,x3; item2:x1,x4,x5; item1:x1,x3,x6|time1|url
Customer1| item1:x1,x7,x3; item2:x1,x4,x5; item3:x5|time2|url2
Customer2| item1:x1,x7,x3; item3:x5|time3|url3"""
とRDD「ライン」することができますRDDの "rdd"には、
val rdd = sc.parallelize(data.split("\n"))
と書いてあります。次のステップは重要なものです。階層化して数えたり集約したりするのではなく、すべてのデータを一度に処理できるようにデータを準備することができます。これは、1つのマップに続いて1回の削減が行われるため、はるかに読みやすく、効率的です。
val mapped= rdd.flatMap(line => {
val arr = line.split("\\|")
val customer = arr(0)
val items = arr(1)
val time = arr(2)
val url = arr(3)
items.split(";").flatMap(item => {
val itemKey = item.split(":")(0)
val itemValues = item.split(":")(1).split(",")
itemValues.map(value => (customer, itemKey, value, time, url))
})
})
私たちは、この中に何があるか、我々は最後に、我々は数えると、あなたが必要とするベクターに減らすことができますmapped.toDF("customer", "itemId", "itemValue", "time", "url").show
+---------+------+---------+-----+----+
| customer|itemId|itemValue| time| url|
+---------+------+---------+-----+----+
|Customer1| item1| x1|time1| url|
|Customer1| item1| x2|time1| url|
|Customer1| item1| x3|time1| url|
|Customer1| item2| x1|time1| url|
|Customer1| item2| x4|time1| url|
|Customer1| item2| x5|time1| url|
|Customer1| item1| x1|time1| url|
|Customer1| item1| x3|time1| url|
|Customer1| item1| x6|time1| url|
|Customer1| item1| x1|time2|url2|
|Customer1| item1| x7|time2|url2|
|Customer1| item1| x3|time2|url2|
|Customer1| item2| x1|time2|url2|
|Customer1| item2| x4|time2|url2|
|Customer1| item2| x5|time2|url2|
|Customer1| item3| x5|time2|url2|
|Customer2| item1| x1|time3|url3|
|Customer2| item1| x7|time3|url3|
|Customer2| item1| x3|time3|url3|
|Customer2| item3| x5|time3|url3|
+---------+------+---------+-----+----+
とうまくそれを印刷することができます見ることができます:
val reduced = mapped.map{case (customer, itemKey, itemValue, time, url) => ((customer, itemKey, itemValue), 1)}.
reduceByKey(_+_).
map{case ((customer, itemKey, itemValue), count) => (customer, itemKey, itemValue, count)}
し、それを表示:reduced.toDF("customer", "itemKey", "itemValue", "count").show
+---------+-------+---------+-----+
| customer|itemKey|itemValue|count|
+---------+-------+---------+-----+
|Customer1| item1| x2| 1|
|Customer1| item1| x1| 3|
|Customer2| item1| x7| 1|
|Customer1| item1| x6| 1|
|Customer1| item1| x7| 1|
|Customer2| item1| x3| 1|
|Customer2| item3| x5| 1|
|Customer1| item2| x5| 2|
|Customer1| item2| x4| 2|
|Customer1| item2| x1| 2|
|Customer1| item3| x5| 1|
|Customer1| item1| x3| 3|
|Customer2| item1| x1| 1|
+---------+-------+---------+-----+
すべてをベクトルのArray/Seq表現にグループ化する必要がある場合は、データをさらに集約することでこれを行うことができます。お役に立てれば!
timeとurlが存在しない値もあります。この場合、Arr(2)とarr(3)はArrayIndexOutOfBoundsExceptionで失敗します。行を4列でフィルタリングすることが可能です。 filter(l => l.length == 4) URLと時間のないデータは無視できます。 –
これらの列をタプルから削除し、不要な列を削除するだけです。代わりに、 'scala.util.Try'をインポートして、これらの行を ' val time = Try(Some(arr(2)))。getOrElse(None) 'と ' val url = Try(Some(arr(3) ))))。getOrElse(None) ' –
これらの行に値が必要かどうかによって異なります。そうしないと、提案したようにフィルタリングすることができます。あなたがしている場合は、前のコメントを参照してください:) –