2017-07-20 8 views
1

pythonでsparkを使ってサンプル関数を書いた。誰もでき、理想的な結果は、上記の機能のために13でなければならないが、結果は6パーティション間でスパークする機能を減らすpyspark

として来ている:

#!/usr/bin/env python 
from __future__ import print_function 
from pyspark.sql import SparkSession 
import os 
import sys 
os.environ["SPARK_HOME"] = "/usr/local/spark" 
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.4" 

spark = SparkSession \ 
    .builder \ 
    .appName("testpython") \ 
    .getOrCreate() 
rdd1 = spark.sparkContext.parallelize([1,6,5,2,99,1000,100009,10000,139,44,45343,23234,34]) 
**rdd3=rdd1.reduce(lambda x,y:x+1)** 
print(rdd3) 

では、我々は(X + 1、λx、y)が与えられている機能を減らす次のように関数であります結果が13の代わりに6である理由を説明してください。 sparkのパーティション間でデータのディビジョンがあるためですか?

コンソール出力:ORG/apacheの/火花/ log4j-defaults.properties 設定デフォルト:スパークのデフォルトのlog4jのプロファイルを使用して

/usr/bin/python3.4 /home/PycharmProjects/sampleproject/ttestexmple.py ログレベルを「WARN」に設定します。 ロギングレベルを調整するには、sc.setLogLevel(newLevel)を使用します。 SparkRの場合は、setLogLevel(newLevel)を使用します。 17/07/20午後5時45分14秒NativeCodeLoaderをWARN:あなたは別のアドレス

にバインドする必要がある場合に設定SPARK_LOCAL_IP:utilsのWARNネイティブHadoopの 17/07/20午後5時45分14秒を読み込むことができません

終了コード0で終了したプロセス

+0

reduceを使って何をしたいですか? – sau

+0

私は最初のメンバーと要素の数を達成したいと思います。カウントや何か内部のような任意の既定の関数を使わずに言うことができます。rdd +(要素の数)の最初の数字を入力します –

+0

答えを提供しました。しかし、これは、削減が使われることになっている方法ではないことをご存じでしょうか。なぜデフォルト機能を使いたくないのかわかりません。 – sau

答えて

2

はい絶対に正しいです。

rdd1 = rdd1.coalesce(1) 
rdd2 = rdd1.reduce(lambda x,y: x+1) 

ここで期待される答えが得られます。

あなたのrddに複数のpartitonがあり、yをまったく使用していないreduceを使用しようとしているときです。だからあなたのrddは2つのパーティションを持っていると言いましょう。あなたはこのようなものを手に入れます。(reduce on partition 1, reduce on partition 2)そして最後にそれはあなたに与えますreduce result on partion 1 + 1

+0

データ全体を1つのパーティションにまとめると、パフォーマンスが低下しますか? –

+0

分散コンピューティングの考え方は、分散しているデータに依存します。あなたのタスクの特定の目的のために、より効率的なソリューションを考え出すべきです。私はあなたが何を達成したいかまだ明確ではありません。 – sau

+1

@svstejaまた、あなたが問題文を解決する場合にも答えを受け入れることができます。 – sau

関連する問題