2017-10-24 14 views
0

更新:カフカ:埋め込みカフカサーバーを作成できません

ここに私が従おうとしているサンプルコードです。

https://gist.github.com/asmaier/6465468

それは私が

 EmbeddedZookeeper zkServer = new EmbeddedZookeeper(); 
     String zkConnect = ZKHOST + ":" + zkServer.port(); 
     ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); 
     ZkUtils zkUtils = ZkUtils.apply(zkClient, false); 

     // setup Broker 
     Properties brokerProps = new Properties(); 
     brokerProps.setProperty("zookeeper.connect", zkConnect); 
     brokerProps.setProperty("broker.id", "0"); 
     brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); 
     brokerProps.setProperty("listeners", "PLAINTEXT://:" + BROKERPORT); 
     brokerProps.setProperty("advertised.listeners", "PLAINTEXT://" + BROKERHOST +":" + BROKERPORT); 
     //brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT"); 
     KafkaConfig config = new KafkaConfig(brokerProps); 
     Time mock = new MockTime(); 
     KafkaServer kafkaServer = TestUtils.createServer(config, mock); 
     logger.info("TestKafkaServer created"); 

として埋め込まカフカサーバーを作成しようとしていますが、私は

java.lang.NoSuchFieldError: DEFAULT_SASL_ENABLED_MECHANISMS 
    at kafka.server.Defaults$.<init>(KafkaConfig.scala:183) 
    at kafka.server.Defaults$.<clinit>(KafkaConfig.scala) 
    at kafka.log.Defaults$.<init>(LogConfig.scala:35) 
    at kafka.log.Defaults$.<clinit>(LogConfig.scala) 
    at kafka.log.LogConfig$.<init>(LogConfig.scala:246) 
    at kafka.log.LogConfig$.<clinit>(LogConfig.scala) 
    at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:270) 
    at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala) 
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:795) 
    at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:797) 
    at com.ibm.whi.bap.helper.test.kafka.KafkaServerTest.<init>(KafkaServerTest.java:56) 
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.checkAllProperties(KafkaTest.java:115) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) 
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) 
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31) 
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48) 
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236) 
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 

java.lang.NullPointerException 
    at com.ibm.whi.bap.helper.test.kafka.KafkaTest.tearDown(KafkaTest.java:58) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41) 
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:37) 
    at org.junit.rules.TestWatchman$1.evaluate(TestWatchman.java:48) 
    at org.junit.runners.BlockJUnit4ClassRunner.runNotIgnored(BlockJUnit4ClassRunner.java:79) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:71) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:49) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:193) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:52) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:191) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:42) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:184) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:236) 
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 

を取得カフカの0.10.x

で動作するはずです

KafkaConfig config = new KafkaConfig(brokerProps); 

何が間違っているのですか?私は遊ぶことを試みた

brokerProps.setProperty("security.inter.broker.protocol","PLAINTEXT"); 

しかし何も働かなかった。私は単体テスト用の組み込みカフカサーバーを作成したいだけです。私はどんな種類のセキュリティも設定したくありません。

は、ここではこれは、コードの問題ではありません、私のMaven

 <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.10.1.0</version> 
      <classifier>test</classifier> 
      <scope>test</scope> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>0.10.1.0</version> 
      <classifier>test</classifier> 
      <scope>test</scope> 
     </dependency> 
+1

かもしれない。なぜなら、カフカはアーセナルファンを好まないからかもしれない。 JK。飼い猫とブローカーをネイティブに展開するHDPのSandboxを試すことができます。問題はポートを開くことだけです(ドッカーのように)。 –

+0

笑、問題ありません!私はkafkaのファンではない:P問題は、私はいくつかの単体テストを持っているので、組み込みの動物園主が欲しいということです。私はこれらのテストの外にカフカサーバーは必要ありません。私は上記のコードがkafka 0.8のために働いていたことを知っていますか、あるいはおそらくo.9ですが、今は0.10ではうまくいきません。私はtiが設定されていないブローカーのプロパティと関係していると推測しています。何か案は? – AbtPst

+0

ああ、0.8と0.10の間には多くの変更があります。たとえば、消費者は今ブローカーに直接接続します。あなたのコードをもう一度読んで、あなたがScalaパッケージを見つけられていないと思います。その依存関係を追加しようとしましたか? –

答えて

1

です。これはバージョンの不一致です。使用しているライブラリ(https://gist.github.com/asmaier/6465468)は、使用しているカフカバージョン()とは異なるKafka 0.10.0.0に対してコンパイルされています。そのため、フィールドが見つからないのはDEFAULT_SASL_ENABLED_MECHANISMSです。あなたは自分のプロジェクトのソースにJavaクラスにこれらをコピーすることができます/またはあなたがスキーマレジストリを削除するなどのいくつかの詳細を(変更することができ

embeeded zookeeper

embedeed kafka

- :あなたは、以下のリンクをご覧になる場合があります)。これは、いくつかのライブラリ(現在はhttps://gist.github.com/asmaier/6465468に依存している)に依存するよりも、メモリカフカクラスタを作成するよりも簡単になります。結局のところ、これはほんの数行のコードです。お役に立てれば。

+0

感謝を掲示した更新を見てください!私はkafkaを0.10.1.0に更新して上記を試しましたが、 'effectiveConfig.put(KafkaConfig $ .MODULE $ .BrokerIdProp()、0);で' java.lang.NoSuchFieldError:DEFAULT_SASL_ENABLED_MECHANISMS'を取得しました; – AbtPst

+0

このエラーは、 https://gist.github.com/asmaier/6465468またはコンフルエントなソリューションを使用していますか?コンフルエントの – mrnakumar

+0

。私はhttps://github.com/diegoicosta/kafka-confluent-examples/blob/master/kafka-streams/src/test/java/io/confluent/examples/streams/kafka/KafkaEmbeddedを使ってみました。java – AbtPst

関連する問題