2016-10-15 2 views
2

"01"〜 "15"という名前のいくつかの異なるフォルダと、 "00-00.txt"という名前の各フォルダインクルードファイルからなるデータセットがあります。 "23-59.txt"(各フォルダは1日を示す)。テキストファイルの最初の行の一部をRDDのキーとして使用

ファイルには以下のような行があります。 (!AIVDM始まる各エントリは最初のものを除いて、ラインである、それは数字で始まる)

1443650400.010568 !AIVDM,1,1,,B,[email protected]>h8Jr6?vN2><,0*4B 
!AIVDM,1,1,,A,4022051uvOFD>RG7kDCm1iW0088i,0*23 
!AIVDM,1,1,,A,[email protected]@PHRwPM<[email protected]`OvN2><,0*4C 
!AIVDM,1,1,,A,13n1mSgP00Pgq3TQpibh0?vL2><,0*74 
!AIVDM,1,1,,B,177nPmw002:<Tn<gk1toGL60><,0*2B 
!AIVDM,1,1,,B,139eu9gP00PugK:N2BOP0?vL2><,0*77 
!AIVDM,1,1,,A,13bg8N0P000E2<BN15IKUOvN2><,0*34 
!AIVDM,1,1,,B,14bL20003ReKodINRret28P0><,0*16 
!AIVDM,1,1,,B,15SkVl001EPhf?VQ5SUTaCnH0><,0*00 
!AIVDM,1,1,,A,14eG;ihP00G=4CvL=7qJmOvN0><,0*25 
!AIVDM,1,1,,A,[email protected]<cKrL=6nJ9QfN2><,0*30 

Iは、キーと値のペアのRDD、キーや線である長い値1443650400.010568が欲しいです!AIVDM...から値が始まります。どうすればこれを達成できますか?

+0

!AIVDMで始まる残りのレコードはどうしますか? –

+0

私はその番号がすべてのalの鍵になりたいので、rddは次のようなものになります:(1443650400.010568、!AIVDM、1,1、B、15NOHL0P00J @ uq6> h8Jr6?vN2><、0 * 4B) ( '1443650400.010568、 !AIVDM、1,1、A、4022051uvOFD> RG7kDCm1iW0088i、0 * 23) –

答えて

1

各ファイルを仮定すると(2GBを超えていない)、あなたは、単一のレコードに各ファイルを読み込むSparkContext.wholeTextFilesを使用することができ、単一RDDレコードに含まれるのに十分に小さい、その後、flatMapこれらのレコード:

// assuming data/ folder contains folders 00, 01, ..., 15 
val result: RDD[(String, String)] = sc.wholeTextFiles("data/*").values.flatMap(file => { 
    val lines = file.split("\n") 
    val id = lines.head.split(" ").head 
    lines.tail.map((id, _)) 
}) 

また、その仮定が正しくない場合(個々のファイルが大容量、つまり数百MB以上の場合)、すべてのデータを1つのRDDにロードし、インデックスをデータに追加し、 インデックスごとに「キー」のマップを収集し、これらのインデックスを使用して各データ行に適切なキーを見つけます。

// read files and zip with index to later match each data line to its key 
val raw: RDD[(String, Long)] = sc.textFile("data/*").zipWithIndex().cache() 

// separate data from ID rows 
val dataRows: RDD[(String, Long)] = raw.filter(_._1.startsWith("!AIVDM")) 
val idRows: RDD[(String, Long)] = raw.filter(!_._1.startsWith("!AIVDM")) 

// collect a map if Index -> ID 
val idForIndex = idRows.map { case (row, index) => (index, row.split(" ").head) }.collectAsMap() 

// optimization: if idForIndex is very large - consider broadcasting it or not collecting it and using a join 

// map each row to its key by looking up the MAXIMUM index which is < then row index 
// in other words - find the LAST id record BEFORE the row 
val result = dataRows.map { case (row, index) => 
    val key = idForIndex.filterKeys(_ < index).maxBy(_._1)._2 
    (key, row) 
} 
+0

ありがとう、これは動作します! –

+0

それは助けてくれてうれしいです - 他の読者にそれを知ってもらうために答えを受け入れてください/ upvote :) –

関連する問題