2017-08-30 9 views
0

レコードのトピックをポーリングし、consumerRecords: ConsumerRecords[String, String]として保存するKafkaクライアントがあります。各レコードを繰り返し処理し、(offset, value)(k, v)としてHbaseテーブルに書きたいと思います。これらのレコードをSparkで並列化しようとしていますので、Hbaseに一括挿入するためにRDDにマップすることができます。Hbaseへの一括挿入:ConsumerRecordはシリアライズ不可能

val hbaseTable: String = "/app/raphattack/TEST" 
val conf: Configuration = HBaseConfiguration.create() 
val admin: Admin = ConnectionFactory.createConnection(conf).getAdmin 
val connection: Connection = ConnectionFactory.createConnection(admin.getConfiguration) 
val table: Table = connection.getTable(TableName.valueOf(hbaseTable)) 

val job = Job.getInstance(conf) 
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) 
job.setMapOutputValueClass(classOf[KeyValue]) 
HFileOutputFormat2.configureIncrementalLoadMap(job, table) 

val spark: SparkSession = SparkSession.builder.enableHiveSupport.getOrCreate 
val records: RDD[ConsumerRecord[String, String]] = spark.sparkContext.parallelize(consumerRecords.toSeq) 

val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = records.map(record => { 
    val kv: KeyValue = new KeyValue(Bytes.toBytes(record.offset()), "cf".getBytes(), "c1".getBytes(), s"${record.value}".getBytes()) 
    (new ImmutableBytesWritable(Bytes.toBytes(record.offset())), kv) 
}) 

rdd.saveAsNewAPIHadoopFile("/tmp/test", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration) 

私はこの例外を打つています:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. 
Exception during serialization: java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord 
Serialization stack: 
     - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = test, partition = 0, offset = 14691347, timestamp = 0, producer = null, key = 1, value = {"id":1.0,"name":"test"})) 

ConsumerRecordオブジェクトが直列作ることが可能ですか?そうでなければ、Hbaseへの書き込み速度を犠牲にすることなく、どのようにレコードを繰り返し処理できますか?

答えて

0

私はUnitTestで同じことをしようとしています。

基本的にあなたがSparkConf

.set("spark.serializer", classOf[KryoSerializer].getName) 
でシリアライザを設定する必要があります
関連する問題