私は単純なMapReduceフローを書き、Google Cloud StorageのファイルからCSVの行を読み込み、その後エンティティを作成しました。しかし、私はそれを複数のシャードで実行することはできません。AppEngineマップの縮尺を縮小するにはどうすればよいですか?
コードはmapreduce.control.start_mapを使用しており、次のようになります。
class LoadEntitiesPipeline(webapp2.RequestHandler):
id = control.start_map(map_name,
handler_spec="backend.line_processor",
reader_spec="mapreduce.input_readers.FileInputReader",
queue_name=get_queue_name("q-1"),
shard_count=shard_count,
mapper_parameters={
'shard_count': shard_count,
'batch_size': 50,
'processing_rate': 1000000,
'files': [gsfile],
'format': 'lines'})
実際にどのメソッドが実際に必要なのかわからないため、私は両方の場所でshard_countを持っています。 shard_countを8から32のいずれかに設定すると、ステータスページで常に1/1シャードの実行が示されるため、何も変更されません。分けるために、私はすべてのインスタンスを多数持つバックエンドのキューですべてを実行させました。キューパラメータper this wikiを調整しようとしました。結局、それはちょうど連続的に実行されているようです。
アイデア?ありがとう!
アップデート(まだ成功):それはまだ、この新しいコードで
class ImportHandler(webapp2.RequestHandler):
def get(self, gsfile):
pipeline = LoadEntitiesPipeline2(gsfile)
pipeline.start(queue_name=get_queue_name("q-1"))
self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
class LoadEntitiesPipeline2(base_handler.PipelineBase):
def run(self, gsfile):
yield mapreduce_pipeline.MapperPipeline(
'loadentities2_' + gsfile,
'backend.line_processor',
'mapreduce.input_readers.FileInputReader',
params={'files': [gsfile], 'format': 'lines'},
shards=32
)
:、私はそうのようなパイプラインに直接呼び出しを使用して電話を作ってみました物事を分離しようとする際に
1つのシャード上でのみ実行されます。 mapreduce.input_readers.FileInputReaderが行ごとに並列化できるかどうか疑問に思っています。それはの迅速な読み取りに基づいてシャーディングすることが可能であるべきでFileInputReaderのように私には見えます
ええ、私の改行コードでは何も変わっていませんし、各行を細かく処理することができます。実際には、本当に大きなファイルを小さなファイルに分割すると(それぞれ5000行)。私はshardへのmapreduceコールを得ることができますが、細かい粒度ではなくファイル名でシャーディングされているように見えます。 –