2017-11-21 6 views
0

で私はこのようなファイルを持っている(私はあなたにサンプルデータを提供していますが、ファイルが非常に大きい):別々の複数行レコードは、開始と終了デリミタ

私はQQとZZとの間でデータを読みたい
QQ 
1 
2 
3 
ZZ 
b 
QQ 
4 
5 
6 
ZZ 
a 
QQ 
9 
8 
23 

、だから私は、データフレームのようになるはずたい:

[1,2,3] 
[4,5,6] 
[9,8] 

私が試してみましたコードは以下の通りであるが、これは大規模なデータのために失敗しつつあります。

from pyspark.sql.types import * 
from pyspark import SparkContext 
from pyspark.sql import SQLContext 

path ="/tmp/Poonam.Raskar/Sample.txt" 
sc =SparkContext() 
sqlContext = SQLContext(sc) 
sc.setLogLevel("ERROR") 
textFile = sc.textFile(path) 

wi = textFile.zipWithIndex() 
startPos = wi.filter(lambda x: x[0].startswith('QQ')).map(lambda (key,index) : index).collect() 
endPos = wi.filter(lambda x: x[0].startswith('ZZ')).map(lambda (key,index) : index).collect() 
finalPos =zip(startPos,endPos) 
dtlRow =[] 

for pos in finalPos: 
     #print(pos) 
     #print(wi.filter()) 
     dtlRow1 = [[wi.filter(lambda x: x[1]==1).map(lambda (key,index) : key ,).collect() for i in range(pos[0],pos[1])]] #Required option for collect...program is taking long time while executing this statement 
     #print(dtlRow1) 
     dtlRow.append(dtlRow1) 


cSchema = StructType([StructField("DataFromList", ArrayType(StringType()))]) 
df = sqlContext.createDataFrame(dtlRow,schema=cSchema) 
print(df.show()) 
+1

あなたが提供するサンプルデータで動作することを意味しますか?そして、あなたは「失敗する」という意味ですか?コードのどこで、正確なエラーは何ですか? – desertnaut

+0

はい、サンプルデータではコードが機能していますが、大きなデータの場合は計算に無限の時間がかかります。 – Poonam

答えて

0

は、私はあなたの方法のための大規模なデータとの問題はあなたがスケールしませんRDDを、収集した中間ステップを有することであると思います。

# get a DF with a rownumber 
lst=['QQ', '1', '2', '3', 'ZZ', 'b', 'QQ', '4', '5', '6', 'ZZ', 'a', 'QQ', '9', '8', '23'] 
df=sc.parallelize(lst).zipWithIndex()\ 
    .map(lambda (x,i): Row(**{'col': x, 'rownum': i})).toDF() 

# hack to count cumulative occurrences of QQ 
winspec=Window.partitionBy().orderBy('rownum') 
df=df.withColumn('QQ_indicator', f.expr("case when col='QQ' then 1 else 0 end")) 
df=df.withColumn('QQ_indicator_cum', f.sum('QQ_indicator').over(winspec)) 

# ditto for ZZ 
df=df.withColumn('ZZ_indicator', f.expr("case when col='ZZ' then 1 else 0 end")) 
df=df.withColumn('ZZ_indicator_cum', f.sum('ZZ_indicator').over(winspec)) 

df.filter("QQ_indicator_cum=ZZ_indicator_cum+1 and not(col='QQ')")\ 
    .groupby('QQ_indicator_cum')\ 
    .agg(f.collect_list('col').alias('result'))\ 
    .select('result')\ 
    .show(3) 
+0

@https://stackoverflow.com/users/8671053/ags29:col.startwithを「col = 'QQ'、次に1 else 0 end」の代わりに追加できますか?私のファイルでは、データは何かQQ123、QQ456、QQ789で始まるので – Poonam

関連する問題