0
レコードのトピックをポーリングし、consumerRecords: ConsumerRecords[String, String]
として保存するKafkaクライアントがあります。各レコードを繰り返し処理し、(offset, value)
を(k, v)
としてHbaseテーブルに書きたいと思います。これらのレコードをSparkで並列化しようとしていますので、Hbaseに一括挿入するためにRDD
にマップすることができます。Hbaseへの一括挿入:ConsumerRecordはシリアライズ不可能
val hbaseTable: String = "/app/raphattack/TEST"
val conf: Configuration = HBaseConfiguration.create()
val admin: Admin = ConnectionFactory.createConnection(conf).getAdmin
val connection: Connection = ConnectionFactory.createConnection(admin.getConfiguration)
val table: Table = connection.getTable(TableName.valueOf(hbaseTable))
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoadMap(job, table)
val spark: SparkSession = SparkSession.builder.enableHiveSupport.getOrCreate
val records: RDD[ConsumerRecord[String, String]] = spark.sparkContext.parallelize(consumerRecords.toSeq)
val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = records.map(record => {
val kv: KeyValue = new KeyValue(Bytes.toBytes(record.offset()), "cf".getBytes(), "c1".getBytes(), s"${record.value}".getBytes())
(new ImmutableBytesWritable(Bytes.toBytes(record.offset())), kv)
})
rdd.saveAsNewAPIHadoopFile("/tmp/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
私はこの例外を打つています:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it.
Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
Serialization stack:
- object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 14691347, timestamp = 0, producer = null, key = 1, value = {"id":1.0,"name":"test"}))
はConsumerRecord
オブジェクトが直列作ることが可能ですか?そうでなければ、Hbaseへの書き込み速度を犠牲にすることなく、どのようにレコードを繰り返し処理できますか?