1

これは私のpython-sparkコードの一部です。 特にコードのこの部分は、速度を向上させたいと思っていますが、方法はわかりません。現在、6,000万のデータ行で約1分かかっており、10秒未満に改善したいと考えています。私のスパークアプリのspark appのスピードを改善する

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 

もっとコンテキスト:

article_ids = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="article_by_created_at", keyspace=source).load().where(range_expr).select('article','created_at').repartition(64*2) 

axes = sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load() 
speed_df = article_ids.join(axes,article_ids.article==axes.article).select(axes.article,axes.at,axes.comments,axes.likes,axes.reads,axes.shares) \ 
    .map(lambda x:(x.article,[x])).reduceByKey(lambda x,y:x+y) \ 
    .map(lambda x:(x[0],sorted(x[1],key=lambda y:y.at,reverse = False))) \ 
    .filter(lambda x:len(x[1])>=2) \ 
    .map(lambda x:x[1][-1]) \ 
    .map(lambda x:(x.article,(x,(x.comments if x.comments else 0)+(x.likes if x.likes else 0)+(x.reads if x.reads else 0)+(x.shares if x.shares else 0))))  

おかげで、あなたの提案をたくさん。

EDIT:

Countが(50代)の時間の大半を占める

に参加していない、私はまたして増加の並列処理を試してみましたが、それは明らかな効果がありませんでした:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

をし、

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

This is the picture from spark showing how long each operation takes

+1

あなたは負荷ですか、それとも参加してもよろしいですか?ジョインは高価です... –

+0

カウントは、ほとんどの時間がかかる、上記の私の更新を参照してください。ありがとう – peter

+1

この質問は[this](http://stackoverflow.com/a/37507116/1560062)とどう違うのですか? – eliasah

答えて

4

まず、実際に何が最大の時間を費やしているのかを把握する必要があります。

例えば、単にデータの読み取りがこれを助けるかもしれませんが、あなたのカサンドラクラスタのIOを限界いっぱいまでされていない場合にのみ、パラレル読者の並列性や数を増やす

axes = sqlContext 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(table="axes", keyspace=source) 
    .load() 
    .count() 

の所要時間を決定します。

第2に、Dataframes APIですべてを行うことができるかどうかを確認してください。 Pythonラムダを使用するたびに、Pythonとスカラ型の間でシリアル化コストが発生します。

編集:

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source).load().repartition(number) 

のみ負荷がさほど完了している、これはあなたを助けにはなりません後に有効かかります。

sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="axes", keyspace=source,numPartitions=number).load() 

Spark Cassandra Connectorの有効なパラメータではないため、これは何も行いません。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/reference.md#read-tuning-parameters 入力分割サイズは、Sparkパーティションに入れるC *パーティションの数を決定します。

+0

上記のいくつかの詳細を追加しました。上記の2つの方法でparralellsimを増やしてみましたが、効果はありませんでした。 dateframes APIのすべての意味を具体的に指定できますか?ありがとうございました – peter

+2

@Peter DataFrames [前の時刻](http://stackoverflow.com/a/37507116/1560062)だけを使ってアプローチへのリンクを提供しました。 – zero323

+0

@ zero323 dateframesだけを使ってみましたが、dateframeはkeybyとreducebykeyメソッドを持っていないようですので、RDDを使う必要があります。私は試してみると、このエラーメッセージが表示されます。AttributeError: 'DataFrame'オブジェクトに 'keyBy'属性がありません。どのようなアイデアをするか?ありがとう – peter

関連する問題