2016-04-26 10 views
1

クラスタにSpark 1.3とHiveがあります ランダムに選択した行を追加する必要がある大きなHiveテーブルがあります。 条件を満たしていれば、読み込んだ小さなテーブルがあります。条件が満たされていれば、入力するランダムな行を照会するために必要な変数を取得します。私がしたことは、その条件の問い合わせをtable.where(value<number)とし、それをtake(num rows)を使って配列にすることでした。そして、これらの行には、大型のハイブテーブルからランダムな行が必要な情報が含まれているため、配列を繰り返し処理します。HiveテーブルにScala + Spark 1.3をインクリメントして追加します。

クエリを実行すると、クエリにORDER BY RAND()sqlContextを使用)が使用されます。大きなテーブルから列を追加すると、var Hive table(変更可能)が作成されました。ループでは、私はunionAllを行うnewHiveTable = newHiveTable.unionAll(random_rows)

私はこれを行うにはさまざまな方法を試しましたが、CPUと一時ディスクの使用を避ける最良の方法は何か分かりません。データフレームはインクリメンタルな追加を意図したものではないことを私は知っています。 私が今試したがっていることの1つは、cvsファイルを作成し、そのファイルにランダムな行を徐々に書き込み、ループが終了したらcvsファイルをテーブルとしてロードし、unionAllを実行してファイナルテーブル。

フィードバックは素晴らしいです。ハイブ内

create external table test(key string, value string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY ';' 
LOCATION '/SOME/HDFS/LOCATION' 

そして感​​謝

+0

あなたは最近のスパークバージョンに移行する選択肢を持っているのですか? 'yourDataFrame.writer()。mode(SaveMode.Append).saveAsTable(" YourTableName "' – user1314742

+0

Spark 1.5へのアップグレードまで翌月になることはありません – KBA

答えて

2

私はあなたが、ハイブと外部表を作成する場所を定義して、火花がそのディレクトリにCSV形式で出力を書いてみましょうすることをお勧めします https://github.com/databricks/spark-csvの側近とスパークから、ファイルをCSVするデータフレームを作成し、既存のものに追記:

df.write.format("com.databricks.spark.csv").save("/SOME/HDFS/LOCATION/", SaveMode.Append) 
+1

ありがとうございました。 Hiveで 'LIKE tablename'を使用して追加したいテーブルの名前です。ファイアウォールのために、databricksとapache jarファイルをダウンロードして、spark-submissionを実行したときにコマンドラインに追加しなければならないことに注意してください'spark-submit -master yarn-client --classメインmain.jar - jars spark-csv_2.10-1.4.0.jar、commons-csv-1.2.jar'現在、ファイルはそれぞれに書き込まれていますあなたがそれらをロードするために何を提案しますか?Scalaで、メモリ内のハイブテーブルにそれらを追加し、オリジナルにunionAllを追加して保存しますか? – KBA

+0

Note2そこにいる誰のためにも、私のsbtファイルに: 'libraryDependencies ++ = Seq(' '" com.databricks "%%" spark-csv "%" 1.4.0 "' – KBA

+1

申し訳ありませんがあなたの質問を理解することができませんでした "あなたはそれらを読み込むために何を示唆しますか"、どういう意味ですか?実際には、外部表と同じ場所にあるため、自動的にロードされます。ロードするために何もする必要はありません。私たちが外部テーブルを作成する理由です。 – user1314742

関連する問題