2017-04-05 14 views
1

私はkafkaから時系列データをhadoopに保存するプログラムを書いています。そして私はこのようなディレクトリ構造体を設計した:時系列データを月単位と日単位で区切ったhdfsに書き込みますか?

event_data 
|-2016 
    |-01 
    |-data01 
    |-data02 
    |-data03 
|-2017 
    |-01 
    |-data01 

デーモンの作業ですので、私は開かれたファイルを管理し、リソースの漏れを避けるために、時間内に非アクティブなファイルを閉じるにはLRUベースの管理を書きますが、所得のデータストリームが時間順にソートされていない場合、既存のファイルを開いて新しいデータを追加するのは非常に一般的です。

ファイルが存在するとOutputStreamを開こうとしましたが、私のhdfsクラスタでエラーが発生します。

次に、目標を達成するために別の方法を使用します。 同じ名前のファイルが存在する場合、ファイル名にシーケンスサフィックスを追加します。今私は自分のhdfsにたくさんのファイルを持っています。それは非常に汚れています。

私の質問は次のとおりです。状況のベストプラクティスは何ですか?

+0

HDFSは実際にファイルに追加するためのものではありません –

答えて

0

これはあなたのプログラミングの問題への直接の答えではありませんが、すべてのオプションのために開いているのではなく自分でそれを実装する場合、私はfluentdであなたの経験を共有したいと、それはHDFS (WebHDFS) Output Pluginだと申し訳ありません。

Fluentdはオープンソースのプラッガブルなデータコレクタで、データパイプラインを簡単に構築でき、入力からデータを読み取り、処理して、指定された出力に書き出します。 kafkaであり、出力はHDFSです。何をする必要がある:

  • コンフィグがfluentd kafka plugininputをfluentd、あなたのカフカ/トピック情報
  • source一部をconfigのでしょう、あなたのHDFSクラスタのwebhdfsappend操作を有効にし、あなたはどのように見つけることができますHDFS (WebHDFS) Output Plugin
  • HDFSにデータを書き込むようにmatchの部分を設定すると、プラグインのドキュメントページに例があります。あなたは、その後にあなたのMapReduceジョブを書くことができ、あなたのデータを収集するには、このオプションを使用すると

    path "/event_data/%Y/%m/data%d"

:パーティションの月と日によってあなたのデータは、あなたがタイムスライスプレースホルダ、何かのようでpathパラメータを設定することができますETLや何でも好きなことをする。

これが問題に適しているかどうかわかりません。ここでもう1つのオプションを指定してください。

+0

問題を解決できる場合のみ、任意のオプションで公開されています。実際には、いくつかの詳細が私の質問に掲載されていないです。 kafkaのメッセージはプレーンテキストではなく、 'protobuf'メッセージであり、' timestamp'は 'protobuf'メッセージのフィールドです。私は 'fluentd'の' protobuf'に対する明確な支持を今はまだ見つけていません。もう1つの詳細は、hdfsのメッセージストアのフォーマットはutf-8エンコーディングの 'json'の行です。 – aLeX

+0

サービス内のカフカからデータを処理/変換し、適切なフォーマットのデータをフルードなどのサードパーティのサービスにプッシュすることで、データをHDFSにロードするプロセスを管理することができます。プロセスのみであり、データパイプライン全体のステップの1つになります。とにかく、それはあなたの技術スタックと好みに基づいてあなたの選択です。申し訳ありませんあなたのバグのための手がかりは、私はそれを再現することはできません。 – shizhz

関連する問題