2017-01-19 35 views
0

私はPython上でSparkスクリプトを使っています(Pysparkを使っています)。私はPysparkでPostgresにタイムスタンプを書く

timestamp=datetime.strptime(processed_data[1], DATI_REGEX) 

processed_data [1]有効な日時の文字列であるなど、いくつかのフィールドを持つワットRoを返す関数を、持っています。

編集は完全なコードを表示するために:私はやって、PostgreSQLのDBにそれを書くしようとすると、

DATI_REGEX = "%Y-%m-%dT%H:%M:%S" 

class UserActivity(object): 
    def __init__(self, user, rows): 
     self.user = int(user) 
     self.rows = sorted(rows, key=operator.attrgetter('timestamp')) 

    def write(self): 
     return Row(
      user=self.user, 
      timestamp=self.rows[-1].timestamp, 
     ) 

def parse_log_line(logline): 
    try: 
     entries = logline.split('\\t') 
     processed_data = entries[0].split('\t') + entries[1:] 

     return Row(
      ip_address=processed_data[9], 
      user=int(processed_data[10]), 
      timestamp=datetime.strptime(processed_data[1], DATI_REGEX), 
     ) 
    except (IndexError, ValueError): 
      return None 


logFile = sc.textFile(...) 
rows = (log_file.map(parse_log_line).filter(None) 
     .filter(lambda x: current_day <= x.timestamp < next_day)) 
user_rows = rows.map(lambda x: (x.user, x)).groupByKey() 
user_dailies = user_rows.map(lambda x: UserActivity(current_day, x[0], x[1]).write()) 

問題が来る次

fields = [ 
    StructField("user_id", IntegerType(), False), 
    StructField("timestamp", TimestampType(), False), 
] 
schema = StructType(fields) 
user_dailies_schema = SQLContext(sc).createDataFrame(user_dailies, schema) 
user_dailies_schema.write.jdbc(
    "jdbc:postgresql:.......", 
    "tablename") 

私は次のエラーを取得します:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 172, in main 
    process() 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 263, in dump_stream 
    vs = list(itertools.islice(iterator, batch)) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in toInternal 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 576, in <genexpr> 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 436, in toInternal 
    return self.dataType.toInternal(obj) 
    File "/Users/pau/Downloads/spark-2.0.2-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/types.py", line 190, in toInternal 
    seconds = (calendar.timegm(dt.utctimetuple()) if dt.tzinfo 
AttributeError: 'int' object has no attribute 'tzinfo' 

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

これを解決する方法はありますか。

答えて

1

問題は比較的簡単です。 PySpark はtupleで、フィールド名順に並べられています。あるとして注文された一方から

Row(timestamp, user) 

StructType

Row(user=self.user, timestamp=self.rows[-1].timestamp) 

出力構造のように命じている:それはあなたが作成したときにことを意味します。その結果、コードはユーザーIDをタイムスタンプとして使用しようとしています。あなたは、プレーンtupleを返す必要があり、次のいずれか

class UserActivity(object): 
    ... 
    def write(self): 
     return (self.user, timestamp) 

や辞書的に注文したスキーマを使用します。

schema = StructType(sorted(fields, key=operator.attrgetter("name"))) 

最後にあなたは属性アクセスと事前に定義された順序の両方を達成するためにnamedtupleを使用することができます。

サイドノートでは、このようなgroupByKeyを使用しないでください。

from functools import partial 

(log_file.map(parse_log_line) 
    .map(lambda x: (x.user, x)) 
    .reduceByKey(partial(max, key=operator.itemgetter("timestamp"))) 
    .values()) 

またはDataFrame集計:また、あなたは、あなたが使用する必要がありますSQLContextを取得する場合

from pyspark.sql import functions as f 

(sqlContext 
    .createDataFrame(
     log_file.map(parse_log_line) 
      # Another way to handle ordering is to choose fields 
      # before you call createDataFrame 
      .map(operator.attrgetter("user", "timestamp")), 
     schema) 
    .groupBy("user_id") 
    .agg(f.max("timestamp").alias("timestamp"))) 

複数のフィールドで

(log_file.map(parse_log_line) 
    .map(operator.attrgetter("user", "timestamp")) 
    .reduceByKey(max)) 

:1がreduceByKeyを使用するとき、それは典型的なケースです工場の方法:

SQLContext.getOrCreate(sc) 

新しいコンテキストを作成すると、予期しない副作用が発生する可能性があります。

関連する問題