2016-06-02 11 views
0

約1億レコード(〜25GB、〜5カラム)の単一テーブルを持つMySQLデータベースがあります。 Apache Sparkを使用して、このデータをJDBCコネクタ経由で取り出し、DataFrameに格納します。 ここからは、データの前処理(NULL値の置き換えなど)を行っているため、絶対に各レコードを調べる必要があります。 次に、次元削減と機能選択(例:PCAを使用)、クラスタリング(例:K-Means)を実行し、後で新しいデータのモデルのテストを実行したいと思います。Apache SparkでJavaデータ構造を使用しないでください。データのコピーを避けるために

私はこれをSparkのJava APIで実装しましたが、DataFrameからjava.util.Vectorおよびjava.util.Listに多くのデータをコピーするため、すべてのレコードを繰り返し処理して前処理を行うことができるようにする)、後でDataFrameに戻すことができます(SparkのPCAはDataFrameを入力として使用するため)。

データベースから情報をorg.apache.spark.sql.Columnに抽出しようとしましたが、それを反復処理する方法が見つかりませんでした。 また、org.apache.spark.mllib.linalg。{DenseVector、SparseVector}を使ってJavaデータ構造(ListやVectorなど)の使用を避けようとしましたが、どちらも機能しません。 最後に、(データフレームとカスタムスキーマからJavaRDDを作成して)JavaRDDの使用を検討しましたが、完全には機能しませんでした。

長い説明の後、私の質問は:Javaデータ構造にすべてのデータをコピーせずに、最初の段落で述べたすべての手順を実行する方法はありますか? 私が試したオプションの1つが実際にはうまくいくかもしれませんが、Sparkのドキュメントや文献が少し不足しているため、どうやって見つけられないようです。

答えて

0

ご質問の文言から、スパーク処理の段階について混乱があるようです。

まず、入力と変換を指定することで、Sparkに指示します。この時点では、(a)処理のさまざまな段階でのパーティション数と(b)データのスキーマのみがわかっています。 org.apache.spark.sql.Columnは、この段階で、列に関連付けられたメタデータを識別するために使用されます。ただし、データは含まれていません。実際、この段階ではデータはまったくありません。

次に、データフレーム/データセットに対してアクションを実行するようにSparkに指示します。これが処理を開始します。入力は読み込まれ、さまざまな変換と最後のアクション操作(collectまたはsaveなど)に流れます。

これは、データベースから情報を抽出することができない理由を説明しています(Column)。

あなたの質問の中核として、コードを見ずにそれが達成しようとしていることを正確に知ることは難しいですが、タイプ間の多くの移行は悪い考えです。ここで

は、より良い結果をご案内役立つかもしれない質問のカップルです:

  • は、なぜあなたはインスタンス上で直接操作することにより、必要なデータ変換を実行することはできませんか?

  • 変換コードの一部をUDFまたはUDAFにラップすると便利でしょうか?

は、この情報がお役に立てば幸いです。

+0

シムは、いくつかの非常に便利で概念的なものを書いています。これらは私にとって非常に便利です。その間。データフレームでUDFを使用して問題を解決しました。また、Spark 2.0.0プレビューには多くの追加機能が用意されているので、コードを2.0.0に移植するとデータセットを排他的に使用することができました(Java Vector/Listにデータをコピーするのと比べて) – Rajko

+0

比較的シンプルなデータ2.0.0の 'Datasets'はうまくいくはずです。 – Sim

関連する問題