2015-11-11 7 views
5

JavaアプリケーションでApache Sparkを使用しています。 私は2つのDataFrameを持っています:df1df2です。 df1は、email,firstNameおよびlastNameを有するを含む。 df2はを含み、emailを含む。Apache Sparkで異なる構造を持つ2つのデータフレームに対してNOT INを実装する方法

df1のすべての行を含むDataFramedf3を作成します。このメールはdf2には存在しません。

Apache Sparkでこれを行う方法はありますか?私はそれらtoJavaRDD()をキャストし、すべての電子メールを含むにし、その使用subtractdf1をフィルタリングすることによりdf1df2からJavaRDD<String>を作成しようとしましたが、私はds1に新しいJavaRDDをマッピングする方法を知っているとDataFrameを得ることはありません。

基本的にはdf1のメールがdf2にないすべての行が必要です。

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer "); 

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " + 
          "WHERE product_id = '" + productId + "'"); 

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0)); 

List<String> notBoughtEmails = customers.javaRDD() 
         .map(row -> row.getString(0)) 
         .subtract(customersBoughtEmail).collect(); 

答えて

4

スパーク2.0.0+

あなたは直接NOT INを使用することができます。これは、参加してフィルタ外側用いて発現させることができる

スパーク< 2.0.0

val customers = sc.parallelize(Seq(
    ("[email protected]", "John", "Doe"), 
    ("[email protected]", "Jane", "Doe") 
)).toDF("email", "first_name", "last_name") 

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
    Tuple1("[email protected]") 
)).toDF("email") 

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")), 
    $"email" === $"email_", "leftouter") 
.where($"email_".isNull).drop("email_") 

customersWhoHaventOrderedTheProduct.show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 

生SQLと同等:

customers.registerTempTable("customers") 
customersWhoOrderedTheProduct.registerTempTable(
    "customersWhoOrderedTheProduct") 

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN 
       customersWhoOrderedTheProduct o 
       ON c.email = o.email 
       WHERE o.email IS NULL""" 

sqlContext.sql(query).show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 
+2

ありがとうございます。最初の例は私のために働いた。これは、Javaのバージョン 'DataFrame customersWhoHaventOrderedTheProduct = customers .join(customersWhoOrderedTheProduct.col(" email "))、customers.col(" email ")。equalTo(customersWhoOrderedTheProduct.col(" email ")) 私はSQLと同等の機能を試しましたが、この問題が発生しました。「scala.MatchError:UUIDType」というエラーメッセージが表示されました。 (クラスorg.apache.spark.sql.cassandra.types.UUIDType $) ' –

+0

私は助けることができてうれしいです。 – zero323

+0

私は 'Cassandra'を使用しています。私は主キーとして' UUID'を持っています。多分、Scalaはその型にマッチすることができません。 –

2

私はキーではない文字列として整数を使用することができ示唆している以外にも、私は、pythonでそれをやりました。

from pyspark.sql.types import * 

samples = sc.parallelize([ 
    ("[email protected]", "Alberto", "Bonsanto"), ("[email protected]", "Miguel", "Bonsanto"), 
    ("[email protected]", "Stranger", "Weirdo"), ("[email protected]", "Dakota", "Bonsanto") 
]) 

keys = sc.parallelize(
    [("[email protected]",), ("[email protected]",), ("[email protected]",)] 
) 

complex_schema = StructType([ 
    StructField("email", StringType(), True), 
    StructField("first_name", StringType(), True), 
    StructField("last_name", StringType(), True) 
]) 

simple_schema = StructType([ 
    StructField("email", StringType(), True) 
]) 

df1 = sqlContext.createDataFrame(samples, complex_schema) 
df2 = sqlContext.createDataFrame(keys, simple_schema) 

df1.show() 
df2.show() 

df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull()).show() 
+0

ありがとうございます。私は 'Cassandra'を使用していますので、多くのプライマリキーに' UUID'が含まれています。 –

関連する問題