2016-06-22 8 views
3

最近、私はCythonをSparkと使いたいと思っていました。そのためには、the following referenceに従ってください。Spark with Cython

述べたように、私は以下のプログラムを書いたが、私は取得しています:

TypeError: 
fib_mapper_cython() takes exactly 1 argument (0 given) 

spark-tools.py

def spark_cython(module, method): 
    def wrapped(*args, **kwargs): 
     global cython_function_ 
     try: 
      return cython_function_(*args, **kwargs) 
     except: 
      import pyximport 
      pyximport.install() 
      cython_function_ = getattr(__import__(module), method) 
     return cython_function_(*args, **kwargs) 
    return wrapped() 

fib.pyx

def fib_mapper_cython(n): 
    ''' 
    Return the first fibonnaci number > n. 
    ''' 
    cdef int a = 0 
    cdef int b = 0 
    cdef int j = int(n) 
    while b<j: 
     a, b = b, a+b 
    return b, 1 

main.pyを

from spark_tools import spark_cython 
import pyximport 
import os 
from pyspark import SparkContext 
from pyspark import SparkConf 
pyximport.install() 


os.environ["SPARK_HOME"] = "/home/spark-1.6.0" 
conf = (SparkConf().setMaster('local').setAppName('Fibo')) 

sc = SparkContext() 
sc.addPyFile('file:///home/Cythonize/fib.pyx') 
sc.addPyFile('file:///home/Cythonize/spark_tools.py') 
lines = sc.textFile('file:///home/Cythonize/nums.txt') 

mapper = spark_cython('fib', 'fib_mapper_cython') 
fib_frequency = lines.map(mapper).reduceByKey(lambda a, b: a+b).collect() 
print fib_frequency 

私はプログラムを実行するたびにTypeErrorを取得します。何か案は?

+0

fib_mapper_cythonの初期値は無期限にループします。 b = 1を変更すると問題が解決するはずです – MrChristine

答えて

4

これはCythonでもPySparkでもありません。残念ながら、spark_cythonの定義で追加の関数呼び出しを追加しました。具体的には、cython_functionへの呼び出しをラップ機能は、リターンに引数なしで呼び出されます:あなたはこの呼び出しを実行するとき

return wrapped() # call made, no args supplied. 

は、結果として、あなたはラップ機能を返しません。何をするかは、wrapped*argsまたは**kwargsとしないでください。 wrappedは、引数なしのfib_mapper_cythonを呼び出します(*args, **kwargsは提供されないため)TypeErrorを呼び出します。

代わりにすべき:

return wrapped 

と、この問題はもはや存在しなければなりません。

+1

助けてくれてありがとう – StarLord