2017-03-03 8 views
0

フォルダにはgzipファイルがたくさんあります。各gzipファイルにはxmlファイルが含まれています。私はflumeを使用してファイルをHDFSにストリーミングしました。私はHDFSにファイルをストリーミングした後、私は次のコードを使用して、それを読むためにスパークを使用し、flume to gz files

agent1.sources = src 
agent1.channels = ch 
agent1.sinks = sink 

agent1.sources.src.type = spooldir 
agent1.sources.src.spoolDir = /home/tester/datafiles 
agent1.sources.src.channels = ch 
agent1.sources.src.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder 

agent1.channels.ch.type = memory 
agent1.channels.ch.capacity = 1000 
agent1.channels.ch.transactionCapacity = 1000 

agent1.sinks.sink.type = hdfs 
agent1.sinks.sink.channel = ch 
agent1.sinks.sink.hdfs.path = /user/tester/datafiles 
agent1.sinks.sink.hdfs.fileType = CompressedStream 
agent1.sinks.sink.hdfs.codeC = gzip 
agent1.sinks.sink.hdfs.fileSuffix = .gz 
agent1.sinks.sink.hdfs.rollInterval = 0 
agent1.sinks.sink.hdfs.rollSize = 122000000 
agent1.sinks.sink.hdfs.rollCount = 0 
agent1.sinks.sink.hdfs.idleTimeout = 1 
agent1.sinks.sink.hdfs.batchSize = 1000 

df = sparkSession.read.format('com.databricks.spark.xml').options(rowTag='Panel', compression='gzip').load('/user/tester/datafiles') 

をしかし、私はそれを読むために問題を抱えています以下は、私の設定ファイルです。手動で1つのgzipファイルをHDFSフォルダにアップロードして上記のSparkコードを再実行すると、問題なく読み込めます。私はそれが水路に起因するのかどうかはわかりません。

flumeでストリームされたファイルをダウンロードして解凍しようとしましたが、内容を見たときにxml形式が表示されなくなりました。これは読めない文字です。誰かが私にこれについていくつかの光を当てることができますか?ありがとう。

+0

ファイルをどのように解凍しますか? 'gunzip'を使う?そしてあなたはsparkでファイルを読み込むことにどのような問題がありますか? spark-xmlに手動でスキーマを指定しようとしましたか? – Mariusz

+0

Mariusz、ファイルを解凍しませんでした。私はgzファイルをストリームし、sparkを使ってそれを読み込もうとしていました。私は手動でスキーマを指定しませんでした。ストリーミングされたgzipファイルを読み込んでコンテンツを表示すると、特殊文字が表示されます。しかし、HDFSでgzipファイルを手動でアップロードしようとすると、何の問題もなく読み込めるようになりました。問題なくコンテンツとスキーマを表示できます。私はそれがFlumeによると思う? – kcyea

答えて

0

私はあなたがそれをやっていると思います間違った!!!なぜですか?

"分割不可" ZIPのソースがあります。それらを部分的にレコードとして記録することはできません。圧縮解除しないと、GZIPInputStreamが得られます。これは、flumeソースに入っています。

入力レコードとしてそのGZIP入力ストリームを読み取った後、圧縮されたシンクタイプを選択すると、すでにジップストリームを別のGZIPストリームに保存しています。

あなたはHDFSのGzipの中にZip Streamedを持っています。 :)

私はあなたの問題を解決するローカルからHDFSへのコピーを行うためにスクリプトをスケジュールすることをお勧めします。

+0

ああ、そうですね... Flumeがgzipファイルをピックアップして元のフォーマットのHDFSに入れる方法はありませんか?ローカルからHDFSへのコピーを行うためにcronでスクリプトをスケジュールすると、HDFSに小さなファイルがたくさんあると思っていました。これはHDFSの多くのブロックを占めてしまうでしょう。これは、gzipファイルのサイズがそれぞれ約20kbしかないためです。 – kcyea

+0

あなたができることは、sparkを使ってそれらのzipファイルを読み込み、処理後にhadoopに書き込むことです。 – RBanerjee