さまざまなストリーミングソースから多数のJSONドキュメントを取得するMongoDBコレクションがあります。他の言葉では、一連のMongoDBコレクションにデータを連続的に挿入する多数のプロセスがあります。MongoDbリアルタイムで(またはほぼリアルタイムで)挿入されたデータをストリーミング
MongoDBからダウンストリームアプリケーションにデータをストリームする方法が必要です。
App Stream1 -->
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->
ORこの:だから私は概念的にはこのようになりますシステムたく
App Stream1 --> ---> MongoD Stream1
App Stream2 --> MONGODB ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
を質問は、私は継続的にデータベースを照会/ポーリングすることなく、モンゴの外にデータをストリームはどうすればよいのですか?
明白な疑問の答えは、「なぜあなたはその後、彼らはこのように一度ごモンゴストリーミングプロセスやモンゴに送っているウサギ、ゼロまたはActiveMQのようなキューにメッセージを送信するために、これらのアプリのストリーミングプロセスを変更いけない」のようになります。
理想的な世界では MONGODB
/|\
|
App Stream1 --> | ---> MongoD Stream1
App Stream2 --> SomeMQqueue ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3
はい、それは良いでしょうが、私たちは、メッセージが最初に保存されていることを確認するために、重複を避けるために、そしてMongoの永続性として真ん中に座って持っているIDがすべてなどに生成されていることを確認モンゴを必要とします層。
そうですね、これらのダウンストリームアプリケーションにMongoコレクション(GridFSなどを使用しない)からメッセージをストリーミングするにはどうすればいいですか?基本的な考え方は、新しい文書をポーリングするだけであり、収集される各文書は、処理されたタイムスタンプを格納するSQLテーブルの処理フラグと同様に、データベースに格納されたJSON文書に別のフィールドを追加することによって更新されます。私。処理されたドキュメント== null .... = 1(追加された= now()....ドキュメントを1秒ごとにポーリングします。
計算効率の良い方法がありますか?
FYI - これはすべてJavaプロセスです。
乾杯!
リンクありがとうございます。残念ながらキャッピングされたコレクションを使用していませんが、メッセージングサービスでは悪い機能ではありません。処理されたフラグとポーリングのインデックスのような音が唯一のオプションです...インデックス項目がヌルであるかどうかはまだインデックスで参照されていますか? – NightWolf
または私は、キャッシュのような固定サイズの動作でキャップ付きのコレクションを持つことができ、その後アイテムを1つの購入から取り出して通常のコレクションに戻すことができます。次に、アプリの実行の間に位置カーソルを保存する方法が問題になります。私は、Mongoの自動_idフィールドを使用して、そのIDフィールドよりも大きいものすべてを選択すると仮定しています...すべてのmongoが_IDを昇順で生成していますか? – NightWolf
インデックスは 'null'のエントリを格納します。あなたがキャップを付けられたコレクションをテールしている場合は、最後に見たエントリを保存する必要があります(これを保存することもできますが、別のmongoコレクションを使っても問題ありません)。 'と' skip(1) 'を再開します。 http://www.mongodb.org/display/DOCS/Advanced+Queries#AdvancedQueries-%24minand%24maxを参照してください。 – dcrosta