2017-08-14 5 views
2

pysparkスクリプトが正常に動作しています。このスクリプトは、mysqlからデータをフェッチし、HDFSにハイブテーブルを作成します。シェルスクリプトを使用してPythonで関数のログを収集する

pysparkスクリプトは以下のとおりです。

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 8: 
    print "Invalid number of args......" 
    print "Usage: spark-submit import.py Arguments" 
    exit() 
table = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

df = sqlContext.read.format("jdbc").option("url", "{}:{}/{}".format(domain,port,mysqldb)).option("driver", "com.mysql.jdbc.Driver").option("dbtable","{}".format(table)).option("user", "{}".format(username)).option("password", "{}".format(password)).load() 

#Register dataframe as table 
df.registerTempTable("mytempTable") 

# create hive table from temp table: 
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb,table)) 

sc.stop() 

今、このpysparkスクリプトはshellスクリプトを使用して呼び出されます。このシェルスクリプトでは、ファイルから引数としてテーブル名を渡しています。

shell scriptは以下のとおりです。

#!/bin/bash 

source /home/$USER/spark/source.sh 
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; } 

args_file=$1 

TIMESTAMP=`date "+%Y-%m-%d"` 
touch /home/$USER/logs/${TIMESTAMP}.success_log 
touch /home/$USER/logs/${TIMESTAMP}.fail_log 
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log 
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log 

#Function to get the status of the job creation 
function log_status 
{ 
     status=$1 
     message=$2 
     if [ "$status" -ne 0 ]; then 
       echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}" 
       #echo "Please find the attached log file for more details" 
       exit 1 
       else 
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}" 
       fi 
} 
while read -r table ;do 
    spark-submit --name "${table}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${table}.log 2>&1 
    g_STATUS=$? 
    log_status $g_STATUS "Spark job ${table} Execution" 
done < "${args_file}" 

echo "************************************************************************************************************************************************************************" 

上記のシェルスクリプトを使用して、args_fileの各テーブルのログを収集できます。

私はmysqlに200以上のテーブルを持っています。私は以下のようにpysparkスクリプトを修正しました。私はargs_fileを介して作成し、コードを実行する関数を作成しています。

New spark script

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 8: 
    print "Invalid number of args......" 
    print "Usage: spark-submit import.py Arguments" 
    exit() 
args_file = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

def testing(table, hivedb, domain, port, mysqldb, username, password): 

    print "*********************************************************table = {} ***************************".format(table) 
    df = sqlContext.read.format("jdbc").option("url", "{}:{}/{}".format(domain,port,mysqldb)).option("driver", "com.mysql.jdbc.Driver").option("dbtable","{}".format(table)).option("user", "{}".format(username)).option("password", "{}".format(password)).load() 

    #Register dataframe as table 
    df.registerTempTable("mytempTable") 

    # create hive table from temp table: 
    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb,table)) 

input = sc.textFile('/user/XXXXXXX/spark_args/%s' %args_file).collect() 

for table in input: 
testing(table, hivedb, domain, port, mysqldb, username, password) 

sc.stop() 

今私はargs_fileに、個々のテーブルのためのログを収集します。しかし、私はすべてのテーブルのログを持つログファイルを1つだけ取得しています。

私の要求をどのように達成できますか?それとも私がやっている方法は完全に間違っている

新しいシェルスクリプト:

spark-submit --name "${args_file}" --master "yarn-client" --num-executors 2 --executor-memory 6g --executor-cores 1 --conf "spark.yarn.executor.memoryOverhead=609" /home/$USER/spark/sql_spark.py ${table} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} > /tmp/logging/${args_file}.log 2>&1 
+0

まだpython sparkを呼び出すためにbashスクリプトを使用していますか? – sal

+0

@salはい私はまだ同じシェルスクリプトを使用しています –

+0

@sal私は別のことをする必要があるかどうか教えてください –

答えて

1

何ができることは、単一のログファイルを取得し、ログファイルをカットしますpythonスクリプトを記述していますそれ以前の行printstableの名前です。例えば

*************************************table=table1*************** 

その後、次のログファイルが

*************************************table=table2**************** 

から始まり、など。テーブル名をファイル名にすることもできます

関連する問題