2017-05-17 3 views
0

私は、camel-kafkaコンシューマーを開始しています。ここで、トピックからすべてのデータを取り出し、ハッシュマップに移動します。キャメル・カフカのためのジャーバージョンcxfbeans.xmlファイル2.18.1Camel-Kafka Zookeeper Exception

で次のようになります。

<route id="Test1" streamCache="true"> 
     <from uri="file:C:/data" /> 
     <split streaming="true"> 
      <tokenize token="\n" /> 
      <to uri="bean:proc1" /> 
      <to 
       uri="kafka:localhost:9092?topic=Checking&amp;zookeeperHost=localhost&amp;zookeeperPort=2181&amp;serializerClass=kafka.serializer.StringEncoder" /> 
     </split> 
    </route> 

私は以下の例外取得しています戦争展開する場合:

原因:org.apache.camel.ResolveEndpointFailedException:エンドポイントの解決に失敗しました:kafka:// localhost:9092?serializerClass = kafka.serializer.StringEncoder & topic =確認中& zookeeperHost = localhost & zookeeperPort = 2181原因:エンドポイントに設定できなかった2つのパラメータがあります。パラメータのスペルが正しく、エンドポイントのプロパティであることを確認してください。不明なパラメータ= [{zookeeperHost = localhost、zookeeperPort = 2181}]

zookeeperのポートとホストを削除してみましたが、それは展開されましたが、コンシューマはプロセッサを消費して配置しません。

誰でもこの問題の解決にお手伝いできますか?私はバージョンを低下させましたが、toタグにde-serializerクラスを指定する必要があります。

答えて

1

ラクダカフカを使用する場合、バージョン< = 2.16と> = 2.17の違いがあります。

あなたが使用するのは2.16に適しています。

> = 2.17については、http://camel.apache.org/kafka.html 2.17以降を参照してください。

の例では、そこにある:

from("direct:start").process(new Processor() { 
        @Override 
        public void process(Exchange exchange) throws Exception { 
         exchange.getIn().setBody("Test Message from Camel Kafka Component Final",String.class); 
         exchange.getIn().setHeader(KafkaConstants.PARTITION_KEY, 0); 
         exchange.getIn().setHeader(KafkaConstants.KEY, "1"); 
        } 
       }).to("kafka:localhost:9092?topic=test");