apache-kafka-streams

    0

    2答えて

    私はカフカで自分の手を汚し始めました。私はthisに行った。それは、カフカストリームDSLのデータ/トピック管理のみを示しています。誰もkafkaストリームのProcessor APIのための同じ種類のデータ管理のためのリンクを共有できますか?私は特別にプロセッサAPIのユーザーと内部のトピック管理に興味があります。 TopologyBuilder builder = new TopologyBu

    2

    1答えて

    私はトポロジービルダーでKafkaのStreams APIを使用しています。 あるデータ型を別のデータ型に変換できるプロセッサを、パイプライン内の次のプロセッサが使用できるようにするには、どうすればいいのか教えてください。 [topic]--(string)-->[processor: parse json]--(object)-->[processor 2]--(object)-->[sink]

    2

    1答えて

    org.apache.kafka.streams.processor.TopologyBuilder/org.apache.kafka.streams.kstream.KStreamBuilderオブジェクトは再利用可能ですか? 同じ設定のカフカストリーミングアプリケーションを再起動する機能を提供したいと思います。 は、これまでのところ、次のコードが取り組んできたが、私はドキュメント内の任意のハー

    0

    1答えて

    私は、nのメッセージをKakfaのトピックに追加し、nというメッセージが届くと、新しいトピックについてメッセージを出します。私はこれを行うためにストリームAPIを使用しており、それは簡単です。しかし、システムの信頼性のためにnを受信することはできませんが、nメッセージのx%(たとえば95%)が受信され、y秒の新しいメッセージが記録されていない場合でもメッセージを送信したいと考えています。これはカフ

    2

    1答えて

    現在、私は、カフカストリームでレコードを分類するための、直接的かつ効率的な方法を探しています。 すべてのレコードには、少なくともidとfailedのプロパティが含まれています。 (idは単なる文字列であるとfailedはブールです) アイデアは、初めに、「メッセージ」として、すべての着信記録を分類することです。 受信したレコードの1つに失敗したフィールドが設定されている場合、これはどこかに「持続」

    1

    1答えて

    私はシステムに複数のマイクロサービス(java spring boot)を構築し、grpcとavroメッセージで通信しています。カフカのキューを使用しています。 私の問題は、サービスが整っていて、私のサービスの1つが、他の人がメッセージを送信して作成する前に、カフカの話題を購読していることです。私はエラーを返さないので、うまくいくようですが、そうではありません。トピックはその時点では存在しませんで

    2

    1答えて

    私の質問のタイトルをより明確にするために、まず私の質問の目標を述べます。私の目標は私の目標です: 私の目標は、IntegerをIDとし、値としてStringを持つデータストリーム(KStream)を持つことです。この例では、値に誰かの名前を格納しています。次に、用語をキーとして格納するGlobalKTable(ストリームの値と一致する用語)とブール値フラグ(この用語に一致する名前はすべて「スパマー