apache-kafka-streams

    2

    1答えて

    私はカフカを使ってログイベントを処理しています。私は単純なコネクタとストリーム変換のためのKafka ConnectとKafka Streamsの基礎知識を持っています。 今は次の構造を持つログファイルがある: : timestamp event_id event ログイベントはEVENT_ID(例えばメールログ) 例により接続されている複数のログ・ラインを有します 1234 1 START

    2

    1答えて

    DSMSはデータストリーム管理システムに対応しています。これらのシステムにより、ユーザは、ユーザによって削除されるまで連続して実行されるクエリを提出することができる。 StormやFlinkなどのシステムをDSMSと見なすことができますか、それともより一般的なものですか?彼らは異なるユースケースを解決しようとして おかげ

    7

    1答えて

    2つのカフカトピックが異なるソースから正確に同じコンテンツをストリーミングするので、ソースのいずれかが失敗した場合に高い可用性を得ることができます。 Kafka Streams 0.10.1.0を使用して、2つのトピックを1つの出力トピックにマージしようとしています。そのため、すべてのソースが上がっても重大なエラーメッセージは表示されません。 KStreamのleftJoinメソッドを使用すると、

    3

    1答えて

    私はKafka StreamsでPoCを開発しています。ストリームコンシューマでオフセット値を取得し、それを使用してメッセージごとに一意の鍵(topic-offset)->hashを生成する必要があります。その理由は、プロデューサはsyslogであり、IDの数はわずかです。再処理の場合、同じキーを再生成する必要があるため、私はコンシューマにUUIDを生成できません。 私の問題は:org.apach

    3

    1答えて

    StateStoreとやり取りしてメッセージをフィルタ処理して複雑なロジックを実行するプロセッサがあります。 process(key,value)メソッドでは、私はcontext.forward(key,value)を使用して、必要なキーと値を送信します。デバッグ目的のために、私もそれらを印刷します。 私は2つの他のストリームのジョインした結果KStream mergedStreamを持っています

    1

    1答えて

    2つのトピック(外部結合を使用してマージ)から得られるデータを再配列する必要があります。 StateStoreを使用して、最新のシーケンスを保持し、再シーケンスされたメッセージでダウンストリームのストリーム値を変更することをお勧めします。 簡素化問題:は (トピックAから配列、トピックBからSEQ) - (10,100)(StateStoreにおける現在のシーケンスを維持する)を出力する>新規配列

    1

    1答えて

    Kafka Streams(バージョン0.10.0.1)とKafka Broker(0.10.0.1)を使用しています。メッセージキーに基づいてカウントを生成しようとしています。これはカフカにメッセージを送信します 1,{"value":10} 鍵を持っている:私は上記のコマンドを実行すると ./bin/kafka-console-producer.sh --broker-list local

    1

    1答えて

    同様に、この質問と同じように少し異なります:KStream batch process windows、私はKStreamからのメッセージを消費者にプッシュダウンする前にバッチしたいと思います。 ただし、このプッシュダウンは、固定された時間枠でスケジュールするのではなく、キーごとに固定メッセージ数のしきい値に設定する必要があります。まず第2質問について が頭に浮かぶ: 1)これは、処理されるべき方

    4

    1答えて

    Apache Kafka 0.9と0.10の新機能で調査中、 はKStreamsとKTablesを使用しました。興味深い事実は、KafkaがRocksDBを内部的に使用している です。 Introducing Kafka Streams: Stream Processing Made Simpleを参照してください。 RocksDBはJVN互換言語で書かれていないため、余分な共有ライブラリ(OSに

    1

    1答えて

    私のKafkaストリームアプリの1つとして、DSLとProcessor APIの両方の機能を使用する必要があります。ストリーミングアプリの流れは source -> selectKey -> filter -> aggregate (on a window) -> sink です。集計後、シンクに1つの集約メッセージを送信する必要があります。私はカスタムStateStoreを定義し、 publ