あなたの問題を簡単に解決します。ここでoldschool/Goマインドセットを使うことができます。代わりに、例外関数はエラーコードなどを実際の結果とともに返します。
私はあなたのファイルを再現する必要はありませんので、ファイルがmp3の場合はパスをチェックして例外をスローする機能を使用して同様のエラーを作成します。のは、私はファイルのリストを持っているとしましょう:
import random
paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in xrange(100)]
std_audio_paths = '/'
def load_audio(path, sr=22050):
if path.endswith('mp3'):
raise IOError('Unknown format')
return 'audio data'
rdd = sc.parallelize(paths)
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# will break because load_audio throws exception
例外を発生させますが、エラーを返していないload_audio
のラッパーを作成することを除いだから私は/試みる使用します。今
def try_to_load_audio(path):
try:
return load_audio(path), None
except IOError as e:
return '', 'error'
audio = rdd.map(lambda path: (path,try_to_load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# returns collection of pairs (path, (result, error)):
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ...
の第二部にあなたの質問。
failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0])
failed.collect()
# gives paths only of failed files (mp3 in my case):
# ['0.mp3', '3.mp3', ...
をそして、あなたがやるだけロードされたオーディオデータを持っている:
loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0])
loaded_data.collect()
# Gives ['audio data', 'audio data',...
あなたがクラッシュを何を意味するのかを代わりに
collect()
のあなたが二回ファイルをロードし、その後はありませんので、audio.cache()
を行いますか?特定のエラーメッセージが表示されますか? – desertnaut破損したファイルに対してEOFErrorが発生しました。 –