2017-11-10 8 views
0

AWS上で動作するテストスパーク環境(シングルノード)があります。私はPySparkシェルで暫定的なクエリをほとんど実行しませんでしたが、すべてが期待どおりに実行されました。spark-submitを使用してアプリケーションを実行しているときにエラーが発生します。以下は 糸でSparkアプリケーションを実行しているときの問題

はコードです:

from __future__ import print_function 
from pyspark import SparkContext, SparkConf 
from pyspark.sql.session import SparkSession 
from pyspark.sql import SQLContext as sql 

conf = SparkConf().setAppName("myapp") 
sc = SparkContext(conf=conf) 
spark = SparkSession(sc) 
if __name__ == "__main__": 
    #inp_data = loaded data from db 
    df = inp_data.select('Id','DueDate','Principal','delay','unpaid_emi','future_payment') 
    filterd_unpaid_emi = df.filter(df.unpaid_emi == 1) 
    par = filterd_unpaid_emi.groupBy('Id').sum('Principal').withColumnRenamed('sum(Principal)' , 'par') 
    temp_df = df.filter(df.unpaid_emi == 1) 
    temp_df_1 = temp_df.filter(temp_df.future_payment == 0) 
    temp_df_1.registerTempTable("mytable") 
    bucket_df_1 = sql("""select *, case 
    when delay<0 and delay ==0 then '9999' 
    when delay>0 and delay<7 then '9' 
    when delay>=7 and delay<=14 then '8' 
    when delay>=15 and delay<=29 then '7' 
    when delay>=30 and delay<=59 then '6' 
    when delay>=60 and delay<=89 then '5' 
    when delay>=90 and delay<=119 then '4' 
    when delay>=120 and delay<=149 then '3' 
    when delay>=150 and delay<=179 then '2' 
    else '1' 
    end as bucket 
    from mytable""") 
    bucket_df_1 = bucket_df_1.select(bucket_df_1.Id,bucket_df_1.Principal,bucket_df_1.delay,bucket_df_1.unpaid_emi,bucket_df_1.future_payment,bucket_df_1.bucket.cast("int").alias('buckets')) 
    min_bucket = bucket_df_1.groupBy('Id').min('buckets').withColumnRenamed('min(buckets)' , 'max_delay') 
    joinedDf = par.join(min_bucket, ["Id"]) 
    #joinedDf.printSchema() 

、以下の申請書を提出するためのコマンドです:

spark-submit \ 
--master yarn \ 
--driver-class-path /path to/mysql-connector-java-5.0.8-bin.jar \ 
--jars /path to/mysql-connector-java-5.0.8-bin.jar \ 
/path to/mycode.py 

ERROR:へ

17/11/10 10:00:34 INFO SparkSqlParser: Parsing command: mytable 
Traceback (most recent call last): 
    File "/path to/mycode.py", line 36, in <module> 
    from mytable""") 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/context.py", line 73, in __init__ 
AttributeError: 'str' object has no attribute '_jsc' 
17/11/10 10:00:34 INFO SparkContext: Invoking stop() from shutdown hook 
17/11/10 10:00:34 INFO SparkUI: Stopped Spark web UI at ........ 

私ははかなり新しいです誰かが私がやっている間違いを教えてもらえますか? また、コーディングスタイルの改善に関するご意見をお待ちしております。

スパークバージョン:2.2

答えて

1

あなたは(任意のスパークインスタンスにバインドされていません)あなたの一時テーブルを照会するためにSQLとしてインポートSQLContextを使用している、いない(初期化火花インスタンスから)spark.sql。私はまた、あなたの輸入とコードの一部を変更しました。

from __future__ import print_function 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import SparkSession 



if __name__ == "__main__": 
    # move the initializations within the main 
    conf = SparkConf().setAppName("myapp") 
    # create the session 
    spark = SparkSession.builder.config(conf=conf) \ 
      .getOrCreate() 

    # load your data and do what you need to do 
    #inp_data = loaded data from db 
    df = inp_data.select('Id','DueDate','Principal','delay','unpaid_emi','future_payment') 
    filterd_unpaid_emi = df.filter(df.unpaid_emi == 1) 
    par = filterd_unpaid_emi.groupBy('Id').sum('Principal').withColumnRenamed('sum(Principal)' , 'par') 
    temp_df = df.filter(df.unpaid_emi == 1) 
    temp_df_1 = temp_df.filter(temp_df.future_payment == 0) 
    temp_df_1.registerTempTable("mytable") 

    # use spark.sql to query your table 
    bucket_df_1 = spark.sql("""select *, case 
    when delay<0 and delay ==0 then '9999' 
    when delay>0 and delay<7 then '9' 
    when delay>=7 and delay<=14 then '8' 
    when delay>=15 and delay<=29 then '7' 
    when delay>=30 and delay<=59 then '6' 
    when delay>=60 and delay<=89 then '5' 
    when delay>=90 and delay<=119 then '4' 
    when delay>=120 and delay<=149 then '3' 
    when delay>=150 and delay<=179 then '2' 
    else '1' 
    end as bucket 
    from mytable""") 

    bucket_df_1 = bucket_df_1.select(bucket_df_1.Id,bucket_df_1.Principal,bucket_df_1.delay,bucket_df_1.unpaid_emi,bucket_df_1.future_payment,bucket_df_1.bucket.cast("int").alias('buckets')) 
    min_bucket = bucket_df_1.groupBy('Id').min('buckets').withColumnRenamed('min(buckets)' , 'max_delay') 
    joinedDf = par.join(min_bucket, ["Id"]) 
    #joinedDf.printSchema() 

これは役立ちます、幸運!

+0

素晴らしい!それが助けになりました。 scを使用してSparkSessionを作成するのは間違った/間違ったアプローチですか? sc = SparkContext(conf = conf) spark = SparkSession(sc) – Sumit

+0

@Sumit嬉しいです! scを使って 'SparkSession'を作成するのは、2.0より前のバージョンのSparkでこれを行う方法でした。 Spark 2.2.0を使用しているので、SparkContextを暗黙的に使用することなくセッションを作成することができます。これにより、後で使用するのが容易になります。 [src](https://databricks.com/blog/2016/08/15/how-to-use-sparksession-in-apache-spark-2-0.html) – mkaran

関連する問題