私は/usr/bin/python3
あなたがspark-env
スコープの下でクラスタ構成で定義されている環境変数PYTHONHASHSEED
を拾っていないと信じています。
あなたが代わりに/usr/bin/python3
のpython34
を使用し、以下のように設定を設定はず:
[
{
"classification":"spark-defaults",
"properties":{
// [...]
}
},
{
"configurations":[
{
"classification":"export",
"properties":{
"PYSPARK_PYTHON":"python34",
"PYTHONHASHSEED":"123"
}
}
],
"classification":"spark-env",
"properties":{
// [...]
}
}
]
をそれでは、それをテストしてみましょう。私はbashスクリプトの呼び出しの両方python
Sを定義します。
#!/bin/bash
echo "using python34"
for i in `seq 1 10`;
do
python -c "print(hash('foo'))";
done
echo "----------------------"
echo "using /usr/bin/python3"
for i in `seq 1 10`;
do
/usr/bin/python3 -c "print(hash('foo'))";
done
評決:
[[email protected] ~]$ bash test.sh
using python34
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
-4177197833195190597
----------------------
using /usr/bin/python3
8867846273747294950
-7610044127871105351
6756286456855631480
-4541503224938367706
7326699722121877093
3336202789104553110
3462714165845110404
-5390125375246848302
-7753272571662122146
8018968546238984314
PS1:私はAMIリリースemr-4.8.2
を使用しています。
PS2:スニペットはthis answerからインスパイアされました。
編集:私はpyspark
を使用して以下をテストしました。以下のよう
[...]
16/11/22 07:28:42 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
[-5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594] // THE RELEVANT LINE IS HERE.
16/11/22 07:28:42 INFO SparkContext: Invoking stop() from shutdown hook
[...]
:
from pyspark import SparkContext
sc = SparkContext(appName = "simple-app")
numbers = [hash('foo') for i in range(10)]
print(numbers)
も完璧に動作するようです:
[[email protected]*** ~]$ spark-submit --master yarn simple_app.py
が出力(切り捨て)
16/11/22 07:16:56 INFO EventLoggingListener: Logging events to hdfs:///var/log/spark/apps/application_1479798580078_0001
16/11/22 07:16:56 INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
Welcome to
____ __
/__/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__/.__/\_,_/_/ /_/\_\ version 1.6.2
/_/
Using Python version 3.4.3 (default, Sep 1 2016 23:33:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
>>> print(hash('foo'))
-2457967226571033580
はまた、シンプルなアプリケーション(simple_app.py
)を作成しましたあなたはできる毎回同じハッシュを返すこともできます。
EDIT 2:コメントから、あなたが執行していないドライバーのハッシュを計算しようとしているようにそれが伝播することができますので、このようにあなたはスパークアプリケーション構成内で、spark.executorEnv.PYTHONHASHSEED
を設定する必要がありますようですエグゼクティブに(それを行う方法の1つです)。
注:執行者のための環境変数を設定すると、このようにsimple_app.py
と、次のミニマリストの例をspark.executorEnv.[EnvironmentVariableName].
を使用し、糸クライアントと同じです:
from pyspark import SparkContext, SparkConf
conf = SparkConf().set("spark.executorEnv.PYTHONHASHSEED","123")
sc = SparkContext(appName="simple-app", conf=conf)
numbers = sc.parallelize(['foo']*10).map(lambda x: hash(x)).collect()
print(numbers)
そして今のはそれをテストしてみましょう再び。ここに切り詰められた出力があります:
16/11/22 14:14:34 INFO DAGScheduler: Job 0 finished: collect at /home/hadoop/simple_app.py:6, took 14.251514 s
[-5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594, -5869373620241885594]
16/11/22 14:14:34 INFO SparkContext: Invoking stop() from shutdown hook
これはすべてをカバーすると思います。
あなたの答えをありがとうが、残念ながらそれは動作していないようです。あなたのスクリプトに問題があり、configはspark pythonのバージョンをpython34に設定するだけです。デフォルトのシェル "python"はまだPython2.xを指しています。 pythonを/ usr/bin/python34に置き換えると、毎回異なるハッシュ値が表示されます。 –
あなたの例は、1つのPythonインスタンスのドライバノード上でしか実行されません。パラレルコレクションを作成してspark-submitを実行すると、異なるハッシュ値が表示されます(少なくとも3ノードクラスタの場合)。 "numbers = ..."行を:numbers = sc.parallelize(['foo'] * 10).map(ラムダx:ハッシュ(x))に置き換えた場合、collect() –
これは素晴らしい私が必要とするものを正確に実行します。 –