私はMaxmind snowplow libraryを使用して、データフレームにある各IPのジオデータを取り出そうとしています。Maxmind Geoデータを含むSpark UDF
我々はスパークSQL(スパークバージョン2.1.0)を使用していると、私は次のクラスでUDFを作成しました:
class UdfDefinitions @Inject() extends Serializable with StrictLogging {
sparkSession.sparkContext.addFile("s3n://s3-maxmind-db/latest/GeoIPCity.dat")
val s3Config = configuration.databases.dataWarehouse.s3
val lruCacheConst = 20000
val ipLookups = IpLookups(geoFile = Some(SparkFiles.get(s3Config.geoIPFileName)),
ispFile = None, orgFile = None, domainFile = None, memCache = false, lruCache = lruCacheConst)
def lookupIP(ip: String): LookupIPResult = {
val loc: Option[IpLocation] = ipLookups.getFile.performLookups(ip)._1
loc match {
case None => LookupIPResult("", "", "")
case Some(x) => LookupIPResult(Option(x.countryName).getOrElse(""),
x.city.getOrElse(""), x.regionName.getOrElse(""))
}
}
val lookupIPUDF: UserDefinedFunction = udf(lookupIP _)
}
意図がファイル(ipLookups)へのポインタを作成することであるUDF外とそれを内部で使用するので、各行でファイルを開くことはできません。これにより、シリアル化されていないタスクのエラーが発生し、UDFでaddFilesを使用すると、開いているファイルが多すぎます(大きなデータセットを使用する場合は小さなデータセットで動作します)。
このスレッドでは、RDDを使用して問題を解決する方法を示しますが、Spark SQLを使用したいと考えています。 using maxmind geoip in spark serialized
ありがとう