2017-08-02 6 views
0

IPアドレスとそれに対応する長い値(ip_int)を持つ1つのデータフレーム(df)を持っています。これで、ジオロケーション情報を含む別のデータフレーム(ip2Country)国の名前。 Scalaでどうすればいいですか?私のコードは現在動作しませんでした:メモリ制限を超えています。スパーク:効率的な別のデータフレームの検索方法

val ip_ints=df.select("ip_int").distinct.collect().flatMap(_.toSeq) 
    val df_list = ListBuffer[DataFrame]() 
    for(v <- ip_ints){ 
    var ip_int=v.toString.toLong 
    df_list +=ip2Country.filter(($"network_start_integer"<=ip_int)&&($"network_last_integer">=ip_int)).select("country_name").withColumn("ip_int", lit(ip_int)) 
    } 
    var df1 = df_list.reduce(_ union _) 
    df=df.join(df1,Seq("ip_int"),"left") 

は基本的に私はすべてのip_int値を反復処理し、ip2Countryでそれらを検索し、DFに戻ってそれらをマージしてみてください。

ご協力いただきありがとうございます。

答えて

1

シンプルjoinあなたはヌルCOUNTRY_NAMEを削除したい場合、あなたはあまりにも

df.join(df1, df1("network_start_integer")<=df("ip_int") && df1("network_last_integer")>=df("ip_int"), "left") 
    .select("ip", "ip_int", "country_name") 
    .filter($"country_name".isNotNull) 

をフィルタを追加することができます

df.join(df1, df1("network_start_integer")<=df("ip_int") && df1("network_last_integer")>=df("ip_int"), "left") 
    .select("ip", "ip_int", "country_name") 

のためのトリックを行う必要があり、私は答えは

+0

ありがとう!それは完全に動作します!私は結合がそのように使用できることを知らなかった – ELI

+0

結合は常にこの方法で働いた:)あなたの試みでSeq( "ip_int")は実際にdf( "ip_int")=== df1( "ip_int")です。 :) –

+0

あなたが本当にあなたを助けた場合、あなたもアップアップすることができます:) –

0

非等価結合は、クロス結合とフィルタリングによって実装することができますが、これはリソースが重いためです。あなたはスパーク2.1を使用していると仮定すると:

df.createOrReplaceTempView("ip_int") 
df.select("network_start_integer", "network_start_integer", "country_name").createOrReplaceTempView("ip_int_lookup") 
// val spark: SparkSession 
val result: DataFrame = spark.sql("select a.*, b.country_name from ip_int a, ip_int_lookup b where b.network_start_integer <= a.ip_int and b.network_last_integer >= a.ip_int) 

あなたはヌルip_intを含めたい場合は、右の結果までのDFに参加する必要があります。

+0

マイコードALRましメモリ制限がエラーを超えたので、これは?私はより速くそれを行うことができます – ELI

+0

さてあなたは、メモリ使用量とのすべての関係者の最初であり、これは動作し、あなたが持っているものよりも優れているはずです。メモリパラメータが不安定になり、メモリが増加する可能性があります。 – TaylerJones

0
役立つことを願います

私はここで困惑します。

DF1( "network_start_integer")< = dfを( "ip_int")& & DF1( "network_last_integer")> = DF( "ip_int")

は、我々が DF1使用できます( "network_start_integerを")= == DF(「ip_int」)ここ

してください?

+0

私の状態がnetwork_start_integer <= ip_int <= network_last_integerであるので – ELI

+0

私は、ありがとう – Robin