2017-10-27 12 views
0

spark sql 2.2.0を使用してjdbcソースからフェッチされたレコードの数を制限する方法はありますか?spark、scala&jdbc - レコード数を制限する方法

私は別のMS SQL Serverのテーブルから>移動のタスク(および変換)レコードの数が多い200Mを取り扱っております

val spark = SparkSession 
    .builder() 
    .appName("co.smith.copydata") 
    .getOrCreate() 

val sourceData = spark 
    .read 
    .format("jdbc") 
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
    .option("url", jdbcSqlConnStr) 
    .option("dbtable", sourceTableName) 
    .load() 
    .take(limit) 

それは動作しますが、それは明らかに最初にすべてのロードされましたデータベースから200Mのレコードを取得してから18分後にテストと開発のために必要なレコードの数を返します。

take(...)とload()を切り替えるとコンパイルエラーが発生します。

サンプルデータを小さなテーブルにコピーする方法、SSISまたは代替のetlツールを使用する方法があります。

spark、sql、jdbcを使用して目標を達成する方法があるかどうかは、本当に不思議です。

答えて

0

は、ドキュメントの「DBTABLE」、説明でテーブル名の代わりに使用することができます(Oracleでは "rownum"のような)行数を制限するための固有の機能です。

+0

私はテーブル名の代わりにSQL文を使って2つのアプローチを試してみました。そして、私はval sqlDF = spark.sql( "...")アプローチを使用してトップ100のレコードを選択しようとしましたが、 'invalide spark sql syntax'などのエラーがあります。エイリアスを使用したクエリ括弧の場合は – vkhazin

+0

を使用する必要があります。 https://stackoverflow.com/questions/43174838/how-to-use-a-subquery-for-dbtable-option-in-jdbc-data-ソース – pasha701

+0

優秀!次の作品! \t //ソースデータ \t val jdbcSourceConnection = "" jdbc:sqlserver:// $ sourceDbHost; databaseName = $ sourceDbName; user = $ sourceDbUsername; password = $ sourceDbPassword; " \tヴァルsourceData =スパーク \t \t .read \t \t .format( "JDBC") \t \t .OPTION( "ドライバ"、 "com.microsoft.sqlserver.jdbc.SQLServerDriver") \t \t .OPTION(」 URL」、jdbcSourceConnection) // \t \t .OPTION( "DBTABLE"、sourceTableName) \t \t .OPTION( "DBTABLE"、 "(Customer.Preferenceからトップ100 *を選択する)TABLE1のように") \t \t .LOAD これは動作しますが3210 \t .sort(「得意先」) – vkhazin

1

このアプローチは、リレーショナルデータベースにとっては少し悪いことです。 sparkのロード機能はフルテーブルを要求し、メモリ/ディスクに格納し、RDDの変換と実行を行います。

探索的な作業をしたい場合は、このデータを最初の負荷に格納することをお勧めします。それを行うにはいくつかの方法があります。あなたのコードを取ると、次のように実行します。

val sourceData = spark 
    .read 
    .format("jdbc") 
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") 
    .option("url", jdbcSqlConnStr) 
    .option("dbtable", sourceTableName) 
    .load() 
sourceData.write 
    .option("header", "true") 
    .option("delimiter", ",") 
    .format("csv") 
    .save("your_path") 

これは、あなたがCSV、あなたが探査のための任意の言語で作業することができ、最も一般的な形式としてローカルマシンにデータを保存できるようになります。 これをロードするたびに、このファイルからこのデータを取り出します。あなたがリアルタイム分析、またはこのような他のものをしたい場合。私はあなたが別のストレージを更新するためにデータの変換でパイプラインを構築することをお勧めします。このアプローチを使用して毎回あなたのデータベースからロードのデータを処理するのは良くありません。クエリで https://spark.apache.org/docs/latest/sql-programming-guide.html

条件がサーバーと、例えば、指定することができます「」:ダウンロードされたローの制限数、SQLクエリの

+0

を詳細なコメントありがとうございました! – vkhazin

0

私はこれをテストしていませんが、takeではなくlimitを試してください。 takeコール次の注意を持ってカバー下headは:結果の配列を小さく と予想される場合、すべてのデータは、ドライバのメモリにロードされる

この方法のみ、使用されるべきです。それは遅延評価であるようにLIMITでlimit結果に対し

は、SQLクエリに押し込ま:

この機能とheadの違いはheadが作用があることがあり、トリガーによって(配列を返しますクエリの実行)、limitは新しいデータセットを返します。

あなたが最初にそれを引っ張っせずにデータを望むなら、あなたも何か行うことができます:

...load.limit(limitNum).take(limitNum) 
+0

私は、データソースとしてカサンドラとその正確なアプローチを使用していたが、SQL ServerのJDBCのためにそれは私のために動作しませんでした。 私はさらにそれを見てください、提案をありがとう! – vkhazin

+0

いいえ:それは私が求めている100レコードを読み込むために甘い長い時間がかかります。スパークはすべてのレコードを最初にロードしてからこの場合に限度を適用するように見えます。 \t \t .OPTION ' \tヴァルsourceData =スパーク \t \t .read \t \t .format( "JDBC")( "ドライバ"、 "com.microsoft.sqlserver.jdbc.SQLServerDriver") \t \t .OPTION( "URL"、jdbcSourceConnection) // \t \t .OPTION( "DBTABLE"、S "($ {} config.sourceTableNameから*トップ$ {config.sourceLimit}を選択)TABLE1のように") \t \t .OPTION ( "DBTABLE"、config.sourceTableName) \t( "CustomerID") '' ' – vkhazin

関連する問題