0
私は以下のようなpyspark
スクリプトを持っています。このスクリプトでは、テーブル名とコードの実行のためにinput
ファイルをループしています。Pythonで反復処理されるたびに関数のログを取得する
今度は、mysql_spark
が繰り返されるたびにログを個別に収集したいと思います。例えば
:
input file
table1
table2
table3
今、私は1つのファイルにすべての3つのテーブルのログを持っています
pyspark
スクリプトを実行したとき。
What I want is 3 separate log files 1 for each table
Pyspark
スクリプト:pyspark
スクリプトファイルの実行を起動する
#!/usr/bin/env python
import sys
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
#Condition to specify exact number of arguments in the spark-submit command line
if len(sys.argv) != 5:
print "Invalid number of args......"
print "Usage: spark-submit import.py Arguments"
exit()
args_file = sys.argv[1]
hivedb = sys.argv[2]
mysqldb=sys.argv[3]
mysqltable=sys.argv[4]
def mysql_spark(table, hivedb, mysqldb, mysqltable):
print "*********************************************************table = {} ***************************".format(table)
df = sqlContext.table("{}.{}".format(mysqldb, mysqltable))
df.registerTempTable("mytempTable")
sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb,table))
input = sc.textFile('/user/XXXXXXXX/mysql_spark/%s' %args_file).collect()
for table in input:
mysql_spark(table, hivedb, mysqldb, mysqltable)
sc.stop()
Shell
スクリプト。
#!/bin/bash
source /home/$USER/mysql_spark/source.sh
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; }
args_file=$1
TIMESTAMP=`date "+%Y-%m-%d"`
touch /home/$USER/logs/${TIMESTAMP}.success_log
touch /home/$USER/logs/${TIMESTAMP}.fail_log
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log
#Function to get the status of the job creation
function log_status
{
status=$1
message=$2
if [ "$status" -ne 0 ]; then
echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}"
exit 1
else
echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}"
fi
}
spark-submit --name "${args_file}" --master "yarn-client" /home/$USER/mysql_spark/mysql_spark.py ${args_file} ${hivedb} ${mysqldb} ${mysqltable}
g_STATUS=$?
log_status $g_STATUS "Spark job ${args_file} Execution"
Sample log file:
Connection to spark
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory
***************************table = table 2 ********************************
created dataframe
created table
delete temp directory
***************************table = table 3 ********************************
created dataframe
created table
delete temp directory
Expected output
table1.logfile
Connection to spark
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory
table2.logfile
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory
table3.logfile
***************************table = table 1 ********************************
created dataframe
created table
delete temp directory
shutdown sparkContext
私はこれをどのように達成することができますか?
これは可能ですか?
コードを使用してログファイルを作成できますが、ログ情報を持たない空のファイルです。 –
でも、あなたはファイルオブジェクトを持っています。条件をチェックし、そのファイルオブジェクトに必要なものを書くことができます。例えば –
: DF = sqlContext.table( "{} {}" 形式(MySQLdbは、mysqltable)。) IF(DF): \t logfile.write( "SQLコンテキストを作成しました。")他 : \t logfile.write( "SQLコンテキストが作成されていません。") \t このようなものです。 –