プロデューサとしてクライアントを作成しようとしています。 私は新しいWin32コンソールプロジェクトを作成するための例を続けました。 プログラムの最後にgetline()関数を追加しない限り、私のAPIが動作しないことがわかりました。Windows上でlibrdkafkaのKAFKAのプロデューサAPIとC++コードを使用する方法
getline()を削除しても、produce(..)メソッドは依然として成功の結果を返します。 しかし、私はkafka-console-consumerのコマンドウィンドウで応答を見ることができません
私はちょっと混乱しました。そうですか? getline()を使用せずにメッセージを送信するにはどうすればよいですか? 誰でも知っていますか?
私はそれが動作しない理由を知った。 プロデューサオブジェクトを削除するには速すぎます プロデューサがメッセージをブローカに送信できない原因となります。
生産メソッドと削除プロデューサオブジェクトの間にスリープ1000を追加すると、 プロデューサはメッセージを正しく送信できます。
したがって、質問はすぐにメッセージを送信する方法です。 プロデューサオブジェクトを破棄する前に、これらのメッセージが完全に送信されたことを確認するにはどうすればよいですか?
この問題を解決するには、実際に私はいくつかのsleep()をソースコードに追加したくありません。
win10 + vs2015 + kafka_2.10-0.9.0.1 +飼育係-3.4.6 + librdkafka 次のコード
// kafka_test_win32_nomfc.cpp
//
#include "stdafx.h"
#include <iostream>
#include "librdkafka/rdkafkacpp.h"
int static producer_1()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
/*
* Create configuration objects
*/
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
/*
* Create topic handle.
*/
RdKafka::Topic *topic = NULL;
if (!topic_str.empty()) {
topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
if (!topic) {
std::cerr << "Failed to create topic: " << errstr << std::endl;
exit(1);
}
}
RdKafka::ErrorCode resp = producer->produce(topic, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
const_cast<char *>("hello worlf"), 11,
NULL, NULL);
delete topic;
delete producer;
return 0;
}
int static producer_2()
{
std::string brokers = "127.0.0.1";
std::string errstr;
std::string topic_str = "linli";
std::string mode;
std::string debug;
int32_t partition = RdKafka::Topic::PARTITION_UA;
int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
bool do_conf_dump = false;
int opt;
// MyHashPartitionerCb hash_partitioner;
int use_ccb = 0;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
conf->set("metadata.broker.list", brokers, errstr);
RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
if (!producer) {
std::cerr << "Failed to create producer: " << errstr << std::endl;
exit(1);
}
std::cout << "% Created producer " << producer->name() << std::endl;
RdKafka::ErrorCode resp = producer->produce(topic_str, partition,
RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
(void *)"hi", 2,
NULL, 0, 0, NULL);
std::string errs(RdKafka::err2str(resp));
std::cout << errs << std::endl;
//producer->poll(0);
delete producer;
return 0;
}
int main()
{
producer_2();
return 0;
}
再生いただきありがとうございます。実際には、プロデューサをマルチスレッドソケットサーバーアプリケーションに統合したいと考えています。プロデューサオブジェクトをソケットワークスレッドに入れる準備ができているので、プロデューサは長寿命スレッドでは動作しません。別の方法では、あなたが言及したflush()メソッドを試しました、私はそれをproducerメソッドとdeleteメソッドの間に追加しましたが、kafka-console-consumerはまだ肯定応答を受け取ることができません。これは、アプリケーションが終了する前にメッセージが送信されることを、flush()が確認できないことを証明します。 – CodeOverflow
プロデューサが長寿命のスレッドになければならない場合、プロデューサスレッドを管理するための新しいスレッドプールを作成する必要があるため、統合は非常に複雑になります。もっと良い解決策はありますか? – CodeOverflow
librdkafkaはスレッドセーフです。複数のアプリケーションスレッドから同じProducerインスタンスを使用できます(また使用する必要があります)。あなたが通常行うことは、アプリケーションスレッドがproduce()を呼び出させ、別のアプリケーションスレッドを作成する(またはメインスレッドのループを使用して)poll()に配信レポートやその他のコールバックを提供することです。 – Edenhill