2017-10-19 24 views
0

コンソールKafkaプロデューサからHadoopファイルシステム(HDFS)への簡単なデータパイプラインを設定しようとしています。私は64ビットUbuntu仮想マシンに取り組んでおり、HadoopとKafkaの両方に別々のユーザーを作成しています。 Kafkaで生産された入力を消費者コンソールで消費し、HDFSが稼働しているようです。FlumeのKafkaからのEOFException

ここで、Flumeを使用して入力をHDFSにパイプしたいとします。今私は、次のコマンド

bin/flume-ng agent --conf ./conf -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n tier1 

で水路を実行したときに、私は何度も何度もコンソール出力で同じ例外を取得

tier1.sources = source1 
tier1.channels = channel1 
tier1.sinks = sink1 

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource 
tier1.sources.source1.zookeeperConnect = 127.0.0.1:2181 
tier1.sources.source1.topic = test 
tier1.sources.source1.groupId = flume 
tier1.sources.source1.channels = channel1 
tier1.sources.source1.interceptors = i1 
tier1.sources.source1.interceptors.i1.type = timestamp 
tier1.sources.source1.kafka.consumer.timeout.ms = 2000 

tier1.channels.channel1.type = memory 
tier1.channels.channel1.capacity = 10000 
tier1.channels.channel1.transactionCapacity = 1000 

tier1.sinks.sink1.type = hdfs 
tier1.sinks.sink1.hdfs.path = hdfs://flume/kafka/%{topic}/%y-%m-%d 
tier1.sinks.sink1.hdfs.rollInterval = 5 
tier1.sinks.sink1.hdfs.rollSize = 0 
tier1.sinks.sink1.hdfs.rollCount = 0 
tier1.sinks.sink1.hdfs.fileType = DataStream 
tier1.sinks.sink1.channel = channel1 

:私は、次の設定ファイルを使用しています

2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:467)] Completed connection to node 2147483647 
2017-10-19 12:17:04,279 (lifecycleSupervisor-1-2) [DEBUG - org.apache.kafka.common.network.Selector.poll(Selector.java:307)] Connection with Ubuntu-Sandbox/127.0.1.1 disconnected 
java.io.EOFException 
    at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83) 
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71) 
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:153) 
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:134) 
    at org.apache.kafka.common.network.Selector.poll(Selector.java:286) 
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:256) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193) 
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163) 
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:222) 
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:311) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:890) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853) 
    at org.apache.flume.source.kafka.KafkaSource.doStart(KafkaSource.java:529) 
    at org.apache.flume.source.BasicSourceSemantics.start(BasicSourceSemantics.java:83) 
    at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71) 
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
    at java.lang.Thread.run(Thread.java:748) 

Flumeを停止する唯一の方法は、Javaプロセスを強制終了することです。

私は、それがHadoopとKafkaの別々のユーザーと関係していると思っていましたが、Kafkaユーザーのすべてを実行しても同じ結果が得られます。私はEOFExceptionメソッドについてオンラインでも何も見つけられませんでした。私はちょうど「Getting Started」ガイドに従っていて、すべてのためにかなり標準的な設定を使用していたことを考えると変です。

おそらくそれは前の行(「Ubuntu-Sandbox/127.0.1.1 disconnected」)と関係があり、それで私のVMの設定は何ですか?

ご協力いただきありがとうございます。

答えて

0

代わりにKafka Connect(Apache Kafkaの一部)とHDFS connectorを使用したことはありますか?これは一般的にFlumeに取って代わるものと見なされます。 Flumeと同様のファイルベースの設定で、使いやすいです。

+0

助けてくれてありがとう、ロビン。私はConfluentに慣れ親しんでおり、すべてをより簡単にするようです。しかし、やはり、クイックスタートガイドに従うだけで、カフカからHDFSにデータを書き込むことはできません...今回は私も例外がなく、「接続スタンドアロン」プロセスは完了せず、 HDFSのフォルダは作成されていても空です...これは本当にイライラしています! – stefanS

関連する問題