ファイル名のリストが与えられています。このファイルにはカンマ区切りのデータが含まれており、ファイル名に基づいて情報を含む列でさらに拡張する必要があります。したがって、私は小さなread_file
関数を実装しました。この関数は、最初のクリーニングと追加の列の計算の両方を処理します。 db.from_sequence(files).map(read_file)
を使用して、読み込み関数をすべてのファイルにマッピングして、それぞれの辞書のリストを取得します。複数のファイルから複数のファイルからDaskデータフレームへダークバッグ
しかし、辞書のリストではなく、入力ファイルの個々の行をエントリとして格納するようにします。その後、辞書のキーをdaskデータフレーム内の列名にマップする必要があります。
from dask import bag as db
def read_file(filename):
ret = []
with open(filename, 'r') as fp:
... # reading line of file and storing result in dict
ret.append({'a': val_a, 'b': val_b, 'c': val_c})
return ret
from dask import bag as db
files = ['a.txt', 'b.txt', 'c.txt']
my_bag = db.from_sequence(files).map(read_file)
# a,b,c are the keys of the dictionaries returned by read_file
my_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
このコードを実行するために変更する必要があることを教えてもらえますか?もっと適切なアプローチがありますか?
編集: 私は3つのテストファイルa_20160101.txt
,a_20160102.txt
,a_20160103.txt
を作成しました。それらのすべてには、それぞれ1つの文字列を含む数行しかありません。
asdf
sadfsadf
sadf
fsadff
asdf
sadfasd
fa
sf
ads
f
は、以前私がread_file
の小さな誤りがあったが、今、読者にマッピングした後my_bag.take(10)
を呼び出すと、正常に動作します:
([{'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfsadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fsadff', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'asdf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sadfasd', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'fa', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'sf', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'ads', 'c': 'XY'}, {'b': datetime.datetime(2016, 2, 1, 0, 0), 'a': 'f', 'c': 'XY'}],)
しかしmy_df = my_bag.to_dataframe(columns=['a', 'b', 'c'])
、その後 my_df.head(10)
はまだdask.async.AssertionError: 3 columns passed, passed data had 10 columns
のシングルフラット化コレクションである私達の袋を連結することができますか? – MRocklin