0

私はMS SQLデータベースからデータを読み込むためにSpark JDBCを使用していますが、私は奇妙な結果を得ています。Apache Spark JDBCデータフレーム数の問題

たとえば、私のMS SQLデータベースからレコードを読み取るコードは次のとおりです。 私がデータを読み込んでいるテーブルにレコードが挿入され続けていることに注意してください。

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 

    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 

私は例えば、私はいつも${updateJdbcDF.count()}数>${jdbcTable.count()}を取得し、2つの異なるカウント値に私は私のプログラムを実行するたびに取得します。

誰かがなぜこのようなことが起こっているのか説明できますか?これは私のユースケースで多くの問題を作り出しています。それが作成された後にjdbcTable DataFrameのカウントを制限する方法。私はjdbcTable.cache()を試しましたが運はありません。

jdbcTable DataFrameから派生した他のデータフレームで操作を使用すると、レコードが大きくなります。 jdbcTableデータフレームから派生したデータフレームを使用するたびにデータフレームが呼び出されますか?

+0

違いはありますか?あるいは、毎回両方のステートメントに異なるカウントを取得していますか? – philantrovert

+0

@philantrovert違いは一定ではありません。私は毎回別のカウントを取得しています。 – nilesh1212

+1

"データを読み込んでいるテーブルがレコードに連続的に挿入されています"という要求が固定範囲の述語を定義していない場合は、テーブル内の行数は、スパークがアクセスするたびに異なります。だからあなたが見ているもの(カウントを変更する)は、予期されるだけですね。 – GPI

答えて

1

jdbcTable.cache()を適用してこの問題を解決できました.jdbcTableデータフレームから派生したDFでは、jdbcTable.count()よりも高いカウントを取得できません。すべての計算は今すぐOKです。説明ありがとう@GPI

//Extract Data from JDBC source 
    val jdbcTable = sqlContext.read.format("jdbc").options(
     Map(
     "url" -> jdcbUrl, 
     "driver" -> "net.sourceforge.jtds.jdbc.Driver", 
     "dbtable" -> 
      s"(SELECT COLUMNS WITH INNER JOINS WHERE tbl.ID > ${lastExtractUnixTime}) as t")) 
     .load 

    jdbcTable.cache() 

    println(s"STEP 1] JDBC RECORDS COUNTS ---> ${jdbcTable.count()}") 


    val updateJdbcDF = jdbcTable 
     .withColumn("ID-COL1", trim($"COl1")) 
     .withColumn("ID-COL1", trim($"COl2")) 

    println(s"STEP 2] UPDATE JDBC RECORDS COUNTS ---> ${updateJdbcDF.count()}") 
    /** 
    * MORE DATA PROCESSING HERE 
    /** 

    jdbcTable.unpersist()