2016-04-22 13 views
0

私はHighland.jsを使用してワークフローを設計しようとしています。私はHighland.jsがどのようにそれに使用できるのか把握できませんでした。Highland.jsでフィルタを使用する(DBを使用する)

Iは、(擬似コード)は以下のようなストリームベースのワークフローを有する、

read      //fs.createReadStream(...) 
    .pipe(parse)   //JSONStream.parse(...) 
    .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0 
    .pipe(transform)  //fn(item) { return tranform(item); } 
    .pipe(write);   //mongoClient.db.collection.insert(doc) 

filterDuplicateは、読み出したレコードが(条件を使用して)存在し、ブール結果を返すかどうかを確認するためにデータベースを検索します。フィルタが機能するには、アクティブなDB接続が必要です。これは、ストリームが完成するまで再利用したいものです。 1つの方法は、writeの 'finish'イベントで読み取りと終了の前にオープンな接続を持つことです。これは、両方のメソッドが同じデータベースを使用する場合に動作する、フィルタリングと書き込みのためのパラメータとして接続を渡す必要があることを意味します。

上記のワークフローでは、filterDuplicateとwriteは異なるデータベースを使用する場合もあります。だから、私は接続が各機能と一緒に内蔵され管理されていることを期待しています。これにより、自己完結型の再利用可能なユニットになります。

ハイランドを使用してこれをどのように設計できるかについての入力があります。

ありがとうございました。

答えて

0

多くの場合、pipeを使用するのと同じくらい簡単にはなりません。タスクに最も適切なAPIメソッドを使用する必要があります。 (もしあれば)私は状態を維持するために高地のメカニズムのために特別に探していた

read 
    .through(JSONStream.parse([true])) 
    .through((x) => { 
    h((next, push) => { // use a generator for async operations 
     h.wrapCallback(mongoCountQuery)(params) // you don't have to do it this way 
     .collect() 
     .toCallback((err, result) => { 
      if (result > 0) push(err, x); // if it met the criteria, hold onto it 
      return push(null, h.nil); // tell highland this stream is done 
     }); 
    }); 
    }) 
    .merge() // because you've got a stream of streams after that `through` 
    .map(transform) // just your standard map through a transform 
    .through((x) => { 
    h((next, push) => { // another generator for async operations 
     h.wrapCallback(mongoUpdateQuery)(params) 
     .toCallback((err, results) => { 
      push(err, results); 
      return push(null, h.nil); 
     }); 
    }); 
    }) 
    .merge() // another stream-of-streams situation 
    .toCallback(cb); // call home to say we're done 
+0

:ここ

あなたはおそらく近い終わるしようとしているもののラフな例です。この場合は、mongoCountQueryとmongoUpdateQueryのdb接続を開き、適切な終了時にトリガーします。いくつかの読書の後、私は状態が外部で維持され、ストリーム処理関数へのコンテキストとして明示的に渡されるべきだと思う。そのように、ストリーム処理関数はコンテキストを使用してジョブを実行し、Highlandはストリーム関数の調整に重点を置いています。 – Krishnan

+0

4か月後、私は自分の答えを下降させる傾向がある。 – amsross

関連する問題