まず、Dataframe
の作成方法とその内容を説明しましょう。PySpark .groupBy()と.count()は、比較的小さなデータフレームでは遅くなります。
私はこのようなRDDs
へのパスのリストを提供し、これらのHTMLドキュメントに両方のために
をgziped HTML文書の一組とgzipedメタデータの1セットを持っている:私は両方の準備
Wet_Paths_RDD = sc.parallelize(Wet_Paths)
Wet_RDD = Wet_Paths_RDD.map(open_wet_filelist).flatMap(split_wetfiles)
行はこのようになっていることのようにRDDS:
(k,(some,other,values))
そして、私は私の内容と私のメタデータRDD
に参加RDD
一緒にこのような:
Wat_Wet_RDD = Wat_RDD.join(Wet_RDD)
し、私は今では相対的な複雑なタプルを解凍し、とりわけ言語検出を行います。私はRDDs
の結合を行う必要があります。これまでのところ、すべての文字列はDataframeで表現できないbyte strings
として表現されています。
Wat_Wet_RDD = Wat_Wet_RDD.map(unpack_wat_wet_tuple_and_decoding_and_langdetect)
私はその後Dataframe
に参加しRDD
を転送:
wat_wet_schema = StructType([
StructField("URI", StringType(), True),
StructField("Links", StringType(), True),
StructField("N_Links", IntegerType(), True),
StructField("Content_type", StringType(), True),
StructField("Original_Encoding", StringType(), True),
StructField("Content", StringType(), True),
StructField("Language", StringType(), True),
StructField("Language_confidence", IntegerType(), True),
])
WatWet_DF = sqlContext.createDataFrame(Wat_Wet_RDD, schema=wat_wet_schema)
し、それを見てみましょう:すべては24分が、次のステップを取るこれまで
print(WatWet_DF.show(20))
を:
print(WatWet_DF.groupBy(WatWet_DF.Language).count().orderBy(desc('count')).show(100))
24時間後に私はこの段階で1つのタスクを解決しなくても中止しました。
現時点では、単一のテスト用Linux VMでクラスタを実行しています。 VMには4つのコアがあり、同時にマスターとワーカーを実行しています。ワーカーにはそれぞれ3.5Gのメモリを搭載した4人の実行者がいます。データフレームは、約100万行から構成されている必要があります。 Apache Sparkのバージョンは2.1.0で、Python 3.5が使用されています。 VMは24GのRAMを搭載した日付の付いたXeon W3680 6(v12)コアの上を走ります。
「WatWet_DF」にはいくつの行がありますか? – Jaco
それは約百万を与えるか、または取るべきである。 '.count()'だけですでに非常に長い時間がかかります。 'WatWet_DF.content'フィールドには文書全体が含まれているので、おそらく' Dataframe'が遅いのですか? – Thagor