2017-07-21 7 views
1

LogisticRegressionWithLBFGSをx回呼び出す繰り返しがあります。PySpark:反復でLogisticRegressionWithLBFGSが遅くなっています

問題は、繰り返しがループごとに遅くなり、最終的には永遠にハングすることです。

私はさまざまなアプローチを試みましたが、これまでのところ運がありません。

コードは以下のようになります。

def getBootsrapedAttribution(iNumberOfSamples, df): 

    def parsePoint(line): 
     return LabeledPoint(line[2], line[3:]) 

    aResults = {} 
    while x <= iNumberOfSamples: 
     print ("## Sample: " + str(x)) 
     a = datetime.datetime.now() 
     dfSample = sampleData(df) 
     dfSample.repartition(700) 
     parsedData = dfSample.rdd.map(parsePoint) 
     parsedData = parsedData.repartition(700) 
     parsedData.persist() 
     model = LogisticRegressionWithLBFGS.train(parsedData) 
     parsedData.unpersist() 
     b = datetime.datetime.now() 
     print(b-a) 
     x+=1 

def sampleData(df): 
    df = df.repartition(500) 
    dfFalse = df.filter('col == 0').sample(False, 0.00035) 
    dfTrue = df.filter('col == 1') 
    dfSample = dfTrue.unionAll(dfFalse) 
    return dfSample 


getBootsrapedAttribution(50, df) 

、出力は次のようになります。model = LogisticRegressionWithLBFGS.train(parsedData)なければ

## Sample: 1 
0:00:44.393886 

## Sample: 2 
0:00:28.403687 

## Sample: 3 
0:00:30.884087 

## Sample: 4 
0:00:33.523481 

## Sample: 5 
0:00:36.107836 

## Sample: 6 
0:00:37.077169 

## Sample: 7 
0:00:41.160941 

## Sample: 8 
0:00:54.768870 

## Sample: 9 
0:01:01.31139 

## Sample: 10 
0:00:59.326750 

## Sample: 11 
0:01:37.222967 

## Sample: 12 

...hangs forever 

それはパフォーマンスの問題もなく動作します。

私クラスタは次のようになります。

spark.default.parallelism 500 
spark.driver.maxResultSize 20G 
spark.driver.memory 200G 
spark.executor.cores 32 
spark.executor.instances 2 
spark.executor.memory 124G 

誰もがこの問題を知っていますか?

答えて

1

私は自分自身の質問に答えています。

問題は方法LogisticRegressionWithLBFGSの範囲内です。この方法をLogisticRegressionSpark 2.1+に置き換えると問題が解決しました。反復ごとに減速することはもうありません。

さらに、上記のコードを使用するとさらに改善が加えられます。 rddの方法sampleは、DataFrameの方法sampleByに置き換えることができます。また、これは、不必要なunionを避けることができます:

.sampleBy('col', fractions={0: 0.00035, 1: 1}, seed=1234)

はさらに、上記のすべてのコードでrepartitionsは不要です。重要なことは、getBootsrapedAttribution()に渡されたdfが良好に分割されており、cachedです。

関連する問題