私たちが知っているように、カーフカのTopic
の作成はサーバ初期化部分で処理する必要があります。ここではデフォルトのスクリプト./kafka-topics --zookeeper ...
を使用しますが、トピックを動的に作成する必要がある場合はどうなりますか?Apache Kafkaはコードからトピックを作成します
答えて
幸いにも、Kafka 0.10.1.0
が私たちにこの能力をもたらしました。私はConfluence Jiraのこれらの魅力的な機能を見ましたが、トピックに関連するドキュメンテーションを見つけることができませんでした、皮肉ですね。
私はソースコードに行き、その場でトピックを作成する方法を見つけました。うまくいけば、それはあなたのために役立つでしょう。もちろん、あなたがより良い解決策を持っているなら、私たちと分かち合うことをためらってください。
さて、始めましょう。
/** The method propagate topics **/
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException {
CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
.collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1
CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2
try {
CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3
return response.errors().entrySet().stream()
.filter(error -> error.getValue() == Errors.NONE)
.map(Map.Entry::getKey)
.collect(Collectors.toList()); // 4
} catch (IOException e) {
log.error(e);
}
return null;
}
1
我々は簡単にするために、私はすべてのトピックの中で同じコンフィグを共有しましょう、TopicDetails
のインスタンスを必要としています。たとえば、mTopics
は作成したいすべてのトピックの文字列のリストです。を取得するために必要以上
2
基本的に、我々は今、私たちは、そのための特別なクラスを持って、私たちのカフカクラスタにリクエストを送信したい - 私たちは要求を送信し、CreateTopicsResponse
CreateTopicsRequest
とタイムアウト
3
を受け入れること
private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;
private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
String[] comp = client.split(":");
if (comp.length != 2) {
throw new IllegalArgumentException("Wrong client directive");
}
String address = comp[0];
int port = Integer.parseInt(comp[1]);
RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
header.writeTo(buffer);
request.writeTo(buffer);
byte byteBuf[] = buffer.array();
byte[] resp = requestAndReceive(byteBuf, address, port);
ByteBuffer respBuffer = ByteBuffer.wrap(resp);
ResponseHeader.parse(respBuffer);
return CreateTopicsResponse.parse(respBuffer);
}
private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
try(Socket socket = new Socket(address, port);
DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
DataInputStream dis = new DataInputStream(socket.getInputStream())
) {
dos.writeInt(buffer.length);
dos.write(buffer);
dos.flush();
byte resp[] = new byte[dis.readInt()];
dis.readFully(resp);
return resp;
} catch (IOException e) {
log.error(e);
}
return new byte[0];
}
これはまったく魔法ではなく、要求を送信し、バイトストリームを応答に解析するだけです。
4
CreateTopicsResponse
key
はあなたが要求したトピック名であるだけMap<String, Errors>
あるプロパティerrors
を持っています。難しいのは、リクエストしたすべてのトピックが含まれていますが、エラーがないものは値Errors.None
です。そのため、応答をフィルタリングして正常に作成されたトピックのみを返します。アンドレイNechaevの拡張
Kafka 0.10.2.0に 'version = 1'を使う –
が10.2.0で
答え、CreateTopicsRequestのインスタンスを取得する方法が少し変更されました。 CreateTopicsRequestインスタンスを構築するには、Builder内部クラスを使用する必要があります。ここにコードサンプルがあります。
CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false);
CreateTopicsRequest request = builder.build();
- 1. apache kafkaでトピックを作成するには?
- 2. イベントvsトピックApache Kafka
- 3. Kafkaプロデューサーはトピックとパーティションを作成できますか?
- 4. kafkaフィルタリング/動的トピック作成
- 5. Javaアプリケーションからapache kafkaのトピックを購読するには?
- 6. パーティションを使用したApache Kafka Scalingトピック
- 7. パブリッシャーはどのようにApache Kafkaのトピックにメッセージをパブリッシュしますか?
- 8. Kafka 0.8では、Javaコードを使用してパーティションとレプリケーションでトピックを作成できますか?カフカで
- 9. Distributed Kafka Connectトピック構成
- 10. nodejsにパーティションを持つkafkaトピックを作成するには?
- 11. C++コード内のkafkaからトピックを削除
- 12. Kafka Streamsがスキーマなしでavroトピックを作成する
- 13. トピックを作成するjava - kafkaバージョン> 0.10.0.0
- 14. iOSからKafkaトピックにメッセージを送信
- 15. Kafkaトピックからバイナリデータを読み込みます。
- 16. Spark StreamingのKafkaトピックから2つのDStreamを作成できない
- 17. Kafka Connector - Kafka用のJMSSourceConnectorトピック
- 18. Kafka 0.9 - java apiを使用してトピックを作成するには
- 19. Apache Camel - Kafkaプロデューサー:トピック名を動的に設定する
- 20. Apache KafkaトピックとストリームデータをHiveテーブルに登録する方法
- 21. Apache Kafka 0.10.0 API with Javaを使用してKafkaブローカクラスタを作成する
- 22. Confluent kafka C#ライブラリのKafkaトピックから最新のオフセットを取得するには?
- 23. Apache Kafka StreamsトピックへのKTablesのマテリアライズが遅いようです
- 24. Kafka 1.0.0管理クライアントはEOFExceptionでトピックを作成できません
- 25. Kafkaは多数のパーティション(64k)でトピックを作成できません
- 26. Apache Kafka:トピックの消費者グループを見つける方法
- 27. Apache Kafka:トピックのメッセージの有無を確認
- 28. server.propertiesのパーティション数とトピック作成時のパーティション数のあいまいさApacheのkafkaのパーティションパラメータ
- 29. Javaコードでkafkaトピックにメッセージを送信できません
- 30. Filter-Interceptor-Kafkaのトピック
もっと良い方法があれば、あなたの質問は何ですか? –
@NickVanderhovenそれは答えを探している人のためのヒントによく似ています。私はドキュメンテーションやここで答えを見つけることができませんでした –
@Andrey:これを質問として編集してください。「実行時にApache Kafkaのトピックを作成するにはどうすればよいですか?自分の投稿への返信はOKです。 –