2017-07-05 3 views
0

私はこの質問には1つの私の以前の質問の拡張であり、2.1.1とScalaの2.11.8avroファイル内のある列と他の列をどのようにマッピングするのですか?

スパークを使用しています:

How to identify null fields in a csv file?

を変更ではなくからデータを読み取ることですCSVファイル、私は今avroファイルからデータを読み込んでいます。私は別のクラスでアブロファイルを解析してい

var ttime: Long = 0; 
var eTime: Long = 0; 
var tids: String = ""; 
var tlevel: Integer = 0; 
var tboot: Long = 0; 
var rNo: Integer = 0; 
var varType: String = ""; 
var uids: List[TRUEntry] = Nil; 

:これは私がからデータを読んでいるアブロファイルの形式です。

私はこの形式のcsvファイルではなくavroファイルを除いて、上に掲載されたリンクの受け入れられた答えと同じ方法で、tids列をすべてのUIDとマッピングする必要があります。これどうやってするの?

これは私がそれをやろうとしているコードです:obj.tids後

val avroRow = spark.read.avro(inputString).rdd 
    val avroParsed = avroRow 
    .map(x => new TRParser(x)) 
    .map((obj: TRParser) => ((obj.tids, obj.uId),1)) 
    .reduceByKey(_+_) 
    .saveAsTextFile(outputString) 

、すべてのuid列が受け入れ答えで述べたように、最終的な出力と同じを与えるために、個別にマップする必要があり上記のリンクの

私はアブロファイルの解析クラス内のすべてのuidを解析していますどのようにこれは、次のとおりです。

this.uids = Nil 
    row.getAs[Seq[Row]]("uids") 
    .foreach((objRow: Row) => 
     this.uids ::= (new TRUEntry(objRow)) 
    ) 

this.uids  
.foreach((obj:TRUEntry) => { 
    uInfo += obj.uId + " , " + obj.initM.toString() + " , " 
}) 

PS:質問はダムと思われる場合、私は謝罪が、これはアブロファイルと私の最初の出会いである

答えて

0

:これは、メインコードで

this.uids 

ループ処理のためにそれを通過させることによって行うことができます

val avroParsed = avroRow 
    .map(x => new TRParser(x)) 
    .map((obj: TRParser) => { 
     val tId = obj.source.trim 
     var retVal: String = "" 
     obj.uids 
     .foreach((obj: TRUEntry) => { 
      retVal += tId + "," + obj.uId.trim + ":" 
     }) 
     retVal.dropRight(1) 
    }) 

val flattened = avroParsed 
.flatMap(x => x.split(":")) 
.map(y => ((y),1)) 
関連する問題