2017-12-28 29 views
1

私は既存のカフカのトピックとそこから読み取り、HDFSに書き込むflumeエージェントを持っています。私はflumeエージェントを再構成して、既存のセットアップから離れるようにしたい。カフカチャンネルを使用するために、カフカソース、HDFSシンクへのファイルチャンネル。ソースを指定せずにFlumeのカフカチャンネルを使用する方法

これは、カフカチャンネルとHDFSシンク(水蒸気源なし)のみを使用してこれを達成することができると読んでいます(スティックの端が間違っていない限り)。この設定は動作していません。それは箱の上に水路のプロセスを開始さえしていません。

  • HDPクイックスタートVM 2.6.3
  • 水路バージョン1.5.2
  • HDFSディレクトリが
  • ps -ef | grep flume存在しない唯一のI一度処理を戻す:私が使用している

    # Test 
    test.channels = kafka-channel 
    test.sinks = hdfs-sink 
    
    test.channels.kafka-channel.type = 
    org.apache.flume.channel.kafka.KafkaChannel 
    test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092 
    test.channels.kafka-channel.kafka.topic = test 
    test.channels.kafka-channel.parseAsFlumeEvent = false 
    
    test.sinks.hdfs-sink.channel = kafka-channel 
    test.sinks.hdfs-sink.type = hdfs 
    test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/ 
    

    kafka-sourceを追加しましたが、これは正しくないことがあります。これは、トピックに公開されたメッセージに対して無限ループを作成するためです。

カフカチャンネルとHDFSシンクのみを使用することはできますか、またはkafka-sourceを使用する必要がありますが、メッセージの無限ループを防ぐために他の設定を変更する必要がありますか?

Kafka-source - >kafka-channel - >HDFS Sink - これは私にとっては正しいことではありません。

+0

flumeの起動時にどのようなエラーがありますか? – Erms

+0

flumeプロセスはボックスで開始されていないため、flumeログはありません。 Ambariを介してflumeサービスを開始してもエラーメッセージは表示されません。 – darkCode

+0

起動時にどのflumeエラーが発生したかを調べるには、エージェントを手動で起動する必要があります。
'$ bin/flume-ngエージェント-n $エージェント名-c conf -f conf/flume-conf.properties.template' – Erms

答えて

0

ちょっと掘り下げたあと、私はAmbariが特定のエージェントのflume confファイルを作成していないことに気付きました。私がtest.sources = kafka-sourceを指定した場合、アンバリはflume設定を作成/更新するように見えます。これをflume config(ambariを介して)に追加すると、設定がボックスに作成され、flumeエージェントが正常に起動しました。

最終水路の設定はこのように見えた:私は(これは私が私の質問に記載された無限ループの問題を引き起こす)ソースのプロパティのいずれかを設定しなかった

test.sources=kafka-source 
test.channels = kafka-channel 
test.sinks = hdfs-sink 

test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel 
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092 
test.channels.kafka-channel.kafka.topic = test 
test.channels.kafka-channel.parseAsFlumeEvent = false 

test.sinks.hdfs-sink.channel = kafka-channel 
test.sinks.hdfs-sink.type = hdfs 
test.sinks.hdfs-sink.hdfs.path = hdfs:///data/test 

お知らせ、それだけで必要Ambariはflume configを作成し、エージェントを起動します。

0

これはFlumeについてのあなたの質問に直接答えるものではありませんが、一般にApache Kafkaを既に使用しているので、このパターンはKafka Connect(Apache Kafkaの一部です)を使用して解決するのが最適です。 使いやすいKafka Connect HDFSコネクタがあります(this guide here)。

+0

Robin、それは戦略的な計画ですが、今のところこれを短期的な解決策として必要としています。私はまた、これがどのように達成されているのか知りたいと思っています。 – darkCode

+0

コンフルエントコネクタはKafka Connectの一部ではありません。あなたのリンクはConfluent Kafkaが利用可能であることを前提としています –

+0

[standalone](https://github.com/confluentinc/kafka-connect-hdfs) –

関連する問題