2017-10-06 13 views
0

誰かがこのPySparkの問題を手伝ってくれますか?私はこの日を過ごしました。私が何度も印刷したときにスキーマの長さが変わる理由を理解できません。 Sparkのバージョンは2.2です。Jupyter Notebookを使用して、20ノードのクラスタ上でコードを実行しています。ここでflatMapの後にスキーマが変更され、pysparkに複数回出力されたとき

は私のコードです:ここでは

import myReader 

    # read data from binary files 
    data=sc.binaryFiles('Data/20171006') 

    # binary file reader convert binary file to a tuple of schema and data array 
    # the 1st item in the tuple is the schema of type StructType 
    # the 2nd item in the tuple is a numpy 2D array 
    tableWithSchemaRDD = data.map(myReader.convert) 

    # print out the length of the schema to check its length 
    # since the schema is the same for all items in the RDD, I only check the first one 
    print "1st print: ", len(tableWithSchemaRDD.first()[0]) 

    # extract the data array from RDD 
    tableRDD = tableWithSchemaRDD.map(lambda x:x[1]) 

    # print out the length of the schema to check its length again 
    print "2nd print: ", len(tableWithSchemaRDD.first()[0]) 

    # flatmap so each item in the rdd is a row instead of 2D array 
    # and sort all the rows by the last item in each row, which is a timestamp 
    rowRDD = tableRDD.flatMap(lambda y:[z for z in y]).sortBy(lambda x:x[-1]) 

    # print out the length of the schema multiple times 
    print "multiple print: " 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 

が出力されます。

1st print: 73 
    2nd print: 73 
    multiple print: 
    3961 
    3961 
    3961 
    3961 
    73 
    73 

あなたが最初の2 printはRDDの正しい長さをプリントアウトしますが、flatMap後にすることを見ることができるようにそれははるかに大きな数に増加しました。その後、しばらくして4 printの文が正しい長さに戻りました。時々、間違った数は3961ではなく、72の倍数の他の任意の倍数に1を加えたものです。私の推測は、私のスキーマの最初の72のStructFieldがデータの名前で、73番目のStructFieldはタイムスタンプなので3961は72 * 51 + 1です。私は11737,23401,99793,2017なども見ました。

もう1つのことは、コードをmyReaderに置き換えた場合ですそれをモジュールとしてインポートする。この問題は私には分かりません。私はsc.addPyFile()を使ってモジュールをノードに配布しました。

ご意見やご提案がありがとうございます。ありがとうございました!

答えて

0

この質問が今述べられているので、結果について予期しないことはありません。あなた以来ソート:

.sortBy(lambda x: x[-1]) 

スパークは、データをシャッフルしていると、ネクタイの場合は、first一つとして任意の要素を選択することができます。その結果、値は実行ごとに変更される可能性があります。

+0

ありがとうございました。私はなぜあなたは結びつきがあると思いますか?すべてのタイムスタンプは一意で、 'sortBy'に使用されます。 – dangwh

関連する問題