2016-04-15 3 views
0

私はテキストの一部から必要な名前を抽出することができるpysparkコードを持っています。このコードは私に結果を与えますが、私の大きなデータを処理するのにかなり時間がかかります。効率を改善するために、よりpysparkの道に変換するPythonのコードをSpark互換のコード(pyspark)に変換するには?

articles=sc.textFile("file:///home//XXX//articles.csv").map(lambda line: line.split(",")) 
articles_ls=list(articles.map(lambda x: [x[0].lower(),x[1].lower(),x[2].lower(),x[3].lower().strip()]).collect()) 

       #Function which needs to be optimized to run faster 
def mapper(f): 
    article_list=[]  
    list1=[] 
    list2=[] 
    list3=[] 
    list4=[] 
    list5=[] 
    list6=[] 
    list7=[] 
    for i in range(len(articles_ls)): 
     for j in range(len(articles_ls[i])-1): 
      comment=re.split(r'\W+', f.lower().strip()) 
      if articles_ls[i][j] in comment: 
       if articles_ls[i][j]: 

        if articles_ls[i][3] == 'typea': 
         if articles_ls[i][j] not in list1: 
          list1.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typeb': 
         if articles_ls[i][j] not in list2: 
          list2.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typec': 
         if articles_ls[i][j] not in list3: 
          list3.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typed': 
         if articles_ls[i][j] not in list4: 
          list4.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typee': 
         if articles_ls[i][j] not in list5: 
          list5.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typef': 
         if articles_ls[i][j] not in list6: 
          list6.append(articles_ls[i][0]) 

        if articles_ls[i][3] == 'typeg': 
         if articles_ls[i][j] not in list7: 
          list7.append(articles_ls[i][0]) 

    list1 = list(set(list1)) 
    list2 = list(set(list2)) 
    list3 = list(set(list3)) 
    list4 = list(set(list4)) 
    list5 = list(set(list5)) 
    list6 = list(set(list6)) 
    list7 = list(set(list7)) 

    article_list.append([("ProductA:".split())+list1]+[("ProductB:".split())+list2]+[("ProductC:".split())+list3]+\ 
         [("ProductD:".split())+list4]+[("ProductE:".split())+list5]+[("ProductF:".split())+list6]+\ 
         [("ProductG:".split())+list7]) 
    return article_list 

lines = sc.textFile("file:///home//XXX//data.csv").map(lambda line: line.split(",")).map(lambda x: (x[0],x[1],x[2].encode("ascii", "ignore"))) 
articles_all = (lines.map(lambda x: (x[0],x[1],x[2],(mapper(x[2].lower()))))) 

答えて

0

私はそれをリストに追加するのではなく、スパークDATAFRAMEに私のデータセットをロードしたときに非常に高速に実行するために火花を発見した(新しい環境を刺激します)。そして、forループよりも機能的なスタイルを好むでしょう。

関連する問題