2013-08-14 13 views
12

hereKafka 0.8では、Javaコードを使用してパーティションとレプリケーションでトピックを作成できますか?カフカで

bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 2 --partition 3 --topic test 

述べたようにトピックは以下のようなコマンドを使用して作成することができます0.8beta上記のコマンドは、3つのパーティションおよびパーティションごとに2つのレプリカと「テスト」という名前のトピックを作成します。

Javaを使用して同じことを実行できますか?

これまでのところ私が見つけたのJavaを使用している

Producer<String, String> producer = new Producer<String, String>(config); 
    producer.send(new KeyedMessage<String, String>("mytopic", msg)); 

の下に見られるように、私たちは、これは、パーティションの数は「num.partitions」属性を使用して指定して「mytopic」という名前のトピックを作成しますプロデューサーを作成することができます生産を開始する。

しかし、パーティションとレプリケーションも定義する方法はありますか?そのような例は見つけられませんでした。もしそれができないならば、以前はパーティションと複製を持つトピックを作成してから、そのトピック内でメッセージを生成するためにプロデューサを使用する必要があります。たとえば、同じ方法で(ただしnum.partitions属性をオーバーライドして)異なる数のパーティションを使用して「mytopic」を作成したい場合は可能でしょうか?

+1

.partitions'属性を設定ファイルに追加します。カスタムパーティションでトピックを作成するには、上記のようにコンソールスクリプトを使用するしかないと思います。その場合、トピックは前に作成されなければならず、次にプロデューサは同じものを作成することができます。 – Hild

+0

私は以下のリンク[Javaを使用してカフカトピックを作成]で同じ問題に応答しました(http://stackoverflow.com/questions/16946778/how-can-we-create-a-topic-in-kafka-from-the -ide-using-api/18480684#18480684)さらにヘルプやコードスニペットが必要な場合。コードをMaven依存関係と共有することを教えてください – Biks

答えて

8

注:私の答えはカフカ0.8.1+、4月2014

のように利用できる、すなわち、最新の安定版は、はい、あなたはカフカのAPIを介してプログラム的トピックを作成することができますカバーしています。もちろん、トピックの複製ファクタだけでなく、必要なパーティション数も指定できます。

最近リリースされたKafka 0.8.1+は、Kafka 0.8.0(Biksのリンク先の返信で使用されていたAPI)とは少し異なるAPIを提供しています。 Biksが上記を参照していた質問How Can we create a topic in Kafka from the IDE using APIに私の返信にcode example to create a topic in Kafka 0.8.1+を追加しました。私は、トピックを作成することができますが、パーティションの数が `NUMとして挙げられた値に基づいて行われます` kafka.javaapi.producer.Producer`のAPIを使用して、これまでにIS見つけたもの

+0

この例題はJavaでは動作しませんが、少なくともコンパイル方法を理解することはできません。 – quux00

+0

あなたが正しいです - サンプルコードはScalaにあります。残念ながら、あなたのJavaコードにコピーして貼り付けることはできません。 –

0

`

import kafka.admin.AdminUtils; 
import kafka.cluster.Broker; 
import kafka.utils.ZKStringSerializer$; 
import kafka.utils.ZkUtils; 

String zkConnect = "localhost:2181"; 
ZkClient zkClient = new ZkClient(zkConnect, 10 * 1000, 8 * 1000, ZKStringSerializer$.MODULE$); 
ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(zkConnect), false); 
    Properties pop = new Properties(); 
    AdminUtils.createTopic(zkUtils, topic.getTopicName(), topic.getPartitionCount(), topic.getReplicationFactor(), 
      pop); 
    zkClient.close();` 
関連する問題