2017-02-06 1 views
0

ディスク上の多数のファイルを繰り返し処理し、各ファイルを開き、解析する必要があります。ファイル名を持つファイルがあります。のみ、それらのファイル名を反復処理する必要があります。私は次のことを実行しようとするとファイルのリストを繰り返し、その内容を抽出しますか? (SparkContextエラー)

%python 

def parse(filename): 
    try: 
    tf = sc.textFile(filename) 
    # run parsing code, produce text 
    return text 
    except: 
     return None 

私はマップ()にこの関数を渡す

parsed_contents = filenames.map(parse) 
parsed_contents.top(5) 

私はこのエラーを取得する:

例外:それブロードキャスト変数、アクション、または変換からSparkContextを参照しようとしているようです。 SparkContextはドライバでのみ使用でき、ワーカーで実行されるコードでは使用できません。詳細は、SPARK-5063を参照してください。

tryブロック内のコードは、ファイル名を指定して別に実行すると機能します。

指定されたファイルのリストに対して、その内容をどのように反復処理する必要がありますか?

答えて

1

あなたは(この場合、通話filnames.map(parse))RDDに変換を実行すると、ドライバは労働者が自分のRDDの各パーティションを処理するために割り当てます。したがって、あなたの地図呼び出しは基本的にあなたのrddに適用されるワーカーに送信されます。あなたが提供したコードでは、基本的にはワーカーで実行されているコードからsparkContextインスタンスを呼び出すため、エラーが発生します。ファイルの読み込みは、ドライバプロセスで行う必要があります。

sc.textFileは、あなたが読みたいファイル名を指定して、文字列をカンマ区切りを受け入れ だからあなたのような何かを行うことができます:。あなたはまた、sc.textFileメソッドへの入力としてパターンを指定することができ

filenames = sc.textFile("filesToRead.txt") 

parsed_contents = sc.textFile(",".join(filenames.collect())) 

parsed_contents.top(5) 

を。例えば、

parsed_contents = sc.textFile("file[0-5].txt") 

UPDATE ディスク上に存在するファイルをフィルタリングするために。

def check_exists(name): 
    try: 
     open(name, 'r') 
     True 
    except: 
     False 

existingFiles = filenames.filter(check_exists) 
+0

sc.textFile()に渡すファイル名のリストにあるファイルの一部が存在しない場合はどうなりますか?私は現在、rdd1.intersection(rdd2) を実行しています。最初のRDDは、ファイル名を含むファイルとディレクトリリストの2番目のファイルから作成されます。 –

+0

現在の方法は問題ありません。私は別の方法を追加しました。 – septra

関連する問題