2015-11-16 12 views
7

これは私がここに初めて来たので、私がうまく投稿しないと申し訳ありません、私の悪い英語を申し訳ありません。シンクを設定するelasticsearch apache-flume

私はApache FlumeとElasticsearchシンクを設定しようとしています。すべてが大丈夫です、それはうまく動作しているようですが、エージェントを起動すると2つの警告があります。以下のもの:

2015-11-16 09:11:22,122 (lifecycleSupervisor-1-3) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } - Exception follows. 
java.lang.NoSuchMethodError: org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:143) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77) 
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48) 
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:357) 
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46) 
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2015-11-16 09:11:22,137 (lifecycleSupervisor-1-3) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } } stopped, since it could not besuccessfully started due to missing dependencies 

マイエージェント構成:

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = elasticsearch 
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer=org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

それはnetcatをを開始し、すべての罰金ですが、私は論文の警告に関する怖い、私はそれを理解していません。

+0

指定された設定が正しく実行されていますか?最初のログトレースは警告ではありませんが、 'ElasticSearchSink'に何らかの問題があります。おそらく、いくつかの依存関係の問題に関連しています(見つからないメソッドがあります)。 – frb

+0

私は警告トレースによって与えられた特定のメッセージに気が付かなかったが、それは私の診断を確認する: 'Component SinkRunner:{policy:[email protected] counterGroup:{name:null counters:{} }依存関係がないためにうまく始まらなかったので、停止しました。 ' – frb

答えて

1

ログに出席すると、一部の不足している依存関係に問題があります。

あなたはElasticSearchSinkドキュメントへの表情を持っている場合は、以下を参照してくださいよ:

ご使用の環境に必要なelasticsearchとLuceneのコアのjarファイルは、Apache水路のlibディレクトリに配置する必要がありますインストール。 Elasticsearchでは、クライアントJARのメジャーバージョンがサーバーのメジャーバージョンと一致し、両方がJVMの同じマイナーバージョンを実行していることが必要です。これが間違っていると、SerializationExceptionsが表示されます。必要なバージョンを選択するには、最初に、ターゲットクラスタが実行されているelasticsearchとJVMのバージョンを確認します。次に、メジャーバージョンと一致するelasticsearchクライアントライブラリを選択します。 0.19.xクライアントは0.19.xクラスタと通信できます。 0.20.xは0.20.xと通信でき、0.90.xは0.90.xと通信できます。 elasticsearchのバージョンが決定したら、pom.xmlファイルを読んで、使用する正しいlucene-core JARバージョンを決定します。 ElasticSearchSinkを実行しているFlumeエージェントは、ターゲットクラスタがマイナーバージョンまで実行しているJVMと一致する必要があります。

ほとんどの場合、必要なJava jarsを配置していないか、またはバージョンが適切ではありません。

2

私はApache Flume 1.6.0とElasticsearch 2.0 cantが正しいと思われる理由を見つけました。

私が変更した第三者のシンクが見つかりました。

Here is the link

そして、これが私の最終的な構成では、

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

# Describe/configure the source 
a1.sources.r1.type = netcat 
a1.sources.r1.bind = localhost 
a1.sources.r1.port = 44444 

# Describe the sink ES 
a1.sinks = k1 
a1.sinks.k1.type = com.frontier45.flume.sink.elasticsearch2.ElasticSearchSink 
a1.sinks.k1.hostNames = 127.0.0.1:9300 
a1.sinks.k1.indexName = items 
a1.sinks.k1.indexType = item 
a1.sinks.k1.clusterName = elasticsearch 
a1.sinks.k1.batchSize = 500 
a1.sinks.k1.ttl = 5d 
a1.sinks.k1.serializer = com.frontier45.flume.sink.elasticsearch2.ElasticSearchDynamicSerializer 
a1.sinks.k1.indexNameBuilder = com.frontier45.flume.sink.elasticsearch2.TimeBasedIndexNameBuilder 
a1.sinks.k1.channel = c1 

# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 

# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1 

それは私のために動作します。

お返事ありがとうございます。

P.S.はい、私は図書館を移動しなければなりませんでした。

1

のみ水路/ libのディレクトリにある2つのJARの下に追加し、それが働いていた、他のすべてのLucene JARを追加する必要はありません。

elasticsearch-1.7.1.jar

luceneの、コア - 4.10.4。

をjarファイル水路開始するコマンド:ESにデータをロードするflume-env.sh

export JAVA_HOME=/usr/java/default 

export JAVA_OPTS="-Xms3072m -Xmx3072m -XX:MaxPermSize=48m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=5445 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false" 

FLUME_CLASSPATH="/usr/flume/flume1.6/apache-flume-1.6.0-bin/;/usr/flume/flume1.6/apache-flume-1.6.0-bin/lib" 

水路アグリゲータ設定するには、以下の追加してください

bin/flume-ng agent --conf conf --conf-file conf/flume-aggregator.conf --name agent2 -Dflume.root.logger=INFO,console 

を:flume-aggregator.conf

agent2.sources = source1 
agent2.sinks = sink1 
agent2.channels = channel1 

################################################ 
# Describe Source 
################################################ 

# Source Avro 
agent2.sources.source1.type = avro 
agent2.sources.source1.bind = 0.0.0.0 
agent2.sources.source1.port = 9997 

################################################ 
# Describe Interceptors 
################################################ 
# an example of nginx access log regex match 
# agent2.sources.source1.interceptors = interceptor1 
# agent2.sources.source1.interceptors.interceptor1.type = regex_extractor 
# 
# agent2.sources.source1.interceptors.interceptor1.regex = "^(\\S+) \\[(.*?)\\] \"(.*?)\" (\\S+) (\\S+)(\"(.*?)\" \"(.*?)\")?" 
# 
# # agent2.sources.source1.interceptors.interceptor1.regex = ^(.*) ([a-zA-Z\\.\\@\\-\\+_%]+) ([a-zA-Z\\.\\@\\-\\+_%]+) \\[(.*)\\] \\"(POST|GET) ([A-Za-z0-9\\$\\.\\+\\@#%_\\/\\-]*)\\??(.*) (.*)\\" ([a-zA-Z0-9\\.\\/\\s\-]*) (.*) ([0-9]+) ([0-9]+) ([0-9\\.]+) 
# # agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 s9 s10 s11 s12 s13 
# 
# agent2.sources.source1.interceptors.interceptor1.serializers = s1 s2 s3 s4 s5 s6 s7 s8 
# agent2.sources.source1.interceptors.interceptor1.serializers.s1.name = clientip 
# agent2.sources.source1.interceptors.interceptor1.serializers.s2.name = datetime 
# agent2.sources.source1.interceptors.interceptor1.serializers.s3.name = method 
# agent2.sources.source1.interceptors.interceptor1.serializers.s4.name = request 
# agent2.sources.source1.interceptors.interceptor1.serializers.s5.name = response 
# agent2.sources.source1.interceptors.interceptor1.serializers.s6.name = status 
# agent2.sources.source1.interceptors.interceptor1.serializers.s7.name = bytes 
# agent2.sources.source1.interceptors.interceptor1.serializers.s8.name = requesttime 
# 

################################################ 
# Describe Sink 
################################################ 

# Sink ElasticSearch 
# Elasticsearch lib ---> flume/lib 
# elasticsearch/config/elasticsearch.yml cluster.name clusterName. data/clustername data. 
agent2.sinks.sink1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink 
agent2.sinks.sink1.hostNames = 10.20.156.16:9300,10.20.176.20:9300 
agent2.sinks.sink1.indexName = pdi 
agent2.sinks.sink1.indexType = pdi_metrics 
agent2.sinks.sink1.clusterName = My-ES-CLUSTER 
agent2.sinks.sink1.batchSize = 1000 
agent2.sinks.sink1.ttl = 2 
#this serializer is crucial in order to use kibana 
agent2.sinks.sink1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer 



################################################ 
# Describe Channel 
################################################ 

# Channel Memory 
agent2.channels.channel1.type = memory 
agent2.channels.channel1.capacity = 10000000 
agent2.channels.channel1.transactionCapacity = 1000 

################################################ 
# Bind the source and sink to the channel 
################################################ 

agent2.sources.source1.channels = channel1 
agent2.sinks.sink1.channel = channel1 
+0

正確なjarをリストアップしていただきありがとうございます。 – user99999991

関連する問題