2017-12-01 18 views
0

私はpysparkを使ってデータを扱います。データは次のとおりです。PySpark: 'ResultIterable'オブジェクトに 'request_tm'という属性がありません

8611060350280948828b33be803 4363 2017-10-01 
8611060350280948828b33be803 4363 2017-10-02 
4e5556e536714363b195eb8f88becbf8 365 2017-10-01 
4e5556e536714363b195eb8f88becbf8 365 2017-10-02 
4e5556e536714363b195eb8f88becbf8 365 2017-10-03 
4e5556e536714363b195eb8f88becbf8 365 2017-10-04 

これらのデータを格納するクラスを作成しました。コードは次のとおりです。

class LogInfo: 
    def __init__(self, session_id, sku_id, request_tm): 
     self.session_id = session_id 
     self.sku_id = sku_id 
     self.request_tm = request_tm 

を取引コードは以下のとおりです。

from classFile import LogInfo 
from pyspark import SparkContext, SparkConf 

conf = SparkConf().setMaster("local[*]") 
sc = SparkContext(conf=conf) 
orgData = sc.textFile(<dataPath>) 
readyData = orgData.map(lambda x: x.split('\t')).\ 
    filter(lambda x: x[0].strip() != "" and x[1].strip() != "" and x[2].strip() != "").\ 
    map(lambda x: LogInfo(x[0], x[1], x[2])).groupBy(lambda x: x.session_id).\ 
    filter(lambda x: len(x[1]) > 3).filter(lambda x: len(x[1]) < 20).\ 
    map(lambda x: x[1]).sortBy(lambda x:x.request_tm).map(lambda x: x.sku_id) 

しかし、コードは動作しませんでした。間違い情報は以下の通りです:

 org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin- 
hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin- 
hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin- 
hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
     return func(split, prev_func(split, iterator)) 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 346, in func 
     return f(iterator) 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 1041, in <lambda> 
     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 1041, in <genexpr> 
     return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
     File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2053, in <lambda> 
     return self.map(lambda x: (f(x), x)) 
     File 
"D:<filePath>", line 15, in <lambda> 
    map(lambda x: x[1]).sortBy(lambda x:x.request_tm).map(lambda x: x.sku_id) 
AttributeError: 'ResultIterable' object has no attribute 'request_tm' 
at 
org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
at org.apache.spark.api.python.PythonRunner$$anon$1.<init> 
(PythonRDD.scala:234) 
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
at org.apache.spark.scheduler.Task.run(Task.scala:108) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
at java.lang.Thread.run(Unknown Source) 
[Stage 1:>               (0 + 5)/
10]17/12/01 17:54:15 WARN TaskSetManager: Lost task 3.0 in stage 1.0 (TID 13, localhost, executor driver): org.apache.spark.api.python.PythonException: 
Traceback (most recent call last): 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 177, in main 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 172, in process 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2423, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 346, in func 
    return f(iterator) 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 1041, in <lambda> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 1041, in <genexpr> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "D:\spark-2.2.0-bin-hadoop2.7\spark-2.2.0-bin-hadoop2.7\python\pyspark\rdd.py", line 2053, in <lambda> 
    return self.map(lambda x: (f(x), x)) 
    File 
"D:<filePath>", line 15, in <lambda> 
    map(lambda x: x[1]).sortBy(lambda x:x.request_tm).map(lambda x: x.sku_id) 
AttributeError: 'ResultIterable' object has no attribute 'request_tm' 

........

私はメインの間違い情報は、上記のようにだと思います。私はどこで間違えたのか分からなかった。誰も助けてくれますか?どうもありがとうございました!これで

map(lambda x: x[1]) 

答えて

0

私はあなたがこれを交換する必要があると思う

flatMap(lambda x: list(x[1])) 

基本的には、GROUPBY後、xは[1] "反復処理可能の結果" オブジェクトであるようならばあなたはそれの各要素をソートしたい、まずそれを平坦化する必要があります。

編集: あなたはRDD内部sku_idのリストが必要な場合:

.map(lambda x: [y.sku_id for y in sorted(list(x[1]), key=lambda x: x.request_tm)]) 
+0

それは作業を行います。しかし実際には、私はモデルを構築するために反復可能な結果を​​得たいと思っています。だから私は最終的に 'flatMap(lambda x:list(x [1]))。sortBy(lambda x:x.request_tm).map(lambda x:x.sku_id)'というエラーを表示します。モデル: 'java.lang.ClassCastException:java.lang.Stringはjava.lang.Iterableにキャストできません'。私は上記の文全体を 'map(lambda x:sorted(list [x [1])、key = lambda x:x.request_tm).map(lambda x:x.sku_id)'と置き換えてみましたが、エラー情報: 'AttributeError:' list 'オブジェクトには属性' map 'がありません。この問題を解決するにはどうすればよいですか?どうもありがとうございました! –

+0

アイテムの繰り返しが必要ですか? sku_id?このモデルはスパークの一部ですか?あなたはこのモデルの使用例を挙げることができますか? – user3689574

+0

私は 'from pyspark.mllib.feature import Word2Vec'という文章でモデルを構築したいと思います。 'word2vec = Word2Vec(); model = word2vec.fit(data = readyData) '。私は[['' 123 '、' 456 '、' 789 ']、[' 321 '、' 654 '、' 987 ']、[' 124 '、' 345 ']などの各session_idのsku_idの繰り返しが必要です]。各サブリストは、同じsession_idで発生し、request_tmでソートされたsku_idの繰り返し可能なものです。 –

関連する問題