2016-12-06 8 views
0

イベントドリブンデータパイプラインにコルーチンパイプラインを使用しています。これまでのところすべてがうまくいっています。入力の一部をバッチで処理したいと思っていましたが、上流のプロデューサが空になったら最終バッチが処理されるようにする方法が必要でした。下の考案された例では、これはprint(res)print_data_cpに一度produce_data_fromが行われた方法になります。より直接的なアナログは、長さ== 3のたびにresを印刷してリセットし、プロデューサが完了するとresの残りの値が確実に出力されるようにすることです。これを解決する方法はいくつかありますが、この問題に対する慣用的なアプローチがあります(例えば、センチネル値、剰余を返す、最後にクラスをラップする)か?プロデューサーが使い尽くされたときにコルーチンを知らせる方法はありますか?

今のところ私はクラスの一部としてコプロセッサ関数を持っており、コプロセッサ関数が完了した後にアクセスできるように、resをインスタンス変数としましょう。これは動作しますが、しばらくの間/最終的にはもっと一般的です。

def produce_data_from(data, consumer): 
    next(consumer) 
    for x in data: 
     consumer.send(x) 

def print_data_cp(): 
    res = [] 
    while True: 
     x = (yield) 
     res.append(x) 
     print(x) 

cons = print_data_cp() 
produce_data_from(range(10), cons) 

答えて

0

この変更では、try/finallyを使用して、プロデューサを変更してコンシューマコプロセッシングを終了します。 finallyブロックがトリガーされます。ここで、コプロセッサは、close()信号を送信するプロデューサに依存しているため、コンシューマ関数をバッチ処理に変更するには、上流のプロデューサ関数も変更する必要があります。理想的ではありませんが、それは動作し、十分にpythonic感じています。私は他のアプローチを見て喜んでいるでしょう。

def produce_data_from(data, consumer): 
    next(consumer) 
    for x in data: 
     consumer.send(x) 
    consumer.close() 

def print_data_cp(): 
    res = [] 
    try: 
     while True: 
      x = (yield) 
      res.append(x) 
      if len(res) >= 3: 
       print(res) 
       res = [] 
    finally: 
     print(res) 

cons = print_data_cp() 
produce_data_from(range(10), cons) 
関連する問題