2016-07-09 12 views
0

3つのマシンクラスタにHadoop 2.7.1がインストールされています。私は、MRJobとHadoop Streamingを使用して、逆インデックスのmapreduceジョブを実行しようとしています。しかし、私は、多くの場合、2つの異なる減速に行くのと同じキーを取得し、私の出力にMRJob同じキーが異なるレデューサーに送信されます

MRJob.SORT_VALUES = True 

def steps(self): 
    JOBCONF_STEP1 = { 
     "mapred.map.tasks":20, 
     "mapred.reduce.tasks":10 
    } 
    return [MRStep(jobconf=JOBCONF_STEP1, 
       mapper=self.mapper, 
       reducer=self.reducer) 
      ] 

気づいた:

はここに私の設定です。これは、このような出力になり:

Key | Output 
Z | 2 
X | 1,2 
X | 3 
Z | 1 

これは、別の減速はまた、Xキーと値3を取得している間に1つの減速は、Xキーと値1と2を取得していることを意味しかし、私はちょうど欲しいですXキーと関連するすべての値を取得する1つの減速器。

だから、所望の出力は次のようになります。

Key | Output 
X | 1,2,3 
Z | 1,2 

どのように私はこの問題を解決しますか。

はここ

%%writefile invertedIndex.py 

import json 
import mrjob 
from mrjob.job import MRJob 
from mrjob.step import MRStep 
class MRinvertedIndex(MRJob): 

    MRJob.SORT_VALUES = True 

    def steps(self): 
     JOBCONF_STEP1 = { 
      "mapred.map.tasks":20, 
      "mapred.reduce.tasks":10 
     } 
     return [MRStep(jobconf=JOBCONF_STEP1, 
        mapper=self.mapper, 
        reducer=self.reducer) 
       ] 

    def mapper(self,_,line): 
     key, stripe = line.split("\t") 
     stripe = json.loads(stripe) 
     for w in stripe: 
      yield w, key 

    def reducer(self,key,values): 
     d = [v for v in values] 
     yield key,d 

    if __name__ == '__main__': 
     MRinvertedIndex.run() enter code here 

答えて

0

はそれを考え出した私のMRJobコードです。 2このため、デフォルトであることを得た方法

'stream.num.map.output.key.fields': '1' 

私は知らない。私は明示的jobconfに設定することで問題を解決し

'stream.num.map.output.key.fields': '2' 

:問題はMRJobは、デフォルトで以下のように設定されたことでした設定、少なくとも私は私の問題を解決した

関連する問題