2016-09-19 21 views
1

私はPythonとSparkの新機能ですが、私がしようとしていることを説明できるかどうかを見てみましょう。PythonのクラスメソッドのmapPartitionsを呼び出す

私は、処理したいページの種類がたくさんあります。それらのページのすべての共通属性の基本クラスを作成し、基本クラスから継承したページ固有のクラスを作成しました。スパークランナーは、呼び出されたときだけページタイプを変更することによって、すべてのページに対して正確なことを実行できるというアイデアです。

ランナー

def CreatePage(pageType): 
    if pageType == "Foo": 
     return PageFoo(pageType) 
    elif pageType == "Bar": 
     return PageBar(pageType) 

def Main(pageType): 
    page = CreatePage(pageType) 
    pageList_rdd = sc.parallelize(page.GetPageList()) 
    return = pageList_rdd.mapPartitions(lambda pageNumber: CreatePage(pageType).ProcessPage(pageNumber)) 

print Main("Foo") 

PageBaseClass.py

class PageBase(object): 
    def __init__(self, pageType): 
     self.pageType = None 
     self.dbConnection = None 

    def GetDBConnection(self): 
     if self.dbConnection == None: 
     # Set up a db connection so we can share this amongst all nodes. 
     self.dbConnection = DataUtils.MySQL.GetDBConnection() 
     return self.dbConnection 

    def ProcessPage(): 
     raise NotImplementedError() 

PageFoo.py

class PageFoo(PageBase, pageType): 
    def __init__(self, pageType): 
     self.pageType = pageType 
     self.dbConnetion = GetDBConnection() 

    def ProcessPage(): 
     result = self.dbConnection.cursor("SELECT SOMETHING") 
     # other processing 

あり、私は簡潔から省略しています他のページの特定の機能の多くがありますが、ページクラスでそのページをどのように処理するかの論理をすべて保持したいと考えています。そして、db接続やs3バケットなどのリソースを共有できるようになります。

私はそれが今あることを知っています。それは、rddのすべての項目に対して新しいPageオブジェクトを作成しています。 1つのオブジェクトだけを作成するようにこれを行う方法はありますか?これにはより良いパターンがありますか?ありがとう!

答えて

0

いくつかの提案は:

  • 直接接続を作成しないでください。接続プールを使用する(各エグゼキュータは別々のプロセス設定プールサイズを使用するため、1つのプールサイズで十分です)、代わりに接続がタイムアウト時に自動的に閉じられるようにしてください。
  • Borg patternを使用してプールを保存し、接続を取得するためにコードを調整します。
  • ノード間またはノード内でも接続を共有することはできません(別のプロセスについてのコメントを参照)。取得できる最良の保証は、パーティションごとに1つの接続(またはインタープリターの再利用が可能なパーティション数)です。
+0

ありがとうございます。とても有難い。私はmapPartitionsを使うことを忘れていたので、その部分を更新します。私はボルグのパターンをチェックします。 – zdubu

関連する問題