私は単純なMapReduceの例をPythonで書いています。入力がファイルの場合、たとえばtext
ファイルの場合、コードを実行するために単純に次のパターンを使用します。cat <data> | map | sort | reduce
たとえば、私の場合はcat data | ./mapper.py | sort | ./reducer.py
であり、すべて正常に機能します。Python - MapReduce入力としてディレクトリを渡す方法
しかし、ファイルを含むdirectory
のデータを読み込むようにマッパーとレデューサーを変更しました。だから私は入力としてpath of the directory
を渡す必要があります。データを含むディレクトリがdat/
ている間、私は、次のターミナルコマンドcat dat/ | ./mapper.py | sort | ./reducer.py
をテストするが、私はエラーに直面していました:
cat: dat/: Is a directory
Traceback (most recent call last):
File "./mapper.py", line 9, in <module>
for filename in glob.glob(sys.stdin + '*.gz'):
TypeError: unsupported operand type(s) for +: 'file' and 'str'
は、どのように私はPythonでMapReduceの入力としてディレクトリを渡すことができますか?
次は私のコードです:
mapper.py
#!/usr/bin/env python
import sys
#import timeit
import glob
import gzip
QUALITY = '01459'
MISSING = '+9999'
for filename in glob.glob(sys.stdin + '*.gz'):
f = gzip.open(filename, 'r')
for line in f:
val = line.strip()
(year, temp, q) = (val[15:19], val[87:92], val[92:93])
if temp != MISSING and q in QUALITY:
print " %s\t%s" % (year, temp)
reducer.py
#!/usr/bin/env python
import sys
max_val = -sys.maxint
key = ''
for line in sys.stdin:
(key, val) = line.strip().split('\t')
max_val = max(max_val, int(val))
print "The last IF %s\t%s" % (key, max_val)
'zcat data/*。gz | ./mapper.py |並べ替え| ./reducer.py ' – philantrovert
@philantrovertありがとうございます。私のマッパーに注意してください。入力は' .gz'ファイルを含むディレクトリアドレスで、前に行ったように 'forループ'を使ってすべて読むことができますMapReduceモデルで。しかし、あなたの提案はディレクトリ内のすべての '.gz'ファイルの正確なアドレスを渡すと思います。私は正しい? – soheil
'zcat'(gzip + cat)は.gzファイルを抽出し、その内容をマッパーに渡します。たぶん、これはあなたのマッパーを変更せずに.gzファイルのために働いていたでしょう。 – Chickenmarkus