で私はこのようなファイルを持っている(私はあなたにサンプルデータを提供していますが、ファイルが非常に大きい):別々の複数行レコードは、開始と終了デリミタ
私はQQとZZとの間でデータを読みたいQQ
1
2
3
ZZ
b
QQ
4
5
6
ZZ
a
QQ
9
8
23
、だから私は、データフレームのようになるはずたい:
[1,2,3]
[4,5,6]
[9,8]
私が試してみましたコードは以下の通りであるが、これは大規模なデータのために失敗しつつあります。
from pyspark.sql.types import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
path ="/tmp/Poonam.Raskar/Sample.txt"
sc =SparkContext()
sqlContext = SQLContext(sc)
sc.setLogLevel("ERROR")
textFile = sc.textFile(path)
wi = textFile.zipWithIndex()
startPos = wi.filter(lambda x: x[0].startswith('QQ')).map(lambda (key,index) : index).collect()
endPos = wi.filter(lambda x: x[0].startswith('ZZ')).map(lambda (key,index) : index).collect()
finalPos =zip(startPos,endPos)
dtlRow =[]
for pos in finalPos:
#print(pos)
#print(wi.filter())
dtlRow1 = [[wi.filter(lambda x: x[1]==1).map(lambda (key,index) : key ,).collect() for i in range(pos[0],pos[1])]] #Required option for collect...program is taking long time while executing this statement
#print(dtlRow1)
dtlRow.append(dtlRow1)
cSchema = StructType([StructField("DataFromList", ArrayType(StringType()))])
df = sqlContext.createDataFrame(dtlRow,schema=cSchema)
print(df.show())
あなたが提供するサンプルデータで動作することを意味しますか?そして、あなたは「失敗する」という意味ですか?コードのどこで、正確なエラーは何ですか? – desertnaut
はい、サンプルデータではコードが機能していますが、大きなデータの場合は計算に無限の時間がかかります。 – Poonam