1
マスクIPでデータのIPアドレスを変更したい。これは私のFlumeエージェントの "バックアップ"部分で行われます(下記参照)。この構成ではFlumeカスタムインターセプターが動作しない
2つのチャネルがある:秒1は、バックアップのために使用されている間、最初のチャネルは、HBaseのにデータをダンプします。
a1.sources = r1 r2
a1.channels = channel1 Backup_channel
a1.sinks = FSink
a1.sources.r1.handler = com.flume.handler.JSONHandler
a1.sources.r1.type = avro
a1.sources.r1.bind = x.x.x.x
a1.sources.r1.port = 10008
a1.sources.r2.handler = com.flume.handler.JSONHandler
a1.sources.r2.type = avro
a1.sources.r2.bind = x.x.x.x
a1.sources.r2.port = 10009
a1.sources.r2.interceptors = i1
a1.sources.r2.interceptors.i1.type = com.flume.interceptor.DcInterceptor
a1.channels.channel1.type = file
a1.channels.channel1.checkpointDir = /root/flume/channels/Livechannel/checkpoint
a1.channels.channel1.dataDirs = /root/flume/channels/Livechannel/data
a1.sinks.FSink.type = hbase
a1.sinks.FSink.table = Temp_Test
a1.sinks.FSink.batchSize = 300
a1.sinks.FSink.columnFamily = T
a1.sinks.FSink.serializer = com.flume.sink.TestTP
a1.sources.r1.channels = channel1
a1.sources.r2.channels = Backup_channel
a1.channels.Backup_channel.type = file
a1.channels.Backup_channel.checkpointDir = /data/disk/flume/backup/checkpoint
a1.channels.Backup_channel.dataDirs = /data/disk/flume/backup/data
a1.sinks.FSink.channel = channel1
は次のように私のカスタムJavaインターセプターコードです。このメソッドは、身体からIPアドレスを取得し、そのIPマスクを計算し、それをボディに追加するインターセプトメソッドを実装します。しかし、どういうわけか、それは働いていない:
public class DcInterceptor implements Interceptor {
private byte[] jsonTestBeans;
private final Type listType = new TypeToken < List <TestBeans>>() {}.getType();
@Override
public void close() {
// TODO Auto-generated method stub
}
@Override
public void initialize() {
// TODO Auto-generated method stub
new Logger();
}
@Override
public Event intercept(Event event) {
// TODO Auto-generated method stub
List <Row> actions = new ArrayList <Row>();
this.jsonTestBeans = event.getBody();
Logger.logger.debug("In Interceptor");
System.out.println("In Interceptor");
Gson _Gson = new Gson();
String jsonstr = "";
try {
jsonstr = new String(jsonTestBeans, "UTF-8");
} catch (Exception e) {
// TODO: handle exception
Logger.logger.error(e.getMessage() + "In Interceptor");
jsonstr = new String(jsonTestBeans);
}
List <TestBeans> TestBeanss = _Gson.fromJson(jsonstr, listType);
System.out.println("Json String :" + jsonstr);
List <String> gTouch = new ArrayList <String>();
for (TestBeans TestBeans: TestBeanss) {
String str = TestBeans.getIp();
Logger.logger.debug("IP : " + str);
String st = (str.substring(0, str.lastIndexOf(".") + 1) + "x");
Logger.logger.debug("Mask IP : " + st);
TestBeans.setRemoteIp(st);
}
event.setBody(_Gson.toJson(TestBeanss).getBytes());
Logger.logger.debug("Interceptor Ends");
return event;
}
@Override
public List <Event> intercept(List <Event> events) {
// TODO Auto-generated method stub
System.out.println("In List Interceptor");
Logger.logger.debug("In List Interceptor");
for (Event event: events) {
intercept(event);
}
return events;
}
public static class CounterInterceptorBuilder implements Interceptor.Builder {
private Context ctx;
@Override
public Interceptor build() {
Logger.logger.debug("In Interceptor Build");
System.out.println("In Build Interceptor");
return new DcInterceptor();
}
@Override
public void configure(Context context) {
this.ctx = context;
}
}
これは完璧な入力ミスです。 #frbありがとうございます。 –
ニース。したがって、インターセプタ 'package com.flume.interceptor; 'をファイルの先頭に追加してみてください。 – frb
まだ失敗している場合は、あなたの質問に関連ログを追加してください。 – frb