2016-11-26 12 views
1

私はSpark v1.5.2を使用しています。私はPythonでプログラムを書いたので、なぜ入力ファイルを2度読み込むのか分かりません。 Scalaで書かれた同じプログラムは入力ファイルを一度だけ読み込みます。Spark:なぜPythonは入力ファイルを2回読み込むのですか?

アキュムレータを使用して、map()が呼び出された回数をカウントします。アキュムレータの値から、入力ファイルの読み込み回数を推測します。
入力ファイルには3行のテキストが含まれています。

のPython:

from pyspark import SparkContext, SQLContext 
from pyspark.sql.types import * 

def createTuple(record): # used with map() 
    global map_acc 
    map_acc += 1 
    return (record[0], record[1].strip()) 

sc   = SparkContext(appName='Spark test app') # appName is shown in the YARN UI 
sqlContext = SQLContext(sc) 
map_acc = sc.accumulator(0) 
lines  = sc.textFile("examples/src/main/resources/people.txt") 
people_rdd = lines.map(lambda l: l.split(",")).map(createTuple) #.cache() 
fieldNames = 'name age' 
fields  = [StructField(field_name, StringType(), True) for field_name in fieldNames.split()] 
schema  = StructType(fields) 
df   = sqlContext.createDataFrame(people_rdd, schema) 
print 'record count DF:', df.count() 
print 'map_acc:', map_acc.value 
#people_rdd.unpersist() 
$ spark-submit --master local[1] test.py 2> err 
record count DF: 3 
map_acc: 6    ##### why 6 instead of 3?? 

スカラ:

import org.apache.spark._ 
import org.apache.spark.sql.Row; 
import org.apache.spark.sql.types.{StructType,StructField,StringType}; 

object SimpleApp { 
    def main(args: Array[String]) { 
    def createTuple(record:Array[String], map_acc: Accumulator[Int]) = { // used with map() 
     map_acc += 1 
     Row(record(0), record(1).trim) 
    } 
    val conf  = new SparkConf().setAppName("Scala Test App") 
    val sc   = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    val map_acc = sc.accumulator(0) 
    val lines  = sc.textFile("examples/src/main/resources/people.txt") 
    val people_rdd = lines.map(_.split(",")).map(createTuple(_, map_acc)) 
    val fieldNames = "name age" 
    val schema  = StructType(
     fieldNames.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
    val df   = sqlContext.createDataFrame(people_rdd, schema) 
    println("record count DF: " + df.count) 
    println("map_acc: " + map_acc.value) 
    } 
} 
$ spark-submit ---class SimpleApp --master local[1] test.jar 2> err 
record count DF: 3 
map_acc: 3 

私は、Pythonプログラムからのコメントを削除し、RDD、その後、入力されたファイルをキャッシュした場合2回読み込まれません。しかし、私はRDDをキャッシュしなければならないとは思わないでしょうか? Scala版では、RDDをキャッシュする必要はありません。

people_rdd = lines.map(lambda l: l.split(",")).map(createTuple).cache() 
... 
people_rdd.unpersist() 
$ spark-submit --master local[1] test.py 2> err 
record count DF: 3 
map_acc: 3 
$ hdfs dfs -cat examples/src/main/resources/people.txt 
Michael, 29 
Andy, 30 
Justin, 19 

答えて

1

それは理由1.5 createDataFrameeagerly validates provided schema on a few elementsに起こります。これに対し、現在のバージョンでは

elif isinstance(schema, StructType): 
    # take the first few rows to verify schema 
    rows = rdd.take(10) 
    for row in rows: 
     _verify_type(row, schema) 

validate schema for all elements but it is done lazilyとあなたは同じ行動を見ていないでしょう。たとえば、これは1.5で瞬時に失敗します:あなたはDataFrameを評価しようとすると、

from pyspark.sql.types import * 

rdd = sc.parallelize([("foo",)]) 
schema = StructType([StructField("foo", IntegerType(), False)]) 
sqlContext.createDataFrame(rdd, schema) 

同等の2.0は失敗します。

一般に、あなたが厳密にSQL APIとのやりとりに限定しない限り、PythonとScalaコードが同じように動作するとは思わないはずです。 PySpark:

  • ほとんどすべてのRDDメソッドがネイティブに実装されているため、同じ連鎖が異なるDAGになる可能性があります。
  • Java APIとの相互作用では、Javaクラスの型情報を提供するために熱心な評価が必要な場合があります。
関連する問題