最近、私はCython
をSparkと使いたいと思っていました。そのためには、the following referenceに従ってください。Spark with Cython
述べたように、私は以下のプログラムを書いたが、私は取得しています:
TypeError:
fib_mapper_cython() takes exactly 1 argument (0 given)
spark-tools.py
def spark_cython(module, method):
def wrapped(*args, **kwargs):
global cython_function_
try:
return cython_function_(*args, **kwargs)
except:
import pyximport
pyximport.install()
cython_function_ = getattr(__import__(module), method)
return cython_function_(*args, **kwargs)
return wrapped()
fib.pyx
def fib_mapper_cython(n):
'''
Return the first fibonnaci number > n.
'''
cdef int a = 0
cdef int b = 0
cdef int j = int(n)
while b<j:
a, b = b, a+b
return b, 1
main.pyを
from spark_tools import spark_cython
import pyximport
import os
from pyspark import SparkContext
from pyspark import SparkConf
pyximport.install()
os.environ["SPARK_HOME"] = "/home/spark-1.6.0"
conf = (SparkConf().setMaster('local').setAppName('Fibo'))
sc = SparkContext()
sc.addPyFile('file:///home/Cythonize/fib.pyx')
sc.addPyFile('file:///home/Cythonize/spark_tools.py')
lines = sc.textFile('file:///home/Cythonize/nums.txt')
mapper = spark_cython('fib', 'fib_mapper_cython')
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect()
print fib_frequency
私はプログラムを実行するたびにTypeError
を取得します。何か案は?
fib_mapper_cythonの初期値は無期限にループします。 b = 1を変更すると問題が解決するはずです – MrChristine