N個の非同期タイムスタンプ付きデータストリームがあります。各ストリームは固定されたレートを持ちます。私はすべてのデータを処理したいが、キャッチすることは、データが可能な限り到着した時刻に近い順番でデータを処理する必要があることです(リアルタイムのストリーミングアプリケーションです)。N個のデータストリームを時間ソートするアルゴリズム
これまでのところ、Kメッセージの固定ウィンドウを作成して、優先度キューを使用してタイムスタンプでソートしました。次に、次のウィンドウに移動する前に、このキューの全体を順番に処理します。これは問題ありませんが、バッファのサイズに比例して遅延が発生し、バッファの終わりが処理された直後にメッセージが到着した場合にメッセージを廃棄することがあります。
// Priority queue keeping track of the data in timestamp order.
ThreadSafeProrityQueue<Data> q;
// Fixed buffer size
int K = 10;
// The last successfully processed data timestamp
time_t lastTimestamp = -1;
// Called for each of the N data streams asyncronously
void receiveAsyncData(const Data& dat) {
q.push(dat.timestamp, dat);
if (q.size() > K) {
processQueue();
}
}
// Process all the data in the queue.
void processQueue() {
while (!q.empty()) {
const auto& data = q.top();
// If the data is too old, drop it.
if (data.timestamp < lastTimestamp) {
LOG("Dropping message. Too old.");
q.pop();
continue;
}
// Otherwise, process it.
processData(data);
lastTimestamp = data.timestamp;
q.pop();
}
}
データについての情報::彼らは自身のストリーム内でソートすることが保証されていることは次のようになります。彼らの料金は5〜30hzです。それらは画像やその他のデータで構成されています。
これが表示されるよりも難しい理由のいくつかの例です。
(stream, time)
(A, 2)
(B, 1.5)
(A, 3)
(B, 2.5)
(A, 4)
(B, 3.5)
(A, 5)
を参照してください。私は、私はそれらを受け取ったときのために、データを処理した場合にどのように、Bが希望常に:私は二つの流れ、AとBの両方1Hzで実行していると私は、次の順序でデータを取得すると仮定落ちる?これは私が避けたいものです。私のアルゴリズムでは、Bは10フレームごとに落とされ、過去10フレームの遅れでデータを処理します。
アプリケーションはマルチスレッドですか? (そうでない場合は、なぜですか?) – rici
はい、受信側はデータを順番に処理する必要があります。私は特定のアプリケーションに名前をつけませんが、N個のストリーミングビデオソースのようなものを、時間同期した方法で画面に描くようなものとして想像することができます。 – mklingen
各ソースは順序付きストリームを生成することが保証されていますか? – rici