2017-10-09 15 views
0

私はむしろPySparkの新機能です。読み込みしようとしていて、いくつかのデータを前処理しています。しかし、データの一部が破損しているようで、これらのエラーを処理し、破損しているファイルを特定する必要があります。破損したファイル、プログラムがクラッシュをロードするPySpark - ファイルへのパスの読み込みと保存、一部破損

def load_audio(path, sr = 22050): 
    return librosa.load(path,sr=sr) 

rdd = sc.parallelize(paths) #Path to all files 
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path)))) # Loads all the audio files 

とすぐload_audioとしてトライ。私は試してキャッチが動作することがわかったが、私はこの場合どのように私の問題を解決する見ることができません。

理想的には、壊れていないすべてのファイルをロードすることです。私は、破損しているファイルのパスを欲しい/自分自身のrdd /リストに行くために読み込むときにエラーをスローします。

どうすればこの問題を解決できますか? :)

+0

あなたがクラッシュを何を意味するのかを代わりにcollect()のあなたが二回ファイルをロードし、その後はありませんので、audio.cache()を行いますか?特定のエラーメッセージが表示されますか? – desertnaut

+0

破損したファイルに対してEOFErrorが発生しました。 –

答えて

2

あなたの問題を簡単に解決します。ここでoldschool/Goマインドセットを使うことができます。代わりに、例外関数はエラーコードなどを実際の結果とともに返します。

私はあなたのファイルを再現する必要はありませんので、ファイルがmp3の場合はパスをチェックして例外をスローする機能を使用して同様のエラーを作成します。のは、私はファイルのリストを持っているとしましょう:

import random 
paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in xrange(100)] 
std_audio_paths = '/' 

def load_audio(path, sr=22050): 
    if path.endswith('mp3'): 
     raise IOError('Unknown format') 
    return 'audio data' 




rdd = sc.parallelize(paths) 
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path)))) 
audio.collect() 
# will break because load_audio throws exception 

例外を発生させますが、エラーを返していないload_audioのラッパーを作成することを除いだから私は/試みる使用します。今

def try_to_load_audio(path): 
    try: 
     return load_audio(path), None 
    except IOError as e: 
     return '', 'error' 

audio = rdd.map(lambda path: (path,try_to_load_audio(os.path.join(std_audio_path,path)))) 
audio.collect() 
# returns collection of pairs (path, (result, error)): 
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ... 

の第二部にあなたの質問。

failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0]) 
failed.collect() 
# gives paths only of failed files (mp3 in my case): 
# ['0.mp3', '3.mp3', ... 

をそして、あなたがやるだけロードされたオーディオデータを持っている:

loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0]) 
loaded_data.collect() 
# Gives ['audio data', 'audio data',... 
+0

入力いただきありがとうございますが、実際に私が求めているものではありません。私は多くのデータを持っており、オーディオの読み込みには長い時間がかかります。ロードされたオーディオのrdd/listと、破損してロードできなかったファイルへのパスを持つ別のものが必要です。 –

+0

@CarlRynegardh申し訳ありませんが、あなたの質問の2番目の部分を逃した。エラーのあるパスがあるRDDを作成するためのフィルタの使用方法に関する解説を更新しました。 – Bunyk

関連する問題