2017-07-11 8 views
2

私は入力データをS3上に1つの大きなファイルとして保存しました。 私はファイルを自動的にチョップし、作業者に配布し、データフローを管理したいと思っています。したがって、分散コレクションを使用する考え方。バッグ。各作業者の外部コードを使用した分散型Daskコレクションの処理

私は、ファイル(複数可)からデータを読み込むコマンドラインツール(Javaの)持っています。したがって、私はデータの全体をファイルに書き出し、外部CLI /コードを呼び出してデータを処理し、出力ファイルから結果を読み取るようにしたいと思います。これは、一度に1つのレコードではなく、データのバッチを処理するように見えます。

この問題を解決するにはどうすればよいでしょうか?作業者のディスクにパーティションを書き込んで、それを全体として処理することは可能ですか?

PS。データの他の操作は、レコードごとにデータレコードを処理するより単純なPython関数かもしれないので、分散コレクションモデルにとどまる必要はありませんが、望ましいです。

+0

こんにちはとスタックオーバーフローを歓迎する、[歓迎ツアーを通過するための時間をとってください](https://stackoverflow.com/tour)ここで(あなたの最初のバッジを得るために)あなたのやり方を知り、[最小限の完全で検証可能な例を作成する方法を読む](https://stackoverflow.com/help/mcve)また、[How to Ask Questions(How to Ask Good Questions)](https://stackoverflow.com/help/how-to-ask)もチェックして、フィードバックや役に立つ回答を得る機会を増やしてください。 – DarkCygnus

答えて

3

おそらくread_bytes機能が必要です。これにより、ファイルがデリミタ(エンドラインのように)できれいに分割された多数のチャンクに分割されます。これらのバイトブロックを指すdask.delayedオブジェクトのリストを返します。

、このマニュアルページでより多くの情報があります:ここでhttp://dask.pydata.org/en/latest/bytes.html

はドキュメンテーション文字列からの例です:

>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n') 
+0

read_bytes()は、私の容疑者のリストにありました。しかし、私はレコードの区切り文字について質問があります。私は明らかに、作業長(例えば20MB)が読み取るおおよそのチャンクサイズと、同時にデリミタを指定したいと思います。レコードの長さが変わるからです。フレームワークは正確な区切り文字の位置をどのように見つけ出すでしょうか?スケジューラはファイル全体を読み込みますか(望ましくない)?ファイルがちょうど普通の断片に切り刻まれば、いくつかのレコードは半分にカットされます。この場合、作業者は別の( "より早い")索引を読むことを知る必要がありますか? – evilkonrex

+0

'read_bytes'関数は、チャンクサイズに基づいて場所を探し出し、区切り文字が見つかるまで前方に読み込みます。区切り文字の間隔がいくらか頻繁にあると仮定すると、これはかなり効率的で、おおよそのチャンクサイズを尊重し、常に区切り文字で終了し、後に開始します。 – MRocklin

+0

私はread_bytesでいくつかの実験を行いましたが、それはうまくいきました。APIを見ると、データフレーム上でmap_partitions()を呼び出し、各パーティションから抽出したデータに対してカスタム処理を実行し、変更されたデータフレームこのようにして、私はdaskデータフレームAPIの限界内にとどまることができるかもしれません。それは正しいですか、私は何かが欠けていますか? (ちょうど明確にするために、私は私の入力データがCSVファイルであると仮定しました) – evilkonrex

関連する問題