2017-03-14 10 views
0

テキストファイルがあり、これらのファイルを使用してRDDを作成します。 テキストファイルは「Folder_1」と「Folder_2」に保存され、ファイルがローカルストレージに格納されている場合、これらのフォルダは、フォルダ「text_data」HDFSディレクトリからファイルを読み込み、PythonでSparkでRDDを作成する

に格納され、次のコードは動作します:

#Reading the corpus as an RDD 

data_folder = '/home/user/text_data' 

def read_data(data_folder): 
    data = sc.parallelize([]) 
    for folder in os.listdir(data_folder): 
     for txt_file in os.listdir(data_folder + '/' + folder ): 
      temp = open(data_folder + '/' + folder + '/' + txt_file) 
      temp_da = temp.read() 
      temp_da = unicode(temp_da, errors = 'ignore') 
      temp.close() 
      a = [ (folder, temp_da) ] 
      data = data.union(sc.parallelize(a)) 
    return data 

関数read_dataは、テキストファイルからなるRDDを返します。

'text_data'フォルダをHDFSディレクトリに移動するとどうすればできますか?

コードは、SPARKを実行しているHadoop-Yarn Clusterにデプロイされます。

答えて

0

hdfs_folder = 'hdfs://<namenode>/home/user/text_data/*' 

def read_data(hdfs_folder): 
    data = sc.parallelize([]) 
    data = sc.textFile(hdfs_folder) 
    return data 

の下にあなたのHadoop環境の名前ノードを置き換えこれは、スパーク1.6.2に返信用のバージョン

>>> hdfs_folder = 'hdfs://coord-1/tmp/sparktest/0.txt' 
>>> def read_data(hdfs_folder): 
...  data = sc.parallelize([]) 
...  data = sc.textFile(hdfs_folder) 
...  return data 
... 
>>> read_data(hdfs_folder).count() 
17/03/15 00:30:57 INFO SparkContext: Created broadcast 14 from textFile at NativeMethodAccessorImpl.java:-2 
17/03/15 00:30:57 INFO SparkContext: Starting job: count at <stdin>:1 
17/03/15 00:30:57 INFO SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:1012 
189 
>>> 
+0

感謝を試験しました。 @サビビリス それはほとんど問題を解決します。唯一の欠点は、各テキストファイルに別々にアクセスし、タプルを作成する必要があることです。 'a = [(folder、temp_da)] タプルをRDD に追加してください。data = data.union(sc.parallelize a)) ' この機能をコードに組み込む方法は? – nidhink1995

関連する問題