2017-03-16 2 views
0

私は、spooldirのファイルからデータを読み込むflumeプロセスを持っています。&は、MySQLデータベースにデータを読み込みます。同じflumeプロセスで処理できる複数のタイプのファイルが存在します。Flumeのカスタムシンククラスの変数をすべてのバッチでリセットする方法

ファイルのデータ形式を決定するために、初期/最初の読み込み後にローカル変数(sInterfaceType)を更新するカスタムシンクJavaクラス(AbstractSinkを拡張)を作成しました。 ファイル処理が完了したらリセットしなければならないので、次のバッチ/インターフェイスファイルの識別から始めなければなりません。

私はstop()でしようとしましたが、助けになりません。誰かこれをしましたか?

マイシンククラスは、次のようになります。

public class MyFlumeSink2 extends AbstractSink implements Configurable { 

private String sInterfaceType; //tells file format of current load 

public MyFlumeSink2() { 
    //my initialization of variables 
} 

public void configure(Context context) { 
    //read context variables 
} 

public void start() { 
    //create db connection 
} 

@Override 
public void stop() { 
    //destroy connection 
    sInterfaceType = ""; //This doesn't help me 
    super.stop(); 
} 

public Status process() throws EventDeliveryException { 
    Channel channel = getChannel(); 
    Transaction transaction = channel.getTransaction(); 

    if((sInterfaceType=="" || sInterfaceType==null)) 
    { 
    //Read first line & set sInterfaceType 
    }else 
    //Insert data in MySQL 

    transaction.commit(); 
} 
} 
+0

ファイルは完全に処理されました(シンクレベルで)。 – frb

+0

私は、バッチまたはファイルを完了した後にstop()メソッドが呼び出されると考えていました。しかし、それはそうではないように見えます。だから私はあなたの質問に対する答えを知らない。 – KiranM

答えて

0

我々は手動でそれがどのイベントを決定する必要があり、すべての新しいファイルのために呼ばれる何の専門的な方法ではありません。

イベントコード&を最初の要素に基づいて設定するようにコードを修正しました。私のコードは次のようになります:

public Status process() throws EventDeliveryException { 
     //....other code... 

      sEvtBody = new String(event.getBody()); 
      sFields = sEvtBody.split(","); 

      //check first field to know record type 
      enumRec = RecordType.valueOf(checkRecordType(sFields[0].toUpperCase())); 
      switch(enumRec) 
      { 
       case CUST_ID: 
        sInterfaceType = "T_CUST"; 
        bHeader = true; 
        break; 
       case TXN_ID: 
        sInterfaceType = "T_CUST_TXNS"; 
        bHeader = true; 
        break; 
       default: 
        bHeader = false; 
      } 
      //insert if not header 
      if(!bHeader) 
      { 

       if(sInterfaceType == "T_CUST") 
       { 
        if(sFields.length == 14) 
         this.bInsertStatus = daoClass.insertHeader(sFields); 
        else 
         throw new Exception("INCORRECT_COLUMN_COUNT"); 
       }else if(sInterfaceType == "T_CUST_TXNS") 
       { 
        if(sFields.length == 10) 
         this.bInsertStatus = daoClass.insertData(sFields); 
        else 
         throw new Exception("INCORRECT_COLUMN_COUNT"); 
       } 

       //if(!bInsertStatus) 
       // logTransaction(sFields); 
      } 
      //....Other code.... 
関連する問題