2016-10-03 5 views
10

こんにちは、地球の人々! Airflowを使用してSparkタスクのスケジュールと実行を行っています。 今回私が気づいたのは、Airflowが管理できるPython DAGです。
DAG例:エアフローでスパークコードを実行するには?

spark_count_lines.py 
import logging 

from airflow import DAG 
from airflow.operators import PythonOperator 

from datetime import datetime 

args = { 
    'owner': 'airflow' 
    , 'start_date': datetime(2016, 4, 17) 
    , 'provide_context': True 
} 

dag = DAG(
    'spark_count_lines' 
    , start_date = datetime(2016, 4, 17) 
    , schedule_interval = '@hourly' 
    , default_args = args 
) 

def run_spark(**kwargs): 
    import pyspark 
    sc = pyspark.SparkContext() 
    df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt') 
    logging.info('Number of lines in people.txt = {0}'.format(df.count())) 
    sc.stop() 

t_main = PythonOperator(
    task_id = 'call_spark' 
    , dag = dag 
    , python_callable = run_spark 
) 

問題は、私はPythonコードで苦手とJavaで書かれたいくつかのタスクを持っています。私の質問は、Python DAGでSpark Java jarを実行する方法です。それとも他の方法がありますか?私は火花の提出を見つけた:http://spark.apache.org/docs/latest/submitting-applications.html
しかし、私はすべてを一緒に接続する方法を知らない。多分誰かがそれを以前に使っていて、実際の例を持っているのかもあなたの時間をありがとう!

答えて

9

BashOperatorを使用することができます。

from airflow.operators.bash_operator import BashOperator 

import os 
import sys 

セット必要なパス:

os.environ['SPARK_HOME'] = '/path/to/spark/root' 
sys.path.append(os.path.join(os.environ['SPARK_HOME'], 'bin')) 

および追加オペレータ:あなたはこれを簡単に拡張することができ

spark_task = BashOperator(
    task_id='spark_java', 
    bash_command='spark-submit --class {{ params.class }} {{ params.jar }}', 
    params={'class': 'MainClassName', 'jar': '/path/to/your.jar'}, 
    dag=dag 
) 

としては、あなたのコードの残りの部分、必要なクラスをインポートし、システムのパッケージを維持Jinjaテンプレートを使用して追加の引数を提供します。

もちろん、たとえば、あなたのケースでは、適切なテンプレートを使用してbash_commandを交換することにより、非スパークのシナリオのためにこれを調整することができます。

bash_command = 'java -jar {{ params.jar }}' 

paramsを調整します。バージョン1.8(本日発表)のよう

6

エアフローは、

があり

SparkSQLHookコード - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_sql_hook.py

SparkSubmitHookコード - これら二つの新しいスパークオペレーター/フックのように "寄贈" の分岐になっていることをhttps://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py

お知らせ1.8バージョンはそうではない(よく)文書化されています。

SparkSubmitOperatorを使用すると、Sparkの実行用のJavaコードを送信できます。

+0

SparkSQLOperatorは必要なもののようですが、接続文字列がどのように表示されるかわからないため、動作させることができません。 –

+0

設定しない場合、接続はデフォルトで糸実行モードになります - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33を参照してください。 – Tagar

関連する問題