によってタプルやスパークでのネストした列およびフィルタによるまたはグループを入れ子にしていると最善の方法は、私は、ネストされた列でグループ化するために問題を抱えているネストされた列
私のアプリケーションのScalaのバージョンは2.11.7ですし、これは私がありますSBT依存
libraryDependencies ++= {
val akkaVersion = "2.4.10"
val sparkVersion = "2.1.1"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion,
"com.typesafe" % "config" % "1.3.0" ,
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion
)
}
これは、(1行)
124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
これは私のマッパー
0123私のサンプルデータであります私は
private def mappingSparkLoadedSMSData(sparkRdd:Dataset[String]): Dataset[SMSMap] = {
import SparkFactory.spark.implicits._
sparkRdd
.map(_.split("\\|",-1))
.filter(_.length==33) //adding last empty string
.map(
data =>
SMSMap(
{if(data(0).nonEmpty) data(0).toLong else 0 },
{if(data(1).nonEmpty) data(1).toLong else 0 },
{if(data(2).nonEmpty) data(2).toLong else 0 },
data(3),
{if(data(4).nonEmpty) data(4).toInt else 0 },
{if(data(5).nonEmpty) data(5).toInt else 0 },
data(6),
{if(data(10).nonEmpty) data(10).toInt else 0 },
data(11),
{if(data(12).nonEmpty) data(12).toInt else 0 },
{if(data(13).nonEmpty) data(13).toInt else 0 },
{if(data(14).nonEmpty) data(14).toInt else 0 },
{if(data(15).nonEmpty) data(15).toLong else 0 },
data(16),
{if(data(17).nonEmpty) data(17).toInt else 0 },
data(18),
data(19),
{if(data(20).nonEmpty) data(20).toInt else 0 },
{if(data(23).nonEmpty) data(23).toInt else 0 },
data(27),
{if(data(28).nonEmpty) data(28).toInt else 0 },
{if(data(29).nonEmpty) data(29).toInt else 0 },
data(30)
.drop(1)
.dropRight(1)
.split("\\]\\[")
.map(_.split('~'))
.filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
.map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble))
.toList,
data(31)
.drop(1)
.dropRight(1)
.split("\\]\\[")
.map(_.split('~'))
.filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
.map(data => (data(0).toInt, data(2).toDouble, data(2).toDouble))
.toList
)
)
}
をフィルタリングし、マッピングするためのコードを書かれている。そして、私は一時ビューを作成し、あなたが y_infoを見たとき、この
formattedRDD.createOrReplaceTempView("temp_table") //formattedRDD is a val that stored after Mapping
spark.sql(
s" select balance from temp_table group by balance"
).collectAsList()
のように照会しようとしています: 31
第1列はbal_id(Int)であり、第2列はchange_balance(Double)であり、第3列は累積(Double)であり、それはmo 1は
を設定し、より再今私はbal_idによってグループに望んでいたとchange_balanceの合計を取得するが、私は(それぞれが値であるため、当然のことを行うことはできません)
私はと考えていたことを行うことができませんでした別のデータセット/テーブルとマッピングおよびグループ化に残高を残す(残高:リスト[(Int、Double、Double)]、// 31)を分離するが、auto_increment_idまたはデータセット/テーブルの両方に一意の行識別子を追加する必要があるマッピング目的のために(idは重複することができます)
私はこれと本当に混同しています。いずれかが私を助けてください。
これはあなたの必要なソリューションであると思いますアグリゲータ
を呼んで返信、 につきましては、 'balance(0)._1、//これはbal_idです balance(0)._2、//これはchange_balance balance(0)です。_3、//これはアキュムレータです ' 最初のタプルだけを選択しています。複数のタプルを持つリスト – Muhunthan