2017-07-06 11 views
1

N個の非同期タイムスタンプ付きデータストリームがあります。各ストリームは固定されたレートを持ちます。私はすべてのデータを処理したいが、キャッチすることは、データが可能な限り到着した時刻に近い順番でデータを処理する必要があることです(リアルタイムのストリーミングアプリケーションです)。N個のデータストリームを時間ソートするアルゴリズム

これまでのところ、Kメッセージの固定ウィンドウを作成して、優先度キューを使用してタイムスタンプでソートしました。次に、次のウィンドウに移動する前に、このキューの全体を順番に処理します。これは問題ありませんが、バッファのサイズに比例して遅延が発生し、バッファの終わりが処理された直後にメッセージが到着した場合にメッセージを廃棄することがあります。

// Priority queue keeping track of the data in timestamp order. 
ThreadSafeProrityQueue<Data> q; 
// Fixed buffer size 
int K = 10; 
// The last successfully processed data timestamp 
time_t lastTimestamp = -1; 

// Called for each of the N data streams asyncronously 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
    if (q.size() > K) { 
     processQueue(); 
    } 
} 

// Process all the data in the queue. 
void processQueue() { 
    while (!q.empty()) { 
     const auto& data = q.top(); 
     // If the data is too old, drop it. 
     if (data.timestamp < lastTimestamp) { 
      LOG("Dropping message. Too old."); 
      q.pop(); 
      continue; 
     } 
     // Otherwise, process it. 
     processData(data); 
     lastTimestamp = data.timestamp; 
     q.pop(); 
    } 
} 

データについての情報::彼らは自身のストリーム内でソートすることが保証されていることは次のようになります。彼らの料金は5〜30hzです。それらは画像やその他のデータで構成されています。

これが表示されるよりも難しい理由のいくつかの例です。

(stream, time) 
(A, 2) 
(B, 1.5) 
(A, 3) 
(B, 2.5) 
(A, 4) 
(B, 3.5) 
(A, 5) 

を参照してください。私は、私はそれらを受け取ったときのために、データを処理した場合にどのように、Bが希望常に:私は二つの流れ、AとBの両方1Hzで実行していると私は、次の順序でデータを取得すると仮定落ちる?これは私が避けたいものです。私のアルゴリズムでは、Bは10フレームごとに落とされ、過去10フレームの遅れでデータを処理します。

+0

アプリケーションはマルチスレッドですか? (そうでない場合は、なぜですか?) – rici

+0

はい、受信側はデータを順番に処理する必要があります。私は特定のアプリケーションに名前をつけませんが、N個のストリーミングビデオソースのようなものを、時間同期した方法で画面に描くようなものとして想像することができます。 – mklingen

+0

各ソースは順序付きストリームを生成することが保証されていますか? – rici

答えて

0

私はプロデューサ/コンシューマ構造を提案します。各ストリームにデータをキューに入れ、別のスレッドでキューを読み取ります。つまり:

// your asynchronous update: 
void receiveAsyncData(const Data& dat) { 
    q.push(dat.timestamp, dat); 
} 

// separate thread that processes the queue 
void processQueue() 
{ 
    while (!stopRequested) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

これにより、バッチ処理中に現在の実装で表示される「遅れ」を防ぐことができます。

processQueue機能は、別個の永続スレッドで実行されています。 stopRequestedは、シャットダウンするときにプログラムが設定するフラグで、スレッドを強制的に終了させます。一部の人はこれにvolatileフラグを使用します。私はマニュアルリセットイベントのようなものを使うことを好みます。

これを行うには、同時更新を許可する優先度キューの実装が必要です。または、同期化ロックでキューをラップする必要があります。特に、キューが空の場合にq.pop()が次の項目を待つことを確認する必要があります。または、キューが空の場合はq.pop()と決して呼ばないでください。私はあなたのThreadSafePriorityQueueの詳細を知らないので、あなたがそれをどのように書くか正確に言うことはできません。

タイムスタンプチェックは、それより前のアイテムよりも後のアイテムを処理する可能性があるため、まだ必要です。例:

  1. イベントはデータストリーム1から受信されますが、スレッドはスワップアウトされてキューに追加されます。
  2. イベントはデータストリーム2から受信され、キューに追加されます。
  3. データストリーム2からのイベントは、processQueue関数によってキューから削除されます。
  4. 上記のステップ1からのスレッドは別のタイムスライスを取得し、アイテムがキューに追加されます。

これは珍しいことではなく、まれにしかありません。時間差は通常、マイクロ秒のオーダーになります。

定期的にアップデートが届かない場合は、人為的な遅延を導入することができます。たとえば、更新された質問では、メッセージの順序が500ミリ秒遅れて表示されます。 500ミリ秒があなたがサポートしたいと思う最大の公差であると仮定しよう。つまり、メッセージが500ミリ秒以上遅れて到着した場合、メッセージは破棄されます。

優先度キューに追加するときに、タイムスタンプに500ミリ秒を追加します。つまり:

q.push(AddMs(dat.timestamp, 500), dat); 

ループを処理するループでは、タイムスタンプの前に何かをデキューしません。ような何か:

while (true) 
{ 
    if (q.peek().timestamp <= currentTime) 
    { 
     data = q.pop(); 
     if (data.timestamp >= lastTimestamp) 
     { 
      processData(data); 
      lastTimestamp = data.timestamp; 
     } 
    } 
} 

これは、すべての項目の処理で500ミリ秒の遅延を導入するが、それは500ミリ秒のしきい値に収まる「後半」のアップデートを落とす防ぐことができます。 「リアルタイム」アップデートの希望と、アップデートの破棄を防ぐための欲求とのバランスを取る必要があります。

+0

申し訳ありませんが、これは私がすでに書いたものです。これにより、常に同期外のメッセージが削除されます。なぜこれが現れているよりも難しいのかの例で私の質問を修正します。 – mklingen

+0

@mklingen:私の更新を参照してください。 –

+0

人工遅延は完全に私の問題を解決する正しい方法です。 – mklingen

0

常に遅れがあり、その遅れは、あなたの最も遅い「固定値レート」ストリームをどれくらい待つかによって決まります。

提案:

  1. バッファ
  2. を保つ意味を持つブールフラグのアレイを保つ:
  3. 「バッファに少なくともサンプルストリームIXそこから発信される位置IXが真である場合、」
  4. ソート/プロセスとすぐにフル証拠はない真の

へのすべての旗を持っているとして(各バッファがソートされますが、別のバッファからあなたは、タイムスタンプの反転を有していてもよい)が、十分に多分良いですか?

処理をトリガするための「満たされた」フラグのカウントを使用して(ステップ3で)、ラグを小さくすることができますが、より多くのバッファ間タイムスタンプ逆転のリスクがあります。極端な場合、1つのフラグが満たされているだけの処理を受け入れるということは、「フレームを受け取るとすぐにフレームをプッシュし、タイムスタンプの並べ替えを行うこと」を意味します。
ラグ/タイムスタンプの反転バランスがあなたの問題に内在するという私の感情をサポートするためにこれを言及しました - 絶対的に等しいフレームレートを除いて、側面の1つを犠牲にしない完璧な解決策があります。

「解決策」はバランスを取る行為であるため、解決策(例えば「フラグの配列」)を助けるために余分な情報を収集/使用する必要があります。私が提案したことがあなたの事例では馬鹿げているのであれば(あなたが選んだ詳細はあまり多くありません)、目標とするレベルの「質の高さ」に関連するメトリクスを考えてから、これらの指標を収集/処理/使用する手助けをします。

+0

これは正しい解決策になると思います。それを試して、それがどのように行くかを教えてくれるでしょう。 – mklingen

関連する問題