以下は、2つのデータフレームを比較し、交差関数を適用するために書いたコードです。データフレームを比較するpyspark intersection()関数
import os
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
df = sqlContext.read.format("jdbc").option("url","jdbc:sqlserver://xxx:xxx").option("databaseName","xxx").option("driver","com.microsoft.sqlserver.jdbc.SQLServerDriver").option("dbtable","xxx").option("user","xxxx").option("password","xxxx").load()
df.registerTempTable("test")
df1= sqlContext.sql("select * from test where amitesh<= 300")
df2= sqlContext.sql("select * from test where amitesh <= 400")
df3= df1.intersection(df2)
df3.show()
私はエラーの下に取得しています:私の理解が正しければ、
AttributeError: 'DataFrame' object has no attribute 'intersection'
交差点()はPythonの集合関数から導出作り付けのサブ機能です。したがって、私はそれをpysparkの中で使用しようとしているのですが、私のコードの中に特別なモジュールをインポートする必要がありますか、それともpysparkのために組み込まれたものとして動作するのでしょうか?
2)このintersection()関数を使用するには、最初にdfをrddに変換する必要がありますか?
私が間違っているところで私を修正してください。誰かが私に実例を教えてもらえますか?
私の動機は、SQL Serverから共通レコードを取得し、HIVEに移動することです。今のところ、私は最初に交差点関数の仕事を得ようとしており、次に交差点()が働いているかどうかを気にすることができるというHIVEの要求から始めます。