0

私はSparkのデータフレームに複数のhtmlファイルを読み込んでいます。 私は完璧に動作Apache Sparkで範囲単位の反復変数を追加する

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .toDF("filepath", "filecontent") 
    .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent)) 
    .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent)) 

    ... 

    def parseDocValue(cssSelectorQuery: String) = 
    udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text()) 

UDFカスタムを使用してデータフレームの列にHTMLの要素を変換しています、しかし各withColumn呼び出しは冗長であるHTML文字列の解析になります。

行ごとに "filecontent"カラムに基づいて1つの解析済みドキュメント(Jsoup.parse(html))を生成し、それをデータフレーム内のすべてのwithColumnコールで使用できる方法がありますか(ルックアップテーブルなどは使用しないでください)

それとも私もデータフレームを使用して試してみて、ちょうどRDDのを使うべきではないのですか?

+0

サンプルテキスト文字列で更新できますか? –

+0

本質的には 'wholeTextFiles'の非並列化に問題があります(たとえば、64コアクラスタの2人のエグゼキュータ、さらに分割する前に)ので、おそらく全体を書き直すことになります。私はその問題に取り組んだとき、提案を更新して見ていきます。ご不便をおかけして申し訳ございません。 –

+0

解決しましたか? –

答えて

0

ので、最終的な答えは実際には非常に簡単だった:

行をマップしてそこにオブジェクトを作成するだけです

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = { 
    val domObject = document.select(cssSelectorQuery) 

    val domValue = attr match { 
     case Some(a) => domObject.attr(a) 
     case None => domObject.text() 
    } 

    domValue match { 
     case x if x == null || x.isEmpty => None 
     case y => Some(y) 
    } 
    } 

val dataset = spark 
     .sparkContext 
     .wholeTextFiles(inputPath, minPartitions = 265) 
     .map { 
     case (filepath, filecontent) => { 
      implicit val document = Jsoup.parse(filecontent) 

      val customDataJson = docJson(filecontent, customJsonRegex) 


      DataEntry(
      biz_name = docValue(".biz-page-title"), 
      biz_website = docValue(".biz-website a"), 
      url = docValue("meta[property=og:url]", attr = Some("content")), 
      ... 
      filename = Some(fileName(filepath)), 
      fileTimestamp = Some(fileTimestamp(filepath)) 
     ) 
     } 
     } 
     .toDS() 
0

私はおそらく次のように一度に解析および選択を行うには、それを書き直し、一時列に入れたい:

val dataset = spark 
    .sparkContext 
    .wholeTextFiles(inputPath) 
    .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent)) 
    .withColumn("biz_name", col("temp")(0)) 
    .withColumn("biz_website", col("temp")(1)) 
    .drop("temp") 

def parseDocValue(cssSelectorQueries: Array[String]) = 
udf((html: String) => { 
    val j = Jsoup.parse(html) 
    cssSelectorQueries.map(query => j.select(query).text())}) 
関連する問題