こんにちは、地球の人々! Airflowを使用してSparkタスクのスケジュールと実行を行っています。 今回私が気づいたのは、Airflowが管理できるPython DAGです。
DAG例:エアフローでスパークコードを実行するには?
spark_count_lines.py
import logging
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime
args = {
'owner': 'airflow'
, 'start_date': datetime(2016, 4, 17)
, 'provide_context': True
}
dag = DAG(
'spark_count_lines'
, start_date = datetime(2016, 4, 17)
, schedule_interval = '@hourly'
, default_args = args
)
def run_spark(**kwargs):
import pyspark
sc = pyspark.SparkContext()
df = sc.textFile('file:///opt/spark/current/examples/src/main/resources/people.txt')
logging.info('Number of lines in people.txt = {0}'.format(df.count()))
sc.stop()
t_main = PythonOperator(
task_id = 'call_spark'
, dag = dag
, python_callable = run_spark
)
問題は、私はPythonコードで苦手とJavaで書かれたいくつかのタスクを持っています。私の質問は、Python DAGでSpark Java jarを実行する方法です。それとも他の方法がありますか?私は火花の提出を見つけた:http://spark.apache.org/docs/latest/submitting-applications.html
しかし、私はすべてを一緒に接続する方法を知らない。多分誰かがそれを以前に使っていて、実際の例を持っているのかもあなたの時間をありがとう!
SparkSQLOperatorは必要なもののようですが、接続文字列がどのように表示されるかわからないため、動作させることができません。 –
設定しない場合、接続はデフォルトで糸実行モードになります - https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L33を参照してください。 – Tagar