2016-10-29 5 views
1

基本的には、データの各要素を10行にする必要があります。ただし、次のコードでは、各要素はまだ1行です。私はここで何をしていますか?設計によってNLineInputFormatがSparkで動作しない

val conf = new SparkConf().setAppName("MyApp") 
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat], classOf[LongWritable], 
classOf[Text])) 
val sc = new SparkContext(conf) 

val c = new Configuration(sc.hadoopConfiguration) 
c.set("lineinputformat.linespermap", 10); 
val data = sc.newAPIHadoopFile(fname, classOf[NLineInputFormat], classOf[LongWritable], 
classOf[Text], c) 

答えて

4

NLineInputFormatだけdoesn't perform operation you expect it to:一方の分割などの入力のN行を分割

NLineInputFormat。 (...)は入力ファイルを分割します。デフォルトでは、1行は値として1つのマップタスクに供給されます。

このように、レコードの決定方法ではなく、分割(Spark命名法のパ​​ーティション)の計算方法が変更されています。

説明が明確でない場合、我々は、次の例であることを示すことができる:各パーティション上のショーとして

def nline(n: Int, path: String) = { 
    val sc = SparkContext.getOrCreate 
    val conf = new Configuration(sc.hadoopConfiguration) 
    conf.setInt("mapreduce.input.lineinputformat.linespermap", n); 

    sc.newAPIHadoopFile(path, 
    classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf 
) 
} 

require(nline(1, "README.md").glom.map(_.size).first == 1) 
require(nline(2, "README.md").glom.map(_.size).first == 2) 
require(nline(3, "README.md").glom.map(_.size).first == 3) 

を(おそらく最後のものを除く)を正確にn行含ま。

ケースに合わせてこれを改装することはできますが、linespermapパラメータの小さな値にはお勧めできません。

関連する問題