2016-03-29 10 views
0

私はRxJSを試しています。RxJs - ファイルを解析し、トピックごとに行をグループ化しますが、最後が足りません

私の使用例は、ログファイルを解析してトピックごとにグループ化することです(つまり、グループの先頭がファイル名で、その後にユーザー、日付/時刻などの行があります) できますregExpを使って行を解析します。私はグループの始まりを知ることができます。

".scan"を使用して行をグループ化します。新しい行の先頭が始まったら、蓄積した行にオブザーバを作成します。

問題はファイルの最後です。私は新しいグループを始めました。私はラインを蓄積していますが、私は最後の情報を持っていないので、最後のシーケンスを起動することはできません。私は完全な(しかし、そうではない)の情報を持っていると期待していたでしょう

ここでは数字を使った例です。グループの開始3または5(発言:私はtypescriptですで働く)のマルチできる

import * as Rx from "rx"; 
 

 
let r = Rx.Observable 
 
     .range(0, 8) 
 
     .scan(function(acc: number[], value: number): number[]{ 
 
      if ((value % 3 === 0) || (value % 5 === 0)) { 
 
       acc.push(value); 
 
       let info = acc.join("."); 
 
       Rx.Observable 
 
        .fromArray(acc) 
 
        .subscribe((value) => { 
 
         console.log(info, "=>", value); 
 
        }); 
 
       acc = []; 
 
      } else { 
 
       acc.push(value); 
 
      } 
 
      return acc; 
 
     }, []) 
 
     .subscribe(function (x) { 
 
      // console.log(x); 
 
     });

このEMITを:

0 => 0 
1.2.3 => 1 
1.2.3 => 2 
1.2.3 => 3 
4.5 => 4 
4.5 => 5 
6 => 6 

私は

を放出する方法を探しています
0 => 0 
1.2.3 => 1 
1.2.3 => 2 
1.2.3 => 3 
4.5 => 4 
4.5 => 5 
6 => 6 
7.8 => 7  last items are missing as I do not know how to detect end 
7.8 => 8   

アイテムをグループ化するのに手伝ってもらえますか? スキャンを使用していなくても良いアイデアは大歓迎です。あなたはmaterialize演算子を使用することができ、事前

答えて

0

ありがとうございます。ドキュメントheremarblesここではexample of useのSOを参照してください。 *として

インポート:(いくつかの構文エラーがあるかもしれないので、私はtypescriptですについての事を知らないことに注意して、テストされていないが、うまくいけば、あなたはそれを自分で完了することができます)お使いの場合には

、私のような何かをしようとするだろう"rx"からのRx;

let r = Rx.Observable 
     .range(0, 8) 
     .materialize() 
     .scan(function(acc: number[], materializedNumber: Rx.Notification<number>): number[]{ 
      let rangeValue: number = materializedNumber.value; 

      if ((rangeValue % 3 === 0) || (rangeValue % 5 === 0)) { 
       acc.push(rangeValue); 
       generateNewObserverOnGroupOf(acc); 
       acc = []; 
      } else if (materializedNumber.kind === "C") { 
       generateNewObserverOnGroupOf(acc); 
       acc = []; 
      } else { 
       acc.push(rangeValue); 
      } 
      return acc; 
     }, []) 
     // .dematerialize() 
     .subscribe(function (x) { 
      // console.log(x); 
     }); 

function generateNewObserverOnGroupOf(acc: number[]) { 
    let info = acc.join("."); 
       Rx.Observable 
        .fromArray(acc) 
        .subscribe((value) => { 
         console.log(info, "=>", value); 
        }); 

アイデアは、「E」、「C「N」は、それぞれ(ストリームによって渡されるメッセージはnextの一つ、errorcompleted種類であるかどうかをコードする、通知とmaterializedematerialize動作することです'値はkindプロパティ)。 next通知がある場合、渡された値は通知オブジェクトのvalueフィールドにあります。ストリームの通常の動作に戻るためには、非マテリアライズする必要があるので、完了したらリソースを完了して解放する必要があることに注意してください。

+0

偉大な、あなたの助けを借りて、魅力的なように...-) – rlasjunies

関連する問題