誰かがこのPySparkの問題を手伝ってくれますか?私はこの日を過ごしました。私が何度も印刷したときにスキーマの長さが変わる理由を理解できません。 Sparkのバージョンは2.2です。Jupyter Notebookを使用して、20ノードのクラスタ上でコードを実行しています。ここでflatMapの後にスキーマが変更され、pysparkに複数回出力されたとき
は私のコードです:ここでは
import myReader
# read data from binary files
data=sc.binaryFiles('Data/20171006')
# binary file reader convert binary file to a tuple of schema and data array
# the 1st item in the tuple is the schema of type StructType
# the 2nd item in the tuple is a numpy 2D array
tableWithSchemaRDD = data.map(myReader.convert)
# print out the length of the schema to check its length
# since the schema is the same for all items in the RDD, I only check the first one
print "1st print: ", len(tableWithSchemaRDD.first()[0])
# extract the data array from RDD
tableRDD = tableWithSchemaRDD.map(lambda x:x[1])
# print out the length of the schema to check its length again
print "2nd print: ", len(tableWithSchemaRDD.first()[0])
# flatmap so each item in the rdd is a row instead of 2D array
# and sort all the rows by the last item in each row, which is a timestamp
rowRDD = tableRDD.flatMap(lambda y:[z for z in y]).sortBy(lambda x:x[-1])
# print out the length of the schema multiple times
print "multiple print: "
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
print len(tableWithSchemaRDD.first()[0])
が出力されます。
1st print: 73
2nd print: 73
multiple print:
3961
3961
3961
3961
73
73
あなたが最初の2 print
はRDDの正しい長さをプリントアウトしますが、flatMap
後にすることを見ることができるようにそれははるかに大きな数に増加しました。その後、しばらくして4 print
の文が正しい長さに戻りました。時々、間違った数は3961ではなく、72の倍数の他の任意の倍数に1を加えたものです。私の推測は、私のスキーマの最初の72のStructFieldがデータの名前で、73番目のStructFieldはタイムスタンプなので3961は72 * 51 + 1です。私は11737,23401,99793,2017なども見ました。
もう1つのことは、コードをmyReader
に置き換えた場合ですそれをモジュールとしてインポートする。この問題は私には分かりません。私はsc.addPyFile()
を使ってモジュールをノードに配布しました。
ご意見やご提案がありがとうございます。ありがとうございました!
ありがとうございました。私はなぜあなたは結びつきがあると思いますか?すべてのタイムスタンプは一意で、 'sortBy'に使用されます。 – dangwh