次のように私はスパークにいくつかのコードを書いた: "1461762084"、 "XFF":ここスパークwithColumn性能
val df = sqlContext.read.json("s3n://blah/blah.gz").repartition(200)
val newdf = df.select("KUID", "XFF", "TS","UA").groupBy("KUID", "XFF","UA").agg(max(df("TS")) as "TS").filter(!(df("UA")===""))
val dfUdf = udf((z: String) => {
val parser: UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser();
val readableua = parser.parse(z)
Array(readableua.getName,readableua.getOperatingSystem.getName,readableua.getDeviceCategory.getName)
})
val df1 = newdf.withColumn("useragent", dfUdf(col("UA"))) ---PROBLEM LINE 1
val df2= df1.map {
case org.apache.spark.sql.Row(col1:String,col2:String,col3:String,col4:String, col5: scala.collection.mutable.WrappedArray[String]) => (col1,col2,col3,col4, col5(0), col5(1), col5(2))
}.toDF("KUID", "XFF","UA","TS","browser", "os", "device")
val dataset =df2.dropDuplicates(Seq("KUID")).drop("UA")
val mobile = dataset.filter(dataset("device")=== "Smartphone" || dataset("device") === "Tablet").
mobile.write.format("com.databricks.spark.csv").save("s3n://blah/blah.csv")
は、入力データ { "TS" のサンプルである "85.255.235.31"、 "IP": "10.75.137.217"、 "KUID": "JilBNVgx"、 "UA": "Flixster/1066 CFNetwork/758.3.15 Darwin/15.4.0"}
上記のコードスニペットでは、 2.4GBサイズのgzファイルを読み込んでいます。読み取りは9分です。私はIDでグループ分けし、タイムスタンプを最大にします。(問題行1の)列を追加する行は2時間かかります。この行はユーザーエージェントを取り、OSを取得しようとします、デバイス、ブラウザ情報ここではこれを行うのは間違った方法ですか?
私は次のように構成してr3.4xlargeと4ノードAWSクラスタ(8つのコアおよび122Gbメモリ)でこれを実行しています
--executor-memory 30G --num-executors 9 --executor-cores 5
UDF内のすべての行に対して新しいパーサーを作成しています: 'val parser:UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser();'。おそらくそれを構築するのは高価なので、UDFの外に構築してクロージャーとして使用する必要があります。 –
@RobertoCongiu答えがコメントではありません:) – zero323
@Roberto Congiu素早い返信をありがとう。私はTaskNot直列化可能なエラーを取得し、これを行う必要がありました。それを克服するための提案はありますか?ありがとうございました –