2017-02-14 7 views
2

RDDの各レコードをHDFSの個々のファイルに書きたいという要件があります。スパーク:RDDの各レコードをHDFSディレクトリの個々のファイルに書き込む

通常のファイルシステムでやったのですが、明らかにHDFSでは機能しません。 書き込みは、ファイルシステムへの書き込みを行う機能である

stream.foreachRDD{ rdd => 
    if(!rdd.isEmpty()) { 
     rdd.foreach{ 
      msg => 
      val value = msg._2 
      println(value) 
      val fname = java.util.UUID.randomUUID.toString 
      val path = dir + fname 
      write(path, value) 
     } 
     } 
    } 

スパークの中でそれを行う方法はありますか?各レコードについて、私はネイティブにKafka ConnectまたはFlumeのような他のツールを使用せずにHDFSに書き込むことができますか?


EDIT:例えばのためのより多くの説明

: 私DstreamRDDは、次のレコードを持っている場合は、

  • ABCD
  • EFGH
  • IJKL
  • MNOP

"abcd"とは異なるファイル、 "efgh"とは異なるファイルなど、レコードごとに異なるファイルが必要です。

私はstreamRDD内でRDDを作成しようとしましたが、RDDのシリアル化ができないため、許可されていないことが分かりました。

あなたはいくつかの方法で行うことができます
+0

解決策を投稿するか、正しいものを受け入れてください。それは、同様の問題を抱えている他の人に役立ちます。 – Explorer

+0

@LiveAndLetLiveこの問題の解決策はまだ見つかりませんでした。前のコメントの1つで述べたように、レコードの保存から、複数のレコードを含むRDD全体の保存に移行しました。だから、この質問はまだ開いています。 –

+0

独自のMultipleTextOutputFormatを使用することができます。この返信を参照してください。https://stackoverflow.com/a/26051042/609597 – softwarevamp

答えて

-1

..

RDDから、あなたはsparkCOntextを得た後、あなたが並列化の方法を使用して、文字列のリストとして文字列を渡すことができ、sparkCOntextを得ることができます。例えば

val sc = rdd.sparkContext 
sc.parallelize(Seq("some string")).saveAsTextFile(path) 

また、ファイルに書き込み、その後DFに文字列を変換するためにsqlContextを使用することができます。例のため

import sqlContext.implicits._ 
Seq(("some string")).toDF 
+0

私のデータはrddの範囲内にあるので、rddのネストとして指定した方法はrddを作成できません。許可されます。 –

+0

シャンカーズのアプローチは私にとって正しいようです。 @BiplobBiswas他に何を試して解決できましたか? –

+0

@RamGhadiyaram個々のレコードを別々のファイルとして保存すると、私たちの将来の問題は解決されましたが、RDD全体をHDFSに保存するようになりました。 –

0

あなたが強制的に何にRDDを再分割することはできません。多くの場合、レコードを保存して保存してください

val rddCount = rdd.count() 
rdd.repartition(rddCount).saveAsTextFile("your/hdfs/loc") 
関連する問題