2017-03-14 4 views
1

マスクIPでデータのIPアドレスを変更したい。これは私のFlumeエージェントの "バックアップ"部分で行われます(下記参照)。この構成ではFlumeカスタムインターセプターが動作しない

2つのチャネルがある:秒1は、バックアップのために使用されている間、最初のチャネルは、HBaseのにデータをダンプします。

a1.sources = r1 r2 
a1.channels = channel1 Backup_channel 
a1.sinks = FSink 

a1.sources.r1.handler = com.flume.handler.JSONHandler 
a1.sources.r1.type = avro 
a1.sources.r1.bind = x.x.x.x 
a1.sources.r1.port = 10008 

a1.sources.r2.handler = com.flume.handler.JSONHandler 
a1.sources.r2.type = avro 
a1.sources.r2.bind = x.x.x.x 
a1.sources.r2.port = 10009 
a1.sources.r2.interceptors = i1 
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor 

a1.channels.channel1.type = file 
a1.channels.channel1.checkpointDir = /root/flume/channels/Livechannel/checkpoint 
a1.channels.channel1.dataDirs = /root/flume/channels/Livechannel/data 

a1.sinks.FSink.type = hbase 
a1.sinks.FSink.table = Temp_Test 
a1.sinks.FSink.batchSize = 300 
a1.sinks.FSink.columnFamily = T 
a1.sinks.FSink.serializer = com.flume.sink.TestTP 

a1.sources.r1.channels = channel1 
a1.sources.r2.channels = Backup_channel 

a1.channels.Backup_channel.type = file 
a1.channels.Backup_channel.checkpointDir = /data/disk/flume/backup/checkpoint 
a1.channels.Backup_channel.dataDirs = /data/disk/flume/backup/data 

a1.sinks.FSink.channel = channel1 

は次のように私のカスタムJavaインターセプターコードです。このメソッドは、身体からIPアドレスを取得し、そのIPマスクを計算し、それをボディに追加するインターセプトメソッドを実装します。しかし、どういうわけか、それは働いていない:

public class DcInterceptor implements Interceptor { 
    private byte[] jsonTestBeans; 

    private final Type listType = new TypeToken < List <TestBeans>>() {}.getType(); 

    @Override 
    public void close() { 
     // TODO Auto-generated method stub 

    } 

    @Override 
    public void initialize() { 
     // TODO Auto-generated method stub 
     new Logger(); 
    } 

    @Override 
    public Event intercept(Event event) { 
     // TODO Auto-generated method stub 
     List <Row> actions = new ArrayList <Row>(); 
     this.jsonTestBeans = event.getBody(); 
     Logger.logger.debug("In Interceptor"); 
     System.out.println("In Interceptor"); 
     Gson _Gson = new Gson(); 
     String jsonstr = ""; 
     try { 
      jsonstr = new String(jsonTestBeans, "UTF-8"); 
     } catch (Exception e) { 
      // TODO: handle exception 
      Logger.logger.error(e.getMessage() + "In Interceptor"); 
      jsonstr = new String(jsonTestBeans); 
     } 
     List <TestBeans> TestBeanss = _Gson.fromJson(jsonstr, listType); 
     System.out.println("Json String :" + jsonstr); 
     List <String> gTouch = new ArrayList <String>(); 
     for (TestBeans TestBeans: TestBeanss) { 
      String str = TestBeans.getIp(); 
      Logger.logger.debug("IP : " + str); 
      String st = (str.substring(0, str.lastIndexOf(".") + 1) + "x"); 
      Logger.logger.debug("Mask IP : " + st); 
      TestBeans.setRemoteIp(st); 
     } 
     event.setBody(_Gson.toJson(TestBeanss).getBytes()); 
     Logger.logger.debug("Interceptor Ends"); 
     return event; 
    } 

    @Override 
    public List <Event> intercept(List <Event> events) { 
     // TODO Auto-generated method stub 
     System.out.println("In List Interceptor"); 
     Logger.logger.debug("In List Interceptor"); 
     for (Event event: events) { 
      intercept(event); 
     } 
     return events; 
    } 

    public static class CounterInterceptorBuilder implements Interceptor.Builder { 

     private Context ctx; 

     @Override 
     public Interceptor build() { 
      Logger.logger.debug("In Interceptor Build"); 
      System.out.println("In Build Interceptor"); 
      return new DcInterceptor(); 
     } 

     @Override 
     public void configure(Context context) { 
      this.ctx = context; 
     } 

    } 

答えて

0

少なくとも、私は見ることができます:

  • をごインターセプタに関する設定行は、エージェントを参照してください。それ以外の構成は、a1を参照しながら、ECircleTp_Testと呼ばれます。
  • com.flume.interceptor.DcInterceptor2を設定しましたが、開発したインターセプタクラスはDcInterceptor(最終2なし)と呼ばれています。
  • com.flume.interceptor.DcInterceptor2は、カスタム・インターセプターの完全修飾クラス名として構成されています。それにもかかわらず、インターセプタのコードはDcInterceptor(2)クラスのパッケージを宣言しません。
+0

これは完璧な入力ミスです。 #frbありがとうございます。 –

+0

ニース。したがって、インターセプタ 'package com.flume.interceptor; 'をファイルの先頭に追加してみてください。 – frb

+0

まだ失敗している場合は、あなたの質問に関連ログを追加してください。 – frb

関連する問題