2017-06-10 11 views
1

プロデューサとしてクライアントを作成しようとしています。 私は新しいWin32コンソールプロジェクトを作成するための例を続けました。 プログラムの最後にgetline()関数を追加しない限り、私のAPIが動作しないことがわかりました。Windows上でlibrdkafkaのKAFKAのプロデューサAPIとC++コードを使用する方法

getline()を削除しても、produce(..)メソッドは依然として成功の結果を返します。 しかし、私はkafka-console-consumerのコマンドウィンドウで応答を見ることができません

私はちょっと混乱しました。そうですか? getline()を使用せずにメッセージを送信するにはどうすればよいですか? 誰でも知っていますか?

私はそれが動作しない理由を知った。 プロデューサオブジェクトを削除するには速すぎます プロデューサがメッセージをブローカに送信できない原因となります。

生産メソッドと削除プロデューサオブジェクトの間にスリープ1000を追加すると、 プロデューサはメッセージを正しく送信できます。

したがって、質問はすぐにメッセージを送信する方法です。 プロデューサオブジェクトを破棄する前に、これらのメッセージが完全に送信されたことを確認するにはどうすればよいですか?

この問題を解決するには、実際に私はいくつかのsleep()をソースコードに追加したくありません。

win10 + vs2015 + kafka_2.10-0.9.0.1 +飼育係-3.4.6 + librdkafka 次のコード

// kafka_test_win32_nomfc.cpp 
// 

#include "stdafx.h" 
#include <iostream> 
#include "librdkafka/rdkafkacpp.h" 


int static producer_1() 
{ 
    std::string brokers = "127.0.0.1"; 
    std::string errstr; 
    std::string topic_str = "linli"; 
    std::string mode; 
    std::string debug; 
    int32_t partition = RdKafka::Topic::PARTITION_UA; 
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; 
    bool do_conf_dump = false; 
    int opt; 
    // MyHashPartitionerCb hash_partitioner; 
    int use_ccb = 0; 

    /* 
    * Create configuration objects 
    */ 
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 

    conf->set("metadata.broker.list", brokers, errstr); 

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 
    if (!producer) { 
     std::cerr << "Failed to create producer: " << errstr << std::endl; 
     exit(1); 
    } 

    std::cout << "% Created producer " << producer->name() << std::endl; 

    /* 
    * Create topic handle. 
    */ 
    RdKafka::Topic *topic = NULL; 
    if (!topic_str.empty()) { 
     topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr); 
     if (!topic) { 
      std::cerr << "Failed to create topic: " << errstr << std::endl; 
      exit(1); 
     } 
    } 

    RdKafka::ErrorCode resp = producer->produce(topic, partition, 
     RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
     const_cast<char *>("hello worlf"), 11, 
     NULL, NULL); 

    delete topic; 
    delete producer; 
    return 0; 
} 


int static producer_2() 
{ 
    std::string brokers = "127.0.0.1"; 
    std::string errstr; 
    std::string topic_str = "linli"; 
    std::string mode; 
    std::string debug; 
    int32_t partition = RdKafka::Topic::PARTITION_UA; 
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING; 
    bool do_conf_dump = false; 
    int opt; 
    // MyHashPartitionerCb hash_partitioner; 
    int use_ccb = 0; 

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL); 
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC); 

    conf->set("metadata.broker.list", brokers, errstr); 

    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr); 
    if (!producer) { 
     std::cerr << "Failed to create producer: " << errstr << std::endl; 
     exit(1); 
    } 

    std::cout << "% Created producer " << producer->name() << std::endl; 

    RdKafka::ErrorCode resp = producer->produce(topic_str, partition, 
     RdKafka::Producer::RK_MSG_COPY /* Copy payload */, 
     (void *)"hi", 2, 
     NULL, 0, 0, NULL); 



    std::string errs(RdKafka::err2str(resp)); 
    std::cout << errs << std::endl; 
    //producer->poll(0); 


    delete producer; 

    return 0; 
} 


int main() 
{ 

    producer_2(); 

    return 0; 
} 

答えて

1

をチェックしてくださいlibrdkafkaプロデュース()API(CとC++の両方に)されています非同期では、メッセージは最初は内部プロデューサキューにのみエンキューされ、後で(queue.buffering.max.msコンフィグレーションプロパティを参照 - デフォルトは1秒)、他のメッセージとメッセージバッチ(MessageSet)を組み合わせてバックグラウンドスレッドからブローカに送信されます。

プログラムはproduce()を呼び出して、すぐに終了します。バックグラウンドプロデューサスレッドがメッセージをブローカに送信する機会があるとすぐに、ブローカからの確認を受けることは少なくなります。

すべての未処理のメッセージが送信されたことを確認するには、アプリケーションを終了する前にflush()に電話してください。

アプリケーションが長期間使用されている場合は、登録した配信レポートのコールバックを配信するために、定期的にpoll()に電話する必要があります。

+0

再生いただきありがとうございます。実際には、プロデューサをマルチスレッドソケットサーバーアプリケーションに統合したいと考えています。プロデューサオブジェクトをソケットワークスレッドに入れる準備ができているので、プロデューサは長寿命スレッドでは動作しません。別の方法では、あなたが言及したflush()メソッドを試しました、私はそれをproducerメソッドとdeleteメソッドの間に追加しましたが、kafka-console-consumerはまだ肯定応答を受け取ることができません。これは、アプリケーションが終了する前にメッセージが送信されることを、flush()が確認できないことを証明します。 – CodeOverflow

+0

プロデューサが長寿命のスレッドになければならない場合、プロデューサスレッドを管理するための新しいスレッドプールを作成する必要があるため、統合は非常に複雑になります。もっと良い解決策はありますか? – CodeOverflow

+0

librdkafkaはスレッドセーフです。複数のアプリケーションスレッドから同じProducerインスタンスを使用できます(また使用する必要があります)。あなたが通常行うことは、アプリケーションスレッドがproduce()を呼び出させ、別のアプリケーションスレッドを作成する(またはメインスレッドのループを使用して)poll()に配信レポートやその他のコールバックを提供することです。 – Edenhill

関連する問題