2016-09-13 5 views
2

次のように私はスパークにいくつかのコードを書いた: "1461762084"、 "XFF":ここスパークwithColumn性能

val df = sqlContext.read.json("s3n://blah/blah.gz").repartition(200) 

val newdf = df.select("KUID", "XFF", "TS","UA").groupBy("KUID", "XFF","UA").agg(max(df("TS")) as "TS").filter(!(df("UA")==="")) 

val dfUdf = udf((z: String) => { 
val parser: UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser();   
val readableua = parser.parse(z) 
Array(readableua.getName,readableua.getOperatingSystem.getName,readableua.getDeviceCategory.getName) 
}) 

val df1 = newdf.withColumn("useragent", dfUdf(col("UA"))) ---PROBLEM LINE 1 

val df2= df1.map { 
case org.apache.spark.sql.Row(col1:String,col2:String,col3:String,col4:String, col5: scala.collection.mutable.WrappedArray[String]) => (col1,col2,col3,col4, col5(0), col5(1), col5(2)) 
}.toDF("KUID", "XFF","UA","TS","browser", "os", "device") 

val dataset =df2.dropDuplicates(Seq("KUID")).drop("UA") 
val mobile = dataset.filter(dataset("device")=== "Smartphone" || dataset("device") === "Tablet"). 
mobile.write.format("com.databricks.spark.csv").save("s3n://blah/blah.csv") 

は、入力データ { "TS" のサンプルである "85.255.235.31"、 "IP": "10.75.137.217"、 "KUID": "JilBNVgx"、 "UA": "Flixster/1066 CFNetwork/758.3.15 Darwin/15.4.0"}

上記のコードスニペットでは、 2.4GBサイズのgzファイルを読み込んでいます。読み取りは9分です。私はIDでグループ分けし、タイムスタンプを最大にします。(問題行1の)列を追加する行は2時間かかります。この行はユーザーエージェントを取り、OSを取得しようとします、デバイス、ブラウザ情報ここではこれを行うのは間違った方法ですか?

私は次のように構成してr3.4xlargeと4ノードAWSクラスタ(8つのコアおよび122Gbメモリ)でこれを実行しています

--executor-memory 30G --num-executors 9 --executor-cores 5 
+4

UDF内のすべての行に対して新しいパーサーを作成しています: 'val parser:UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser();'。おそらくそれを構築するのは高価なので、UDFの外に構築してクロージャーとして使用する必要があります。 –

+1

@RobertoCongiu答えがコメントではありません:) – zero323

+0

@Roberto Congiu素早い返信をありがとう。私はTaskNot直列化可能なエラーを取得し、これを行う必要がありました。それを克服するための提案はありますか?ありがとうございました –

答えて

0

ここでの問題は、gzipが分割されていない、として読み取ることができないということです平行。バックグラウンドでは、単一のプロセスがバケットからファイルをダウンロードし、クラスタを再分割してクラスタ全体にデータを分散させることがあります。入力データをa splittable formatに再エンコードしてください。入力ファイルが大きく変更されない場合は、例えば、bzip2と考えることができます(エンコードがかなり高価で時間がかかることがあるため)。

0

更新:ロベルトから答えを拾い、あなたがUDF内のすべての行に対する新しいパーサを作成しているすべての

の利益のためにそれをここに貼り付ける:ヴァルパーサ:UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser(); 。 UDFの外側にあるものを作成し、それをクロージャーとして使用すると便利です。