1

私は約20kの行を含むDataFrameを持っています。スパーク行を削除する

データセット内のランダムに186行を削除します。

コンテキストを理解する - 欠落しているデータの分類モデルをテストしており、各行にはUNIXのタイムスタンプがあります。 186行は3秒に相当します(1秒間に62行のデータがあります)。

これは、データがストリーミングされているときに、データが 秒間失われる可能性があります。タイムウィンドウからフィーチャーを抽出しているので、データの欠落がモデルのパフォーマンスにどのように影響するのかを見たいと思っています。

これには、rddに変換してfilter関数を使用し、ロジックをフィルタ関数内に配置するのが最適な方法だと思います。

dataFrame.rdd.zipWithIndex().filter(lambda x:)

しかし、私はロジックで立ち往生しています - どのように私はこれを実装していますか? (PySparkを使用して)

答えて

3

は次のように行うようにしてください:

import random 
startVal = random.randint(0,dataFrame.count() - 62) 
dataFrame.rdd.zipWithIndex()\ 
      .filter(lambda x: not x[<<index>>] in range(startVal, startVal+62)) 

これは動作するはずです!

+0

優れた、非常にエレガント! – gbhrea

+0

ちょっと@チアゴ、ちょっと別のフォローアップの質問があります。私は列でフィルタリングしたい、例えば、私の列の1つが10秒分の行を削除するためのタイムスタンプです。私は10秒を削除するために '.filter(タイムスタンプ"(startVal、startVal + 10)) 'を追加しようとしましたが、動作しません。 – gbhrea

+0

あなたの列はIntですか?または、dataFrameのタイムスタンプタイプですか? –