2016-12-06 8 views
0

こんにちは私は、行の最初の単語を特定して一意の値を作成し、それをRDDに追加するためにスカラーを使用しています。しかし、私はそれを行う方法を知らない。私はスカラに新しいので、この質問が不自由に聞こえる場合はご容赦ください。 私が試しているサンプルを以下に示します。RDDスパークで一意の値を追加する方法

サンプル:

OBR|1|METABOLIC PANEL 
OBX|1|Glucose 
OBX|2|BUN 
OBX|3|CREATININE 
OBR|2|RFLX TO VERIFICATION 
OBX|1|EGFR 
OBX|2|SODIUM 
OBR|3|AMBIGUOUS DEFAULT 
OBX|1|POTASSIUM 

私はユニークな価値を創造し、OBRにしての下にそれを追加したいよりも、それはOBRであれば、私は最初の単語がOBRであるかどうかチェックしたいですOBX私はもう一度OBRを探していました。しかし、どうやってこれをすることができますか?私はHDFSから私のデータを持ってきています。

期待される結果:私のコメントで述べたように、これが唯一のシングルコア上で動作すると、誰かが私は何かにいくつかの光を当てることができない限り、火花を使用して行われるべきではありませんので

OBR|1|METABOLIC PANEL|OBR_filename_1 
OBX|1|Glucose|OBR_filename_1 
OBX|2|BUN|OBR_filename_1 
OBX|3|CREATININE|OBR_filename_1 
OBR|2|RFLX TO VERIFICATION|OBR_filename_2 
OBX|1|EGFR|OBR_filename_2 
OBX|2|SODIUM|OBR_filename_2 
OBR|3|AMBIGUOUS DEFAULT|OBR_filename_3 
OBX|1|POTASSIUM|OBR_filename_3 
+0

とほのめかしたと順番にファイルを読み込むようなものは存在しないHDFSより簡単な方法です。たとえば、すべてのOBRが最初に読み込まれた後に、その後にすべてのOBRが読み込まれると、コードはどのように動作しますか?他のすべてのレコードで最後のファイル名を取得しますか?しかし、単一のファイルで単一のコアを使用してアプリケーションを実行している場合は、期待どおりの順序でファイルを読み込むことができますが、その時点でsparkを使用する理由は何ですか? –

+0

@ASpotySpot順番にそれを読み込み、最初にOBR_filename_idを作成し、次の値に達するまで同じOBR_filename_idをすべてのobxに入れて** OBR **を取得しているかどうかを確認します** OBR ** – animal

+0

ファイルhdfs上にあるので、多くの部分に分割されています。たとえば、順番に読むのはどういう意味ですか?もしそれが多くのパーツに分割されていない場合、どのようにそれを行うにしても、単一のコアを使用しない限り、ファイルの部分を並列処理するので、シーケンシャルに動作するようにスパークさせるのは難しいです(私が知る限り)。私は一緒に何かを置くことができるが、その時点で私は信じて火花を使用して無意味です。私のHDFSの –

答えて

1

[OK]を行方不明 私はあなたの例で説明されているようにファイルがhdfsのテキストファイルであると仮定しています。それはオフに一つのエラーまたはマイナータイプミスが、アイデアを受ける可能性があるので、私は上記のテストすることはできません私の現在のennvironementで

val text: RDD[(String, Long)] = sc.textFile(<path>).zipWithIndex 
val tupled: RDD[((String, Int, String), Int)] = text.map{case (r, i) => (r.split('|'), i)).map{case (s, i) => ((s(0), s(1).toInt, s(2)), i)} 
val obrToFirstIndex: Array[(Int, Long)] = tupled.filter(_._1._1 == "OBR").map{case (t, i) => (t._2, i)}.reduceByKey(Math.min).collect() 
val bcIndexes = sc.broadcast(obrToFirstIndex.sortBy(_._2)) 
val withObr = tupled.mapValues(i => bcIndexes.value.find(_._2 >= i).getOrElse(bcIndexes.value.last)._1) 
val result: RDD[String] = withObr.map{case ((t1, t2, t2), obrind) => Array(t1, t2, t3, s"OBR_filaneme_$obrind").mkString("|") 

があります。しかし、私は繰り返し言います、これは火花の仕事ではありません。

編集:mapPartitionsを使用できる部分が1つしかないので、ちょっと出てきました。そのパーティション内のJava/Scalaのコードを書くだけです。

遭遇した問題は、検索が間違っていることです。別の条件が必要です。ここで私は、このような火花として分散システムでの以前のmapPartitions

val text: RDD[String] = sc.textFile(<path>) 
val result: RDD[String] = text.mapPartitions{part => 
    var obrInd = 0 
    part.map{r => 
     val code= r.split('|')(0) 
     if(code == "OBR") obrInd += 1 
     r + "|OBR_filename_" + obrInd 
    } 
} 
+0

'reduceByKey(Math.min)'が使われている理由を教えてください。 – animal

+0

パーティション全体を使用しないスパーク操作では、行には他の行の概念はありません。 eg)ma​​pは、他の行の内容に基づいて出力を変更することはできません。問題を解決するために何らかの方法で行を組み合わせる必要があります。私がしているのは、OBR IDに基づいてすべての行を結合することです。私はその後、発生する最初の時間を得るために最小インデックス(ここではインデックスは行番号です)を取る。例えば、あなたのサンプルでは、​​1 - > 0,2 - > 4,3 - > 7となります。その後、どのインデックスをどのOBR IDに送るべきかを決定するために使用します。OBR_filaneme_1 OBX | | 1 |グルコース| 1 | |代謝PANEL OBR行ごとに一意のID –

+0

私はあなたの方法を試してみましたが、私はこの結果に 'OBRを取得していますがある場合は、それが必要とされないことが今の私に発生しOBR_filaneme_2 OBX | 2 | BUN | OBR_filaneme_2 OBX | 3 |クレアチニン| OBR_filaneme_2 OBR | 2 |検証のRFLX | OBR_filaneme_2 OBX | 1 | EGFR | OBR_filaneme_3 OBX | 2 |ナトリウム| OBR_filaneme_3 OBR | 3 |曖昧DEFAULT | OBR_filaneme_3 OBX | 1 | POTASSIUM | OBR_filaneme_3' – animal

関連する問題