2016-12-04 16 views
4

私たちが知っているように、カーフカのTopicの作成はサーバ初期化部分で処理する必要があります。ここではデフォルトのスクリプト./kafka-topics --zookeeper ...を使用しますが、トピックを動的に作成する必要がある場合はどうなりますか?Apache Kafkaはコードからトピックを作成します

+0

もっと良い方法があれば、あなたの質問は何ですか? –

+0

@NickVanderhovenそれは答えを探している人のためのヒントによく似ています。私はドキュメンテーションやここで答えを見つけることができませんでした –

+0

@Andrey:これを質問として編集してください。「実行時にApache Kafkaのトピックを作成するにはどうすればよいですか?自分の投稿への返信はOKです。 –

答えて

5

幸いにも、Kafka 0.10.1.0が私たちにこの能力をもたらしました。私はConfluence Jiraのこれらの魅力的な機能を見ましたが、トピックに関連するドキュメンテーションを見つけることができませんでした、皮肉ですね。

私はソースコードに行き、その場でトピックを作成する方法を見つけました。うまくいけば、それはあなたのために役立つでしょう。もちろん、あなたがより良い解決策を持っているなら、私たちと分かち合うことをためらってください。

さて、始めましょう。

/** The method propagate topics **/ 
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException { 
    CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication); 
    Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream() 
      .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1 

    CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2 

    try { 
     CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3 
     return response.errors().entrySet().stream() 
       .filter(error -> error.getValue() == Errors.NONE) 
       .map(Map.Entry::getKey) 
       .collect(Collectors.toList()); // 4 
    } catch (IOException e) { 
     log.error(e); 
    } 

    return null; 
} 

1我々は簡単にするために、私はすべてのトピックの中で同じコンフィグを共有しましょう、TopicDetailsのインスタンスを必要としています。たとえば、mTopicsは作成したいすべてのトピックの文字列のリストです。

2基本的に、我々は今、私たちは、そのための特別なクラスを持って、私たちのカフカクラスタにリクエストを送信したい - 私たちは要求を送信し、CreateTopicsResponse

を取得するために必要以上 CreateTopicsRequestとタイムアウト

3を受け入れること

private static final short apiKey = ApiKeys.CREATE_TOPICS.id; 
    private static final short version = 0; 
    private static final short correlationId = -1; 

private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException { 
     String[] comp = client.split(":"); 
     if (comp.length != 2) { 
      throw new IllegalArgumentException("Wrong client directive"); 
     } 
     String address = comp[0]; 
     int port = Integer.parseInt(comp[1]); 

     RequestHeader header = new RequestHeader(apiKey, version, client, correlationId); 
     ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf()); 
     header.writeTo(buffer); 
     request.writeTo(buffer); 

     byte byteBuf[] = buffer.array(); 

     byte[] resp = requestAndReceive(byteBuf, address, port); 
     ByteBuffer respBuffer = ByteBuffer.wrap(resp); 
     ResponseHeader.parse(respBuffer); 

     return CreateTopicsResponse.parse(respBuffer); 
    } 

    private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException { 
     try(Socket socket = new Socket(address, port); 
      DataOutputStream dos = new DataOutputStream(socket.getOutputStream()); 
      DataInputStream dis = new DataInputStream(socket.getInputStream()) 
     ) { 
      dos.writeInt(buffer.length); 
      dos.write(buffer); 
      dos.flush(); 

      byte resp[] = new byte[dis.readInt()]; 
      dis.readFully(resp); 

      return resp; 
     } catch (IOException e) { 
      log.error(e); 
     } 

     return new byte[0]; 
    } 

これはまったく魔法ではなく、要求を送信し、バイトストリームを応答に解析するだけです。

4CreateTopicsResponsekeyはあなたが要求したトピック名であるだけMap<String, Errors>あるプロパティerrorsを持っています。難しいのは、リクエストしたすべてのトピックが含まれていますが、エラーがないものは値Errors.Noneです。そのため、応答をフィルタリングして正常に作成されたトピックのみを返します。アンドレイNechaevの拡張

+0

Kafka 0.10.2.0に 'version = 1'を使う –

1

が10.2.0で

答え、CreateTopicsRequestのインスタンスを取得する方法が少し変更されました。 CreateTopicsRequestインスタンスを構築するには、Builder内部クラスを使用する必要があります。ここにコードサンプルがあります。

CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false); 
CreateTopicsRequest request = builder.build(); 
関連する問題