2017-04-11 13 views
0

私は、Z1:A、Z2:Bなどの値を含むCSVファイルをカンマで区切っています。 私はしたい: 1.各キーの周波数数でキー値を作成する(既にこの部分を完成している) 2. Zx(xは任意の整数とすることができる)を保持して、 )、コロンとすべてをフォローする(:Aなど)。文字列(Array [String、Int])から特定の文字を保持し、グループごとにアルゴリズムを適用します。

Thisは私のダミーファイルです(簡単にするため、Z1とZ2のみです)。

だから私は私の望ましい結果

val counts = example1.flatMap(line => line.split(",")).map(word => (word, 1)).reduceByKey(_+_).collect 

を得るために、私は.collectが必要であるかどうかわからないけど、それ、私はマップ-削減実行

val example1 = sc.textFile("/Users/....../Spark_stack/example_1.csv") 

にヴァルに私のCVSをロード私の "テーブル"から行全体または特定のセルを呼び出す唯一の方法です。

私は

counts.foreach(println) 

は私が得ることを印刷する場合:

scala> counts.foreach(println) 
(Z1:C,5) 
(Z1:E,3) 
(Z1:A,10) 
(Z2:B,2) 
(Z2:A,2) 
(Z1:D,4) 
(Z2:C,1) 
(Z1:B,24) 

は、私がようことを書き直したい:mapを使用してこれを行うには

(Z1,5) 
(Z1,3) 
(Z1,10) 
(Z2,2) 
(Z2,2) 
(Z1,4) 
(Z2,1) 
(Z1,24) 

可能な方法の1つであるとsubstring(0,2)

scala> counts.map(x => (x._1.substring(0,2),x._2)) 
res25: Array[(String, Int)] = Array((Z1,5), (Z1,3), (Z1,10), (Z2,2), (Z2,2), (Z1,4), (Z2,1), (Z1,24)) 

ここでの問題は、私は彼らの総数は、たとえばZ15のため、9以上であることを、いくつかのポイント非常に多くのZさんを与えられるかもしれないよりある:A、あるいはZ123:D

そこで私は、よりダイナミックな何かが必要その時点までに:がどこにあり、substringになるかを知ることができます。私の問題は、私がそれを書く方法を知らないということです。

私が呼び出す場合:

scala> counts(1)._1.indexOfSlice(":") 
res28: Int = 2 

私は:のposistionを取得し、それゆえ私は、このように適用することができます。

scala> counts(1)._1.substring(0,counts(1)._1.indexOfSlice(":")) 
res30: String = Z1 

が、私はドン; tは全体のカウントにそれを適用する方法を知っています単一の行だけではなく、私もforeachを試しましたが、それはうまくいきません。私はこれを行うたら、これは(ただし、単一Zxをのために働くように

が、私は逆の順序でソート何とか個々のZ1、Z2など

のためにそれを次のアルゴリズムを適用する必要があるので、私は第二でソートする必要があります列descと私の最初の列)

val sorted = counts.sortBy(_._2).reverse 

とそれぞれユニークZxを最終的には各Zxを用について-LOの整数(VAR hを取得するために、この

var h =0 
for (i <- 0 to (sorted.length-1)) { if (sorted(i)._2 >= i+1) { h = i+1;}} 

を適用するための上記のop)

申し訳ありません申し訳ありませんが複雑すぎる場合、私は全く新しいscala-sparkです。

答えて

0
counts.map(x => (x._1.substring(0, x._1.indexOf(":")), x._2)) 
0

まず、絶対にcollectを使用しないでください。これにより、すべてのデータがドライバに強制的に戻され、マシンを圧倒するでしょう.Sparkの代わりに従来のScalaコレクションを使用することができるデータがたくさんある場合を除き、私は列labelcountDataFrameRDDを変換してから出て解析するライブラリ関数substring_indexを使用してlabel列を変換するここで

import org.apache.spark.sql.functions._ 

sc.textFile("/Users/....../Spark_stack/example_1.csv") 
    .toDF("label","count") 
    .select(substring_index($"label", ":", 1).as("label"), $"count") 

それでは、代わりにDataFrame APIとfunctionsライブラリを使用してみましょう結腸の前に来るものあなたの代わりにRDD Sを使用する必要がある場合

、あなたは@sheunisが提案する何ができる(ただしRDDではなくcollectの結果に)、またはこの:あなたは100%正しい

sc.textFile("/Users/....../Spark_stack/example_1.csv").map { 
    case (label, count) => (label.split(":").head, count) 
} 
+0

、I私はRDDに固執する必要があり、データフレームを使用しないことを忘れています。あなたは正しいですが、.collectを使用すると1つの問題が解決され、さらに多くの問題が発生します。私は、各行が '(Z1、(10,5,3,2、..))'であるRDDを作成し、この値(実際にはリスト)にアルゴリズムを適用する必要があると思う。 –

+0

代わりに私が 'RDD'を使ってどのように作業するのかを知っていました。 – Vidya

関連する問題