2017-01-05 5 views
2

私はPySparkを通じてSpark Streamingを調べていて、taketransform関数を使用しようとするとエラーが発生します。pformparkのTransformed DStreamは、pprintが呼び出されたときにエラーを返します

DStreamtransformpprintを使って正常にsortByを使用できます。私は同じパターン、次のtakeを使用してpprintにそれをしようとした場合

author_counts_sorted_dstream = author_counts_dstream.transform\ 
    (lambda foo:foo\ 
    .sortBy(lambda x:x[0].lower())\ 
    .sortBy(lambda x:x[1],ascending=False)) 
author_counts_sorted_dstream.pprint() 

しかし:

top_five = author_counts_sorted_dstream.transform\ 
    (lambda rdd:rdd.take(5)) 
top_five.pprint() 

ジョブが

Py4JJavaError: An error occurred while calling o25.awaitTermination. 
: org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call 
    return r._jrdd 
AttributeError: 'list' object has no attribute '_jrdd' 

で失敗あなたは完全なコードを見ることができますthe notebook hereに出力します。

私は間違っていますか?

答えて

2

に渡す機能は、RDDからRDDに変換する必要があります。あなたがアクションを使用する場合は、takeのように、あなたが戻っRDDに結果を変換する必要があります使用される造影RDD.sortBy

sc: SparkContext = ... 

author_counts_sorted_dstream.transform(
    lambda rdd: sc.parallelize(rdd.take(5)) 
) 

を変換(RDDを返す)であるので、さらに並列化のための必要はありません。サイドノート次の関数で

lambda foo: foo \ 
    .sortBy(lambda x:x[0].lower()) \ 
    .sortBy(lambda x:x[1], ascending=False) 

はあまり意味がありません。スパークはシャッフルで並べ替えるので、安定していないことを忘れないでください。複数のフィールドで並べ替えたい場合は、

lambda x: (x[0].lower(), -x[1]) 
のような複合キーを使用してください。
関連する問題