2015-11-20 27 views
10

複数のログファイルが書き込まれるときにそれらを読み込み、asyncioで入力を処理したいと考えています。コードはウィンドウで実行する必要があります。私がstackoverflowとウェブの両方を検索することから理解しているところから、非同期ファイルの入出力はほとんどのオペレーティングシステムでは難しいです(例えば、selectは意図したとおりに動作しません)。私は他のメソッド(スレッドなど)でこれを行うことができると確信していますが、私はasyncioを試してみて、それがどのようなものかを見ていきます。最も有益な答えは、おそらく、この問題の解決策の「アーキテクチャ」、すなわち異なる機能とコルーチンをどのように呼び出すか、またはスケジュールするべきかを説明するものでしょう。以下は私に(許容されるポーリングを通じて、)行ごとにファイルを読み込み、発電与えasyncioでファイルを1行ずつ読み込みます

import time 

def line_reader(f): 
    while True: 
     line = f.readline() 
     if not line: 
      time.sleep(POLL_INTERVAL) 
      continue 
     process_line(line) 

いくつか監視するためのファイルやプロセスでは、スレッドを必要とするコードのこの種。私はasyncioイベントループを通してそれをスケジュールする際に作品の

import asyncio 

def line_reader(f): 
    while True: 
     line = f.readline() 
     if not line: 
      yield from asyncio.sleep(POLL_INTERVAL) 
      continue 
     process_line(line) 

この種が、process_dataブロックならば、それはもちろんありません良いです。私はそれが少しasyncioともっと使えるように変更しました。始めたとき、私は解決策が

def process_data(): 
    ... 
    while True: 
     ... 
     line = yield from line_reader() 
     ... 

ようになりますが、私は(少なくともない状態のかなりのビットを管理process_dataせずに)その仕事を作る方法を見つけ出すことができなかった想像しました。

この種のコードをどのように構造化するべきかについてのアイデアはありますか?

+0

ファイルが変更されます。 – josteinb

答えて

10

私はstackoverflowのとWebの両方の周りに検索から理解してどのようなことから、非同期ファイルI/Oは注意が必要ですほとんどのオペレーティングシステムでは(selectは意図したとおりに動作しません)。私は他のメソッド(スレッドなど)でこれを行うことができると確信していますが、私はasyncioを試してみて、それがどのようなものかを見ていきます。

asyncioフードの下の* nixシステムに基づいてselectあるので、あなたは、スレッドを使用せずに、非ブロッキングのファイルI/Oを行うことができません。 Windowsでは、asyncioは、非ブロックのファイルI/OをサポートするIOCPを使用できますが、asyncioではサポートされていません。

I/Oが遅い場合にイベントループをブロックしないように、I/O呼び出しをスレッドでブロックする以外は、コードは問題ありません。幸運にも、loop.run_in_executor関数を使用してスレッドに作業をロードするのは本当に簡単です。

まず、セットアップあなたのI/Oのための専用スレッドプール:その後、

from concurrent.futures import ThreadPoolExecutor 
io_pool_exc = ThreadPoolExecutor() 

とは、単に任意のブロックI/Oは、エグゼキュータへの呼び出しをオフロード:

... 
line = yield from loop.run_in_executor(io_pool_exc, f.readline) 
... 
0

asyncioはまだファイル操作をサポートしていません。申し訳ありません。

このように、あなたの問題を解決することはできません。

2

あなたのコードの構造は、私にはよさそうだ、次のコードは、私のマシン上で正常に動作します:

import asyncio 

PERIOD = 0.5 

@asyncio.coroutine 
def readline(f): 
    while True: 
     data = f.readline() 
     if data: 
      return data 
     yield from asyncio.sleep(PERIOD) 

@asyncio.coroutine 
def test(): 
    with open('test.txt') as f: 
     while True: 
      line = yield from readline(f) 
      print('Got: {!r}'.format(line)) 

loop = asyncio.get_event_loop() 
loop.run_until_complete(test()) 
+0

それはよさそうだね!私はあなたが 'test'という名前の関数でファイルを開くべきであることを忘れていました。それは私の頭痛を修正します。ありがとう! – josteinb

+0

私はそれが少し誤解を招いていると思います - あなたは実際に読み込み自体をオフロードしていません。非同期で 'sleep'をやっているだけです。 IIUC。 – guyarad

+0

@guyaradあなたは絶対に正しいです。その答えは単にOPのコードを修正することでした。多分私はそれを削除します。 – Vincent

12

aiofilesを使用します:

async with aiofiles.open('filename', mode='r') as f: 
    async for line in f: 
     print(line) 

EDIT 1

@Jashandeepが述べたように、あなたは操作を阻止する気にする必要があります

もう一つの方法は、selectとかepoll次のとおりです。

from select import select 

files_to_read, files_to_write, exceptions = select([f1, f2], [f1, f2], [f1, f2], timeout=.1) 

timeoutパラメータがここでは重要です。

参照:https://docs.python.org/2/library/select.html#select.select

EDIT 2

あなたがして、読み取り/書き込みのためにファイルを登録することができます:私は、コードの最上位バージョンをテストしている、そしてそれは読み取ることができloop.add_reader()

関連する問題