私はkafkaからavroデータを消費しているjavaカフカの消費者を抱えています[トピックx]。コード生成なしでこのデータをHDFSにプッシュすることになっています。アブロdocumentationでは、彼らは次のようなものを使用している:これでAVROデータをHadoopのhdfsに書き込む
GenericRecord e1 = new GenericData.Record(schema);
e1.put("key", "value");
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
dataFileWriter.create(schema, new File("<HDFS file path>"));
dataFileWriter.append(e1);
dataFileWriter.close();
問題は、私はすでにアブロデータを持っている、です。この一連の手順を使用するには、avroパケットをデシリアライズした後に各キーと値のペアを抽出し、それをGenericRecordオブジェクトにプッシュしなければなりません。私は達成しようとしていることの例は見つけませんでした。関連する文書へのヒントやリンクは非常に高く評価されます。
独自のKafka-> HDFS摂取ツールを実装する代わりに、Kafkaの組み込みKafka Connectフレームワークと、[kafka-connect-hdfs](https:// github .com/confluentinc/kafka-connect-hdfs)?リンクされたHDFSシンクコネクタはAvroをそのまま使用できます。 –
いくつかのスキーマレジストリの問題のため、コンフルエントから離れました。したがって、コンフルエントなフレームワークは使用できません。 – Bitswazsky
実行しているスキーマレジストリの問題を精緻化していますか?もちろん、https://github.com/confluentinc/schema-registry/issuesで行うこともできます。 –