0
AWS上で動作するテストスパーク環境(シングルノード)があります。私はPySparkシェルで暫定的なクエリをほとんど実行しませんでしたが、すべてが期待どおりに実行されました。spark-submitを使用してアプリケーションを実行しているときにエラーが発生します。以下は 糸でSparkアプリケーションを実行しているときの問題
はコードです:from __future__ import print_function
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext as sql
conf = SparkConf().setAppName("myapp")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
if __name__ == "__main__":
#inp_data = loaded data from db
df = inp_data.select('Id','DueDate','Principal','delay','unpaid_emi','future_payment')
filterd_unpaid_emi = df.filter(df.unpaid_emi == 1)
par = filterd_unpaid_emi.groupBy('Id').sum('Principal').withColumnRenamed('sum(Principal)' , 'par')
temp_df = df.filter(df.unpaid_emi == 1)
temp_df_1 = temp_df.filter(temp_df.future_payment == 0)
temp_df_1.registerTempTable("mytable")
bucket_df_1 = sql("""select *, case
when delay<0 and delay ==0 then '9999'
when delay>0 and delay<7 then '9'
when delay>=7 and delay<=14 then '8'
when delay>=15 and delay<=29 then '7'
when delay>=30 and delay<=59 then '6'
when delay>=60 and delay<=89 then '5'
when delay>=90 and delay<=119 then '4'
when delay>=120 and delay<=149 then '3'
when delay>=150 and delay<=179 then '2'
else '1'
end as bucket
from mytable""")
bucket_df_1 = bucket_df_1.select(bucket_df_1.Id,bucket_df_1.Principal,bucket_df_1.delay,bucket_df_1.unpaid_emi,bucket_df_1.future_payment,bucket_df_1.bucket.cast("int").alias('buckets'))
min_bucket = bucket_df_1.groupBy('Id').min('buckets').withColumnRenamed('min(buckets)' , 'max_delay')
joinedDf = par.join(min_bucket, ["Id"])
#joinedDf.printSchema()
、以下の申請書を提出するためのコマンドです:
spark-submit \
--master yarn \
--driver-class-path /path to/mysql-connector-java-5.0.8-bin.jar \
--jars /path to/mysql-connector-java-5.0.8-bin.jar \
/path to/mycode.py
ERROR:へ
17/11/10 10:00:34 INFO SparkSqlParser: Parsing command: mytable
Traceback (most recent call last):
File "/path to/mycode.py", line 36, in <module>
from mytable""")
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 73, in __init__
AttributeError: 'str' object has no attribute '_jsc'
17/11/10 10:00:34 INFO SparkContext: Invoking stop() from shutdown hook
17/11/10 10:00:34 INFO SparkUI: Stopped Spark web UI at ........
私ははかなり新しいです誰かが私がやっている間違いを教えてもらえますか? また、コーディングスタイルの改善に関するご意見をお待ちしております。
スパークバージョン:2.2
素晴らしい!それが助けになりました。 scを使用してSparkSessionを作成するのは間違った/間違ったアプローチですか? sc = SparkContext(conf = conf) spark = SparkSession(sc) – Sumit
@Sumit嬉しいです! scを使って 'SparkSession'を作成するのは、2.0より前のバージョンのSparkでこれを行う方法でした。 Spark 2.2.0を使用しているので、SparkContextを暗黙的に使用することなくセッションを作成することができます。これにより、後で使用するのが容易になります。 [src](https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html) – mkaran