2017-06-18 7 views
-2

pythonに貼り付けられていた既存のコードをpysparkに変換しました。Pysparkコードは、純粋なpythonの代替と比較しても十分にパフォーマンスがありません

Pythonコード:

import json 
import csv 


def main(): 
    # create a simple JSON array 
    with open('paytm_tweets_data_1495614657.json') as str: 

     tweetsList = [] 
     # change the JSON string into a JSON object 
     jsonObject = json.load(str) 

     #print(jsonObject) 

     # # print the keys and values 
     for i in range(len(jsonObject)): 
      tweetsList.insert(i,jsonObject[i]["text"]) 

     #print(tweetsList) 
    displaySentiment(tweetsList) 



def displaySentiment(tweetsList): 
    aDict = {} 

    from sentiment import sentiment_score 

    for i in range(len(tweetsList)): 
     aDict[tweetsList[i]] = sentiment_score(tweetsList[i]) 
    print (aDict) 


    with open('PaytmtweetSentiment.csv', 'w') as csv_file: 
     writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"]) 
     writer.writeheader() 
     writer = csv.writer(csv_file) 
     for key, value in aDict.items(): 
      writer.writerow([key, value]) 


if __name__ == '__main__': 
    main() 

変換さPysparkコード:

import json 
import csv 
import os 
from pyspark import SparkContext, SparkConf 
from pyspark.python.pyspark.shell import spark 

os.environ['PYSPARK_PYTHON'] = "/usr/local/bin/python3" 


def main(): 
    path = "/Users/i322865/DeepInsights/bitbucket-code/ai-engine/twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json" 
    peopleDF = spark.read.json(path).rdd 
    df = peopleDF.map(lambda row: row['text']) 
    displaySentiment(df.collect()) 



def displaySentiment(tweetsList): 
    from sentiment import sentiment_score 

    aDict = sentiment_score(tweetsList) 

    # 
    with open('paytmtweetSentiment.csv', 'w') as csv_file: 
     writer = csv.DictWriter(csv_file, fieldnames = ["Tweets", "Sentiment Value"]) 
     writer.writeheader() 
     writer = csv.writer(csv_file) 
     for i in range(len(tweetsList)): 
      writer.writerow([tweetsList[i], aDict[i]]) 
      print([tweetsList[i], aDict[i]]) 


if __name__ == '__main__': 
    conf = SparkConf().setAppName("Test").setMaster("local") 
    sc = SparkContext.getOrCreate(conf=conf) 
    main() 

私は両方のプログラムを実行しましたが、大幅な性能向上を見ていません。私は何が欠けていますか?あなたはいくつかの考えを出すことができますか?

また、「reduce」も使用する必要がありますか?私は現在、「マップ」のみを使用しています。

+0

このタイプの質問は、サイトには適していませんが、実際には、これはまだ悪いコードですが、正直言って、もちろん犯罪ではありません! Pysparkはプログラミング言語ではありません。一方、Pythonがあります。 – eliasah

+0

@eliasah申し訳ありませんが、質問を修正しました。迅速なフィードバックをありがとう。 – coders

+1

'df.collect()'を2回呼び出すのは当然のことながらパフォーマンスが低下します。すべてのレンダリングでそれを呼び出すことは、ほとんど役に立たないです。 –

答えて

2

あなたはPySparkにおけるプロセス何かを並列したい場合は、ないcollect()バック

def calc_sentiment(tweetsDf): # You should pass a dataframe 
    from sentiment import sentiment_score 

    # Add a new column over the Tweets for the sentiment 
    return tweetsDf.withColumn('sentiment_score', sentiment_score(tweetsDf.text)) 

明らかにPythonのリストに行い、sentiment_scoreニーズがPySparkをColumn

次に受け入れ、返すの両方に同様に変更しましたあなたはこのようなものを持っています

def main(): 
    path = "..../twitter-sentiment-analysis/flipkart_tweets_data_1495601666.json" 
    twitterDf = spark.read.json(path) 

    # Don't call collect, only sample the Dataframe 
    sentimentDf = calc_sentiment(twitterDf) 
    sentimentDf.show(5) 

    # TODO: Write sentimentDf to a CSV 
    sentimentDf.write.csv(....) 
+0

提案に感謝します。 .jsonファイルの代わりに.jsonファイルの代わりに追加の質問をすれば、ストリームデータを渡すと、上記のコードで変更が必要になるでしょう – coders

+0

ストリームをDataframeオブジェクトに入れることができれば、そうではありません気がついた –

2

他の人が指摘しているように、y Sparkがあなたの現在のユースケースを意図していなかったため、私たちのPySpark実装は単純に遅くなるかもしれません。

スパークは、基本的には、ローカル並列化ではなく、非常に大規模な分散データセット(複数のマシン)の処理を高速化することを目指しています。これを達成するために、オーバーヘッド構造とプロセスを使用します。

単一/小規模のデータセットの場合、このオーバーヘッドが容易に支配的になり、ソリューションが遅くなる可能性があります。 This articleでは、Hadoopの使用方法について説明しています.Hadoopは非常に似ています。代わりにmultiprocessingを試したことがありますか?

Sparkがあなたに合っていると確信している場合は、Spark設定の詳細、パフォーマンスの測定方法、データセットに関する新しい質問を投稿すると役に立ちます。

0

私はあなたがどんなスピードアップも見ていないのは完全に理にかなっていると思います。最初にRDDを作成して(データを配布する)、それを収集して分析機能である2番目の機能を実行します。実際には、displaySentiment()関数を適用するために続くすべてのデータをドライバマシンに収集することで、最初の関数が行ったことを破壊します。あなたがしているのは、ただ1台のマシンであるドライバマシンでプログラムを実行することです。したがって、スピードアップはありません。

関連する問題