2017-05-05 2 views
0

によってタプルやスパークでのネストした列およびフィルタによるまたはグループを入れ子にしていると最善の方法は、私は、ネストされた列でグループ化するために問題を抱えているネストされた列

私のアプリケーションのScalaのバージョンは2.11.7ですし、これは私がありますSBT依存

libraryDependencies ++= { 
    val akkaVersion = "2.4.10" 
    val sparkVersion = "2.1.1" 

    Seq(
    "com.typesafe.akka" %% "akka-actor"       % akkaVersion, 
    "com.typesafe"  % "config"        % "1.3.0" , 
    "org.apache.spark" %% "spark-core"       % sparkVersion, 
    "org.apache.spark" %% "spark-sql"       % sparkVersion, 
    "com.typesafe.akka" %% "akka-slf4j"       % akkaVersion, 
    "org.apache.spark" %% "spark-streaming"      % sparkVersion 
) 
} 

これは、(1行)

124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]| 

これは私のマッパー

0123私のサンプルデータであります

私は

private def mappingSparkLoadedSMSData(sparkRdd:Dataset[String]): Dataset[SMSMap] = { 

    import SparkFactory.spark.implicits._ 
    sparkRdd 
     .map(_.split("\\|",-1)) 
     .filter(_.length==33)  //adding last empty string 
     .map(
     data => 
     SMSMap(

      {if(data(0).nonEmpty) data(0).toLong else 0 }, 
      {if(data(1).nonEmpty) data(1).toLong else 0 }, 
      {if(data(2).nonEmpty) data(2).toLong else 0 }, 
      data(3), 
      {if(data(4).nonEmpty) data(4).toInt else 0 }, 
      {if(data(5).nonEmpty) data(5).toInt else 0 }, 
      data(6), 
      {if(data(10).nonEmpty) data(10).toInt else 0 }, 
      data(11), 
      {if(data(12).nonEmpty) data(12).toInt else 0 }, 
      {if(data(13).nonEmpty) data(13).toInt else 0 }, 
      {if(data(14).nonEmpty) data(14).toInt else 0 }, 
      {if(data(15).nonEmpty) data(15).toLong else 0 }, 
      data(16), 
      {if(data(17).nonEmpty) data(17).toInt else 0 }, 
      data(18), 
      data(19), 
      {if(data(20).nonEmpty) data(20).toInt else 0 }, 
      {if(data(23).nonEmpty) data(23).toInt else 0 }, 
      data(27), 
      {if(data(28).nonEmpty) data(28).toInt else 0 }, 
      {if(data(29).nonEmpty) data(29).toInt else 0 }, 

      data(30) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList, 

      data(31) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList 


     ) 
    ) 
    } 

をフィルタリングし、マッピングするためのコードを書かれている。そして、私は一時ビューを作成し、あなたが y_infoを見たとき、この

formattedRDD.createOrReplaceTempView("temp_table") //formattedRDD is a val that stored after Mapping 

spark.sql(
     s" select balance from temp_table group by balance" 
    ).collectAsList() 

のように照会しようとしています: 31

第1列はbal_id(Int)であり、第2列はchange_balance(Double)であり、第3列は累積(Double)であり、それはmo 1は

を設定し、より再今私はbal_idによってグループに望んでいたとchange_balanceの合計を取得するが、私は(それぞれが値であるため、当然のことを行うことはできません)

私はと考えていたことを行うことができませんでした別のデータセット/テーブルとマッピングおよびグループ化に残高を残す(残高:リスト[(Int、Double、Double)]、// 31)を分離するが、auto_increment_idまたはデータセット/テーブルの両方に一意の行識別子を追加する必要があるマッピング目的のために(idは重複することができます)

私はこれと本当に混同しています。いずれかが私を助けてください。

答えて

1

天秤の列を3つの異なる列に分けると、のgroupBysumchange_balanceの上に簡単になります。
これらの3つの列は、最初の段階で作成できます。
ここで私はあなたの質問からわかる内容に応じたソリューションです:あなたがしながら、別の列にこれらの3つの値を分離する必要が

case class SampleMap(
         id: Long, //1 
         a_id_1: Long, //2 
         b_id_2: Long, //3 
         date_time: String, //4 
         subscriber_type: Int, //5 
         x_type: Int, //6 
         sub_id_2: String, //7 
         account_type: Int, //11 
         master_sub_id: String, //12 
         application_id: Int, //13 
         sup_type_id: Int, //14 
         unit_type_id: Int, //15 
         usage_amount: Long, //16 
         type_of_charge: String, //17 
         identity_id: Int, //18 
         group_id: String, //19 
         charge_code: String, //20 
         content_type: Int, //21 
         fund_usage_type: Int, //24 
         msc_id: String, //28 
         circle_id: Int, //29 
         sp_id: Int, //30 
         balance: List[(Int, Double, Double)], //31 
         bal_id: Int,    //added by Ramesh 
         change_balance: Double, //added by Ramesh 
         accumulated: Double,  //added by Ramesh 
         z_info: List[(Int, Double, Double)] //33 
        ) 

:として

あなたはあなたのケースクラスの3列名を含める必要がありデータフレーム/データセットを作成します。あなたのコードの改良版を以下に示します。

val formattedRDD = sparkRdd.map(_.split("\\|",-1)) 
     .filter(_.length==33)  //adding last empty string 
     .map(data => { 
     val balance = Try(data(30) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList) getOrElse List((0, 0.0, 0.0)) 

     SampleMap(
      Try(data(0).toLong) getOrElse 0, 
      Try(data(1).toLong) getOrElse 0, 
      Try(data(2).toLong) getOrElse 0, 
      Try(data(3).toString) getOrElse "", 
      Try(data(4).toInt) getOrElse 0, 
      Try(data(5).toInt) getOrElse 0, 
      Try(data(6).toString) getOrElse "", 
      0, 
      Try(data(11).toString) getOrElse "", 
      Try(data(12).toInt) getOrElse 0, 
      Try(data(13).toInt) getOrElse 0, 
      Try(data(14).toInt) getOrElse 0, 
      Try(data(15).toLong) getOrElse 0, 
      Try(data(16).toString) getOrElse "", 
      Try(data(17).toInt) getOrElse 0, 
      Try(data(18).toString) getOrElse "", 
      Try(data(19).toString) getOrElse "", 
      Try(data(20).toInt) getOrElse 0, 
      Try(data(23).toInt) getOrElse 0, 
      Try(data(27).toString) getOrElse "", 
      Try(data(28).toInt) getOrElse 0, 
      Try(data(29).toInt) getOrElse 0, 
      balance,    //this is the 30th value i.e. balance 
      balance(0)._1,   //this is bal_id 
      balance(0)._2,   //this is change_balance 
      balance(0)._3,   //this is accumulator 

      Try(data(31) 
      .drop(1) 
      .dropRight(1) 
      .split("\\]\\[") 
      .map(_.split('~')) 
      .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0) 
      .map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble)) 
      .toList) getOrElse List.empty 
     ) 
     } 
    ) 
    .toDS() 

は、今あなたがする必要があるすべては私が遅くのすべて申し訳ありませんの最初の

+0

これはあなたの必要なソリューションであると思いますアグリゲータ

import org.apache.spark.sql.functions.sum formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show 

を呼んで返信、 につきましては、 'balance(0)._1、//これはbal_idです balance(0)._2、//これはchange_balance balance(0)です。_3、//これはアキュムレータです ' 最初のタプルだけを選択しています。複数のタプルを持つリスト – Muhunthan

関連する問題