2016-09-09 12 views
1

私は寄木張りとしてcsvファイルを書き込むためにflinkを使用しようとしています。 次のコードを使用していますが、エラーが表示されます。Flinkを寄木細工のエラーに変換する

val parquetFormat = new HadoopOutputFormat[Void, String](new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

次のビルドエラーが発生します。誰かが助けてくれますか?

タイプミスマッチ;見つかった:parquet.avro.AvroParquetOutputFormat 必須: org.apache.hadoop.mapreduce.OutputFormat [Void、String] ingestion.scala/flink-scala/src/main/scala/com/sc/edl/flink line スカラ問題

答えて

1

OutputFormat[Void, String]が必要なHadoopOutputFormat[Void, String]を作成するとします。

を含むAvroParquetOutputFormatを提供します。 ParquetOutputFormatは、ParquetOutputFormat<T> extends FileOutputFormat<Void, T>と定義される。

を入力し、HadoopOutputFormat[Void, String]にはOutputFormat[Void, String]が必要です。

あなたはparquetFormatアウト書きたいDataSetはタイプ(Void, IndexedRecord)でない場合、あなたは(Void, IndexedRecord)ペアにデータを変換しMapFunctionを追加する必要があります

val parquetFormat = new HadoopOutputFormat[Void, IndexedRecord](
    new AvroParquetOutputFormat, job) 
FileOutputFormat.setOutputPath(job, new Path(outputPath)) 

に変更する必要があります。

+0

ありがとうございましたFabian、申し訳ありませんが、私はこれで新しくなりました。正しい構文や何が間違っているかを教えてください。 – Niki

+0

私は自分の答えを広げました –

1

Flink Tupleは現在のところNULLキーをサポートしていないため、依然として問題は残ります。 次のエラーが発生します。 Caused by: org.apache.flink.types.NullFieldException: Field 1 is null, but expected to hold a value.

をより良い選択は、この例で説明したようにKiteSDKを使用することです: https://github.com/nezihyigitbasi/FlinkParquet ですから、動的なスキーマを必要とする場合は、スキーマを遵守する必要があるため、このアプローチは動作しません、厳密にさらに、これは執筆のためではなく、朗読の方が良い。

Spark DataFrameは、Parquetだけでなくパフォーマンス面でも非常にうまく機能します。しかし、もしあなたがFlinkを使いたいのであれば、Flinkコミュニティがapiをリリースするのを待つか、またはparquet-hadoopコードを編集して大きな努力をする必要があります。

のみ、これらのコネクタは、だから、まだ https://github.com/apache/flink/tree/master/flink-connectors を実装しているあなたはそれが生産のユースケースを考慮し、より成熟したAPIを持って、それのために行く火花を使用することができれば、私の個人的な提案は、だろう。あなたがフリンクで基本的な必要性に立ち往生すると、あなたは他の場所にもぶつかるかもしれません。

今のところFlinkで周りを見回す時間を無駄にしないで、Hive、Spark、MRのような標準的なオプションを使用して、かなりの時間を無駄にしました。

関連する問題