2016-11-10 5 views
0

過去6ヶ月間、私はkafka-0.8.1.1の上に書かれた私たちの解答の開発者です。それは私たちのために安定しています。 kafka-0.9.0.1にアップグレードすると考えました。 サーバーのアップグレードでは、問題は発生しませんでした。kafka_2.11-0.9.0.1とscala 2.11.7の問題

私たちは独自のソリューションを使ってメッセージを抽出し、さまざまな宛先や嵐で読んだメッセージに書き込みます。我々は、次のMavenアーティファクト

<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.9.2</artifactId> 
<version>0.8.1.1</version> 

を使用していた私たちのユニットテストのために私は、kafka_2.9.2のための0.9.0.1バージョンを見つけることができませんでした。そこで私は最初にkafka_2.11に移動しました。これは使用アーティファクトです:

<groupId>org.apache.kafka</groupId> 
<artifactId>kafka_2.11</artifactId> 
<version>0.9.0.1</version> 

私は次の問題に実行していた:

scala.ScalaObject not found issue 
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object; 
kafkaConfig<init> issue with NoSuchMethodError (Ljava/util/map;)Ljava/util/map 

また、ほとんどの時間、私はkafka_2.10-0.9.0.1にし、kafka_2両方(KafkaServerStartableに実行します.11-0.9.0.1)ハングの問題。しかし、同じ単体テストでは、kafka_2.9.2でkafkaサーバーがハングすることはありませんでした。

問題を手伝ってもらえますか? 何も分からないのですか?

+0

クラスパスを確認して、2.10バージョンのscalaライブラリが存在するかどうかを確認してください。これは、バージョンの競合が原因である可能性があります。 – amethystic

+0

* ScalaObject not foundの問題を*と*どこで*実行していますか?あなたのカスタムアプリケーションでは、Storm/StormのKafkaスパウトで...? –

+0

私はstorm kafka spoutテストを実行しているときに、KafkaServerStartableを使って単体テスト用のkafkaサーバを起動します。以下の設定では、起動してKafkaConfigエラーになります。 – DivH

答えて

0

これは答えではありません。私の質問のフォローアップ: これは、既存のコードがテストサーバーを起動するために使用されたカフカの設定です:私が試し 依存関係:

<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> # in this, scala 11.4, 11.7 used alternativel to verify <version>0.9.0.1</version>

<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> # in this scala 10.4 is used <version>0.9.0.1</version>

公共KafkaTestServer(int型ポート、 ZookeeperTestServer zkServer、String brokerId、int defaultPartitionCount)例外をスローします。{ this.zkServer = zkServer;

KafkaConfig config = getKafkaConfig(zkServer.getConnectString(), port, brokerId, defaultPartitionCount); 
    kafkaServer = new KafkaServerStartable(config); 

    kafkaServer.startup(); 
    ProducerConfig conf = new ProducerConfig(getProducerConfig(getKafkaBrokerString())); 
    producer = new Producer<>(conf); 

} 

private KafkaConfig getKafkaConfig(String zkConnectString, int port, String brokerId, int defaultPartitionCount) { 
    Properties props = new Properties(); 
    props.setProperty("zookeeper.connect", zkConnectString); 
    props.setProperty("broker.id", brokerId); 
    props.setProperty("port", Integer.toString(port)); 
    createKafkaDataDirectory(); 
    props.setProperty("log.dirs", dataDirectory.getAbsolutePath()); 
    props.setProperty("num.partitions", Integer.toString(defaultPartitionCount)); 
    props.setProperty("retry.backoff.ms", "500"); 

    return new KafkaConfig(props, false); 
} 
関連する問題