0
私は、spooldirのファイルからデータを読み込むflumeプロセスを持っています。&は、MySQLデータベースにデータを読み込みます。同じflumeプロセスで処理できる複数のタイプのファイルが存在します。Flumeのカスタムシンククラスの変数をすべてのバッチでリセットする方法
ファイルのデータ形式を決定するために、初期/最初の読み込み後にローカル変数(sInterfaceType)を更新するカスタムシンクJavaクラス(AbstractSinkを拡張)を作成しました。 ファイル処理が完了したらリセットしなければならないので、次のバッチ/インターフェイスファイルの識別から始めなければなりません。
私はstop()でしようとしましたが、助けになりません。誰かこれをしましたか?
マイシンククラスは、次のようになります。
public class MyFlumeSink2 extends AbstractSink implements Configurable {
private String sInterfaceType; //tells file format of current load
public MyFlumeSink2() {
//my initialization of variables
}
public void configure(Context context) {
//read context variables
}
public void start() {
//create db connection
}
@Override
public void stop() {
//destroy connection
sInterfaceType = ""; //This doesn't help me
super.stop();
}
public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
if((sInterfaceType=="" || sInterfaceType==null))
{
//Read first line & set sInterfaceType
}else
//Insert data in MySQL
transaction.commit();
}
}
ファイルは完全に処理されました(シンクレベルで)。 – frb
私は、バッチまたはファイルを完了した後にstop()メソッドが呼び出されると考えていました。しかし、それはそうではないように見えます。だから私はあなたの質問に対する答えを知らない。 – KiranM