2016-05-24 26 views
2

私はパンダのコードを(py)Sparkに移植しようとしています。残念ながら、バイナリデータを読み込んでSpark Dataframeに入れたい入力部分では既に失敗しています。hdfsからバイナリファイルをSparkデータフレームに読み込むにはどうすればよいですか?

これまでのところ私はnumpyのからfromfile使用しています:

dt = np.dtype([('val1', '<i4'),('val2','<i4'),('val3','<i4'),('val4','f8')]) 
data = np.fromfile('binary_file.bin', dtype=dt) 
data=data[1:]           #throw away header 
df_bin = pd.DataFrame(data, columns=data.dtype.names) 

をしかし、スパークのために、私はそれを行う方法を見つけることができませんでした。私の回避策はバイナリファイルの代わりにcsv-Filesを使うことでしたが、それは理想的な解決策ではありません。私はnumpyのfromfileをsparkに使用すべきではないことを認識しています。 すでにhdfsにロードされているバイナリファイルを読み込むにはどうすればよいですか?

私は

fileRDD=sc.parallelize(['hdfs:///user/bin_file1.bin','hdfs:///user/bin_file2.bin]) 
fileRDD.map(lambda x: ???) 

ような何かをしようとしたが、それは私にNo such file or directoryエラーを与えています。

私はこの質問を見た: spark in python: creating an rdd by loading binary data with numpy.fromfile しかし、それはドライバノードのホームにファイルが保存されている場合にのみ動作します。

答えて

2

だから、私とつまずきがここで、バイナリファイルをuopnとしてスパークで始まる誰もが、私はそれを解決する方法であるために:

dt=np.dtype([('idx_metric','>i4'),('idx_resource','>i4'),('date','>i4'), 
      ('value','>f8'),('pollID','>i2')]) 
schema=StructType([StructField('idx_metric',IntegerType(),False), 
        StructField('idx_resource',IntegerType(),False), 
        StructField('date',IntegerType),False), 
        StructField('value',DoubleType(),False), 
        StructField('pollID',IntegerType(),False)]) 

filenameRdd=sc.binaryFiles('hdfs://nameservice1:8020/user/*.binary') 

def read_array(rdd): 
    #output=zlib.decompress((bytes(rdd[1])),15+32) # in case also zipped 
    array=np.frombuffer(bytes(rdd[1])[20:],dtype=dt) # remove Header (20 bytes) 
    array=array.newbyteorder().byteswap() # big Endian 
    return array.tolist() 

unzipped=filenameRdd.flatMap(read_array) 
bin_df=sqlContext.createDataFrame(unzipped,schema) 

そして今、あなたがあなたのデータフレームとスパークに好きな凝っ行うことができます。 unpack_formatとsparkSchemaは "の同期" する必要が

from struct import unpack_from 

# creates an RDD of binaryrecords for determinted record length 
binary_rdd = sc.binaryRecords("hdfs://" + file_name, record_length) 

# map()s each binary record to unpack() it 
unpacked_rdd = binary_rdd.map(lambda record: unpack_from(unpack_format, record)) 

# registers a data frame with this schema; registerTempTable() it as table_name 
raw_df = sqlc.createDataFrame(unpacked_rdd, sparkSchema) 
raw_df.registerTempTable(table_name) 

2

編集: ここに述べたようにsc.binaryFilesの使用を検討してください: https://stackoverflow.com/a/28753276/5088142


使用してみてください:

hdfs://machine_host_name:8020/user/bin_file1.bin 

あなたはホスト名をfs.defaultFS中にcore-site.xml

+0

fs.defaultFSはnameservice1言うが、また、 'HDFSと、以下のように定義することができ:// nameservice1:8020/user/bin_file1.bin'まだファイルが見つかりません。エラーです。 地図に載せた機能とリンクすることはできますか? 'DEF read_bin: をFとしてオープン( "myfileの"、 "RB")で:! バイト= f.read(1) しばらくバイト= "": バイト= f.read(1)' – WilliamEllisWebb

+0

れていますあなたは "ファイルが見つかりませんでした"というエラーメッセージが表示されますか?どのように "read_bin"関数を使用する予定ですか?オープンメソッドはHDFSで動作しないようです.... – Yaron

+0

エラーは、read_binの2行目にあります。あなたが正しいです、オープンメソッドはHDFSを好きではありません。私は 'sc.textfile(filename).map(lambda line:line.split( '、'))と似たものを探しています。map(lambda x:(int [x] ....) ' – WilliamEllisWebb

0

私は最近、このような何かをしました。

で例を参照してください。両方とも同時に。(より大きなコードベースのそれの一部なので、readbilityのために、ここで掲載しない)

unpack_formatとsparkSchemaは、例えば、

from pyspark.sql.types import * 

unpack_format = '<' # '<' means little-endian: https://docs.python.org/2/library/struct.html#byte-order-size-and-alignment 
sparkSchema = StructType() 
record_length = 0 

unpack_format += '35s' # 35 bytes that represent a character string 
sparkSchema.add("FirstName", 'string', True) # True = nullable 
record_length += 35 

unpack_format += 'H' # 'H' = unsigned 2-byte integer 
sparkSchema.add("ZipCode", 'integer', True) 
record_length += 2 

# and so on for each field.. 
関連する問題