2017-12-11 12 views
0

私はMaxmind snowplow libraryを使用して、データフレームにある各IPのジオデータを取り出そうとしています。Maxmind Geoデータを含むSpark UDF

我々はスパークSQL(スパークバージョン2.1.0)を使用していると、私は次のクラスでUDFを作成しました:

class UdfDefinitions @Inject() extends Serializable with StrictLogging { 

sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat") 
val s3Config = configuration.databases.dataWarehouse.s3 
val lruCacheConst = 20000 
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)), 
    ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst) 

def lookupIP(ip: String): LookupIPResult = { 
    val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1 
    loc match { 
    case None => LookupIPResult("", "", "") 
    case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""), 
    x.city.getOrElse(""), x.regionName.getOrElse("")) 
    } 
} 

val lookupIPUDF: UserDefinedFunction = udf(lookupIP _) 

} 

意図がファイル(ipLookups)へのポインタを作成することであるUDF外とそれを内部で使用するので、各行でファイルを開くことはできません。これにより、シリアル化されていないタスクのエラーが発生し、UDFでaddFilesを使用すると、開いているファイルが多すぎます(大きなデータセットを使用する場合は小さなデータセットで動作します)。

このスレッドでは、RDDを使用して問題を解決する方法を示しますが、Spark SQLを使用したいと考えています。 using maxmind geoip in spark serialized

ありがとう

答えて

0

ここでの問題は、IpLookupsがSerializableではないことです。それでも、静的ファイル(frmoから収集したもの)からルックアップを作成するので、それを修正できるはずです。レポをクローンしてIpLookupsをシリアライズ可能にすることをお勧めします。その後、spark SQLで動作させるために、あなたがしたようにクラス内のすべてをラップします。あなたは、ドライバですべてを行うことができます:

val IPResolver = new MySerializableIpResolver() 
val resolveIP = udf((ip : String) => IPResolver.resolve(ip)) 
data.withColumn("Result", resolveIP($"IP")) 

をあなたは多くの異なるIPアドレスが、別の解決策があることを持っていない場合は、次のようにメインスパーク仕事で、あなたは何かを書くことができます。

val ipMap = data.select("IP").distinct.collect 
    .map(/* calls to the non serializable IpLookups but that's ok, we are in the driver*/) 
    .toMap 
val resolveIP = udf((ip : String) => ipMap(ip)) 
data.withColumn("Result", resolveIP($"IP")) 
関連する問題