1

このトピックについてはいくつかの回答がありますが、何も機能していませんでした。カフカコンシューマの作成に失敗しました

次のストリームプロセッサを実行しようとしています。

object simplestream extends App { 

    val builder: KStreamBuilder = new KStreamBuilder 

    val streamingConfig = { //ToDo - Move these to config 
     val settings = new Properties 
     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "example11") 
     settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") 
     // Specify default (de)serializers for record keys and for record values. 
     settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass.getName) 
     settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass.getName) 
     settings 
    } 

    val users = builder.stream("tt2") 

    users.print() 
    val stream: KafkaStreams = new KafkaStreams(builder, streamingConfig) 
    stream.start() 

    } 
} 

依存性:

//kafka 
    "org.apache.kafka" % "kafka-streams" % "0.10.2.0", 
    "org.apache.kafka" % "kafka-clients" % "0.10.2.0" 

とエラー:

[error] (run-main-1) org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:717) 
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:566) 
    at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getConsumer(DefaultKafkaClientSupplier.java:38) 
    at org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:323) 
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:349) 
    at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:272) 
    at kafka.simplestream$.runStream(simplestream.scala:36) 
    at kafka.simplestream$.delayedEndpoint$kafka$simplestream$1(simplestream.scala:40) 
    at kafka.simplestream$delayedInit$body.apply(simplestream.scala:12) 
    at scala.Function0.apply$mcV$sp(Function0.scala:34) 
    at scala.Function0.apply$mcV$sp$(Function0.scala:34) 
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
    at scala.App.$anonfun$main$1$adapted(App.scala:76) 
    at scala.collection.immutable.List.foreach(List.scala:378) 
    at scala.App.main(App.scala:76) 
    at scala.App.main$(App.scala:74) 
    at kafka.simplestream$.main(simplestream.scala:12) 
    at kafka.simplestream.main(simplestream.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.Metadata.update(Lorg/apache/kafka/common/Cluster;J)V 

私は別のクライアントバージョン、運を試してみました。私はkafka 0.10.2.0バージョンを使用しています。私はまた、飼い葉桶でエラー以下になる。

[2017-08-18 13:08:10,260] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:delete cxid:0x29 zxid:0x4d txntype:-1 reqpath:n/a Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election (org.apache.zookeeper.server.PrepRequestProcessor) 
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x35 zxid:0x4e txntype:-1 reqpath:n/a Error Path:/brokers Error:KeeperErrorCode = NodeExists for /brokers (org.apache.zookeeper.server.PrepRequestProcessor) 
[2017-08-18 13:08:10,364] INFO Got user-level KeeperException when processing sessionid:0x15df53e101e0001 type:create cxid:0x36 zxid:0x4f txntype:-1 reqpath:n/a Error Path:/brokers/ids Error:KeeperErrorCode = NodeExists for /brokers/ids (org.apache.zookeeper.server.PrepRequestProcessor) 

正確に何が原因かわかりません。私はコンシューマー/プロデュースすることができます。

答えて

3

java.lang.NoSuchMethodError - このエラーは、複数のバージョンのクライアントjarがクラスパスで使用可能な場合に発生します。クラスパスを一度確認してください。

飼い猫に投げられたKeeperExceptionは問題ではありません。飼い主には存在しないノード/フォルダを作成するだけです。

+0

はい、私は正しいクライアントバージョン0.10.2.1を使用しなければなりませんでした。 – jarvis11

関連する問題