2017-12-12 10 views
-1

私はこのようになります非常に単純なCSVファイルがあります:私はすべての値がテーブル内のダブルスとして作成されているにもハイブのテーブルに座って、このcsvファイルを、持っているSpark 2.0クラスタリングのCSVデータを前処理する方法は?

time,is_boy,is_girl 
135,1,0 
136,0,1 
137,0,1 

を。

この表は実際には巨大であり、膨大な数の行があるため、この問題を解決するためにSpark 2を使用することにしました。

私は、Pythonで、このクラスタリングライブラリを使用したいと思います: https://spark.apache.org/docs/2.2.0/ml-clustering.html

誰もが、CSVまたは一部スパークSQLの魔法を使用することにより、直接、このデータをロードし、正しく前処理、それをする方法を知っている場合は、使用してPythonでは、kmeans fit()メソッドに渡してモデルを計算することができ、とても感謝しています。私はcsvsとこのライブラリの例はまだ見つけていないので、他の人には便利だと思います。

+2

スパークには組み込みのCSVリーダーがあり、SparkSQLはハイブとやり取りできます(魔法ではなく、よく書かれています)。あなたが試したことを示してください –

+0

downvoteの理由は何ですか? –

+1

私はしませんでしたが、http://idownvotedbecau.se/noresearch/ –

答えて

0

を取得する前にデータフレームのドキュメントを超える読み、私がしなければならなかったかなりの数の奇妙な事がありましたそれが仕事を得るので、私はそれを共有する価値があると感じします

ので、同様に、私は、単純なCSVファイルを作成しました:

time,is_boy,is_girl 
123,1.0,0.0 
132,1.0,0.0 
135,0.0,1.0 
139,0.0,1.0 
140,1.0,0.0 

は、その後、私は色相でこのクエリを実行し、ハイブのテーブルを作成しました:

CREATE EXTERNAL TABLE pollab02.experiment_raw( 
     `time` double, 
     `is_boy` double, 
     `is_girl` double) 
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' with 
    serdeproperties('separatorChar' = ',') 
    STORED AS TEXTFILE LOCATION "/user/me/hive/experiment" 
    TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="0") 

すると、次のように私のpysparkスクリプトは: (私はSparkSessionを仮定している名前「スパーク」で作成されています)

from pyspark.sql import SparkSession 
from pyspark.sql.functions import * 
from pyspark.ml.feature import VectorAssembler 

raw_data = spark.sql("select * from dbname.experiment_raw") 

#filter out row of null values that were added for some reason 
raw_data_filtered=raw_data.filter(raw_data.time>-1) 

#convert rows of strings to doubles for kmeans: 
data=raw_data_filtered.select([col(c).cast("double") for c in raw_data_filtered.columns]) 
cols = data.columns 

#Merge data frame with column called features, that contains all data as a vector in each row 
vectorAss = VectorAssembler(inputCols=cols, outputCol="features") 
vdf=vectorAss.transform(data) 
kmeans = KMeans(k=2, maxIter=10, seed=1) 
model = kmeans.fit(vdf) 

をし、残りは歴史です。私はここで最善のベストプラクティスを行っていない。スペースを節約してパフォーマンスを向上させるために、vdf DataFrameから不要な列を削除することもできますが、これは機能します。

0

フィット法は、単にベクトル/ DATAFRAME

spark.read().csvまたはspark.sql両方を取るあなたのデータフレームを返します。

あなたが前処理データにしたいしかしは、だから私は十分な時間を推測し、最終的にこれを解決しMlLib /関数kmeans例に

関連する問題