私はPythonとSparkの新機能ですが、私がしようとしていることを説明できるかどうかを見てみましょう。PythonのクラスメソッドのmapPartitionsを呼び出す
私は、処理したいページの種類がたくさんあります。それらのページのすべての共通属性の基本クラスを作成し、基本クラスから継承したページ固有のクラスを作成しました。スパークランナーは、呼び出されたときだけページタイプを変更することによって、すべてのページに対して正確なことを実行できるというアイデアです。
ランナー
def CreatePage(pageType):
if pageType == "Foo":
return PageFoo(pageType)
elif pageType == "Bar":
return PageBar(pageType)
def Main(pageType):
page = CreatePage(pageType)
pageList_rdd = sc.parallelize(page.GetPageList())
return = pageList_rdd.mapPartitions(lambda pageNumber: CreatePage(pageType).ProcessPage(pageNumber))
print Main("Foo")
PageBaseClass.py
class PageBase(object):
def __init__(self, pageType):
self.pageType = None
self.dbConnection = None
def GetDBConnection(self):
if self.dbConnection == None:
# Set up a db connection so we can share this amongst all nodes.
self.dbConnection = DataUtils.MySQL.GetDBConnection()
return self.dbConnection
def ProcessPage():
raise NotImplementedError()
PageFoo.py
class PageFoo(PageBase, pageType):
def __init__(self, pageType):
self.pageType = pageType
self.dbConnetion = GetDBConnection()
def ProcessPage():
result = self.dbConnection.cursor("SELECT SOMETHING")
# other processing
あり、私は簡潔から省略しています他のページの特定の機能の多くがありますが、ページクラスでそのページをどのように処理するかの論理をすべて保持したいと考えています。そして、db接続やs3バケットなどのリソースを共有できるようになります。
私はそれが今あることを知っています。それは、rddのすべての項目に対して新しいPageオブジェクトを作成しています。 1つのオブジェクトだけを作成するようにこれを行う方法はありますか?これにはより良いパターンがありますか?ありがとう!
ありがとうございます。とても有難い。私はmapPartitionsを使うことを忘れていたので、その部分を更新します。私はボルグのパターンをチェックします。 – zdubu