2017-06-16 5 views
0

変数を使用して、2つの変数値の範囲内のエントリを選択したいとします。私のSQLクエリはString sql = "Select count(*) FROM Records WHERE event_time <= UPPERTIME('1') AND event_time >= LOWERTIME('1')";です。このクエリUPPERTIME('1')LOWERTIME('1')でUDFとそこ定義はSpark 2.1.1:ストラクチャードストリーミングクエリに変数をバインドする方法

spark.udf().register("LOWERTIME", new UDF1 < String, String >() { 
     @Override public String call(String lowertime) { 
      System.out.println("lowerTime="+lowerTime.toString()); 
      return lowerTime.toString(); 
      } 
      }, DataTypes.StringType); 


spark.udf().register("UPPERTIME", new UDF1 < String, String >() { 
     @Override public String call(String uppertime) { 
      System.out.println("upperTime="+upperTime.toString()); 
       return upperTime.toString(); 
      } 
      }, DataTypes.StringType); 

されているUDFに渡される引数はダミーであり、私は実際にはグローバル変数「upperTimeとlowerTime」を返しています。

上記のクエリを実行すると、テーブル内のすべてのエントリの数が表示されますが、条件によっては、指定した範囲内のエントリ数に対応する数が表示されます。何がうまくいかないのですか?

答えて

0

私はドライバでこれらのグローバル変数を変更し、エグゼキュータは変更を認識しないと考えています。エグゼキュータ・プロセスは、通常、別のノード上にあり、別のノード内の別のプロセス内の通常の変数にはアクセスできません。

一般に、Sparkでグローバル変数を使用することは悪い考えです。 Sparkはエグゼキュータとドライバの間で変数を共有するためにbroadcast variablesを提供します。

+0

ありがとう@zsxwingしかし、SQLクエリでバインドする変数は読み取り専用ではなく、特定の条件が満たされた後に一定の値だけ増分したいと考えています。 – kadsank

+0

Sparkの仕事中にあなたの変更をブロードキャストできるとは思いません。また、データの処理順序を前提としているようですか?それは通常間違っている。 – zsxwing

関連する問題