2017-07-25 7 views
0

私はpandas.tslib.Timestamp型のタイムスタンプ列を持つpandasデータフレームを持っています。私は「createDataFrame」(link to source)からpysparkのソースコードを見て、彼らがリストにnumpyのレコード・アレイにデータを変換するようだ:Pandasのデータフレーム変換時に、どのようにSparkがタイムスタンプタイプを処理しますか?

しかし
data = [r.tolist() for r in data.to_records(index=False)] 

、タイムスタンプの種類がリストにこのプロセスに変換しますlong型:今

> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s')) 
> df 
0 2017-07-25 11:53:29.353923 
1 2017-07-25 11:53:30.353923 
2 2017-07-25 11:53:31.353923 
3 2017-07-25 11:53:32.353923 
4 2017-07-25 11:53:33.353923 
> df.to_records(index=False).tolist() 
[(1500983799614193000L,), (1500983800614193000L,), (1500983801614193000L,), (1500983802614193000L,), (1500983803614193000L,)] 

私はいくつかの操作を行います(タイムスタンプ列に触れていない)、その後呼び出し、RDDにそのようなリストを渡した場合

> spark.createDataFrame(rdd,schema) // with schema mentioning that column as TimestampType 
TypeError: TimestampType can not accept object 1465197332112000000L in type <type 'long'> 
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193) 
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234) 
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

W帽子は、(リストをRDDに変換する前に)日時型を保持する必要があります。

編集1

私はポストデータフレーム作成処理を伴うであろうことを認識してよ、いくつかの方法があります:

  1. がパンダにdatetimeオブジェクトにタイムゾーン情報を追加します。しかし、これは不要で、あなたのタイムゾーンによってはエラーにつながる可能性があります。

  2. datetimeライブラリを使用して、longをタイムスタンプに変換します。 tstamplを仮定

が入力された:TSTAMP =日時(1970、1、1)+ はtimedelta(マイクロ= tstampl/1000)

  • Pandasデータフレーム側でdatetimeを文字列に変換し、sparkデータフレーム側でdatetimeにキャストします。
  • として私は、データフレームの作成自体の前にすべての処理の世話をするだろう簡単な方法を探していますが

    以下のSureshの答えで説明しました。

    答えて

    0

    タイムスタンプ列を文字列型に変換し、pandasシリーズのtolist()を適用してみました。 spark dataframeのリストを使用し、そこでタイムスタンプに変換し直してください。

    >>> df = pd.DataFrame(pd.date_range(start=datetime.datetime.now(),periods=5,freq='s')) 
    >>> df 
            0 
    0 2017-07-25 21:51:53.963 
    1 2017-07-25 21:51:54.963 
    2 2017-07-25 21:51:55.963 
    3 2017-07-25 21:51:56.963 
    4 2017-07-25 21:51:57.963 
    
    >>> df1 = df[0].apply(lambda x: x.strftime('%Y-%m-%d %H:%M:%S')) 
    >>> type(df1) 
    <class 'pandas.core.series.Series'> 
    >>> df1.tolist() 
    ['2017-07-25 21:51:53', '2017-07-25 21:51:54', '2017-07-25 21:51:55', '2017-07-25 21:51:56', '2017-07-25 21:51:57'] 
    
    from pyspark.sql.types import StringType,TimestampType 
    >>> sdf = spark.createDataFrame(df1.tolist(),StringType()) 
    >>> sdf.printSchema() 
    root 
        |-- value: string (nullable = true) 
    >>> sdf = sdf.select(sdf['value'].cast('timestamp')) 
    >>> sdf.printSchema() 
    root 
        |-- value: timestamp (nullable = true) 
    
    >>> sdf.show(5,False) 
    +---------------------+ 
    |value    | 
    +---------------------+ 
    |2017-07-25 21:51:53.0| 
    |2017-07-25 21:51:54.0| 
    |2017-07-25 21:51:55.0| 
    |2017-07-25 21:51:56.0| 
    |2017-07-25 21:51:57.0| 
    +---------------------+ 
    
    +0

    私はこの方法と、タイムスタンプ再変換(現在私が使用しているもの)の長さに関するもう1つの方法を知っています。問題は、これらのすべてのメソッドが何らかのポストデータフレーム変換処理を必要とすることです。それは私が避けたいものです。 – tangy

    +0

    あなたは/ 1000000000を使用してスパークし、タイムスタンプにキャストするのにタイムスタンプの変換を長くしていますか? – Suresh

    +0

    tstamp = datetime(1970、1、1)+ timedelta(マイクロ秒= tstampl/1000) – tangy

    関連する問題