2016-08-31 13 views
3

私はluigiいくつかのpysparkライブラリを含むpythonタスクを持っています。今私はspark-submitでmesosにこのタスクを提出したいと思います。それを実行するにはどうすればよいですか?以下は、私のコードのスケルトンです:ルイジなしspark-submitとpysparkでluigiタスクを実行するには

from pyspark.sql import functions as F 
from pyspark import SparkContext 

class myClass(SparkSubmitTask): 
# date = luigi.DateParameter() 

    def __init__(self, date): 
    self.date = date # date is datetime.date.today().isoformat() 

    def output(self): 

    def input(self): 

    def run(self): 
    # Some functions are using pyspark libs 

if __name__ == "__main__": 
    luigi.run() 

、私は次のコマンドラインとして、このタスクをsubmmittingよ:

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py 

今の問題は、私がすることができますどのようルイージタスクをスパーク、提出されmy_module.pyが最初に終了する必要タスクを持っている場合

luigi --module my_module myClass --local-scheduler --date 2016-01 

もう一つの質問は、私はそれのためにもっと何かをするか、単に現在のコマンド - と同じに設定する必要がありますか、次のとおりです。ルイジコマンドラインなどが含まライン?

本当にありがとうございました。どうもありがとう。

答えて

4

ルイージにはいくつかのテンプレートタスクがあります。そのうちの1人がPySparkTaskを呼び出しました。 このクラスから継承し、プロパティをオーバーライドすることができます。

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py

私はそれをテストしたが、ルイジとの私の経験をもとにしていない私はこれを試してみなければなりません:

import my_module 


class MyPySparkTask(PySparkTask): 
    date = luigi.DateParameter() 

    @property 
    def name(self): 
     return self.__class__.__name__ 

    @property 
    def master(self): 
     return 'mesos://host:port' 

    @property 
    def deploy_mode(self): 
     return 'cluster' 

    @property 
    def total_executor_cores(self): 
     return 1 

    @property 
    def driver_cores(self): 
     return 1 

    @property 
    def executor-memory(self): 
     return 1G 

    @property 
    def driver-memory(self): 
     return 1G 

    def main(self, sc, *args): 
     my_module.run(sc) 

    def self.app_options(): 
     return [date] 

その後、あなたがそれを実行することができます: ルイジ--module task_module MyPySparkTask --local-スケジューラ - -date 2016から01

それらを他のPySparkTasksのデフォルト値を作るためにclient.cfgのファイルのプロパティを設定するオプションもあります。

[spark] 
master: mesos://host:port 
deploy_mode: cluster 
total_executor_cores: 1 
driver_cores: 1 
executor-memory: 1G 
driver-memory: 1G 
+0

こんにちはAYあなたのコメントにとても感謝しています。私はすでにSparkSubmitTaskからクラスを継承していますが、代わりにこのクラスを使用できますか?私がMyPySparkTaskを使用している場合は、単にclient.cfgでsparkコマンドを設定してから、「luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01」というコマンドを実行するだけです。 – zuhakasa

+0

1. SparkSubmitTask(上記の例ではMyPySparkTaskのような)ではなくPySparkTaskから継承し、 'main'メソッドでロジックを実装する必要があります。 2.設定はSparkSubmitTaskと同じですが、client.cfgでプロパティを定義したのでしょうか、またはプロパティを上書きしましたか? 3.実行はSparkSubmitTaskを実行するのと似ています – ayun12

+0

ご協力いただきありがとうございます。私が尋ねたいのは、mainメソッドのコードで、my_module.run(sc)を呼び出すことです。しかし、私が知る限り、luigi.Task my_module関数はtask_module.pyから継承していますが、メソッド実行時にはパラメータscはありません。つまり、私はtask_module.pyでこのメソッドをオーバーライドする必要がありますか? – zuhakasa

関連する問題