2017-05-08 10 views
2

は、私には、Sparkストリーミング+ Accumuloコネクタとの完全な使用例を探しています。スパークストリーミング+ Accumulo - SerializeをBatchWriterImpl

現在、私はAccumuloテーブルにスパークストリーミング結果を書き込むしようとしているが、私はBatchWriterのための持つNotSerializableExceptionを取得しています。 BatchWriterを直列化する方法の例を教えてもらえますか?下記のコードはAccumuloの文書に基づいています。

現在のコード:ランタイムエラーの間に

val accumuloInstanceName = "accumulo" 
val zooKeepers = "localhost:2181" 
val instance = new ZooKeeperInstance(accumuloInstanceName, zooKeepers) 
val accumuloUser = programOptions.accumuloUser() 
val accumuloPassword = programOptions.accumuloPassword() 
val passwordToken = new PasswordToken(accumuloPassword) 
val connector = instance.getConnector(accumuloUser, passwordToken) 

val accumuloBatchWriterConfig = new BatchWriterConfig 
val accumuloBatchWriterMaxMemory = 32 * 1024 * 1024 
accumuloBatchWriterConfig.setMaxMemory(accumuloBatchWriterMaxMemory) 
val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig) 
fullMergeResultFlatten.foreachRDD(recordRDD => 
    recordRDD.foreach(record => { 
    val mutation = new Mutation(Longs.toByteArray(record.timestamp)) 
    mutation.put("value", "", new Value(Longs.toByteArray(record.value))) 
    mutation.put("length", "", new Value(Longs.toByteArray(record.length))) 
    accumuloBatchWriter.addMutation(mutation) 
    }) 
) 

が発生:

17/05/05 16:55:25 ERROR util.Utils: Exception encountered 
java.io.NotSerializableException: org.apache.accumulo.core.client.impl.BatchWriterImpl 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 

私は、これは非常に一般的なケースであると仮定し、私は任意の単純スパークストリーミング+ accumulo例を見つけることができませんでした。

答えて

0

elserjが指摘したように、接続オブジェクトをシリアル化することは、典型的に正しいパターンではありません。私が見てきたパターンは、RDD.foreachPartition()を使って直接Sparkワーカーノードからの接続を開始することです。これは、作業のバッチごとに接続を作成することができるためです(各レコードごとに新しい接続を作成するのではなく、ほぼ効率的です)。

例:

fullMergeResultFlatten.foreachRDD(recordRDD => { 
    recordRDD.foreachPartition(partitionRecords => { 
    // this connection logic is executed in the Spark workers 
    val accumuloBatchWriter = connector.createBatchWriter("Data", accumuloBatchWriterConfig) 
    partitionRecords.foreach(// save operation) 
    accumuloBatchWriter.close() 
    }) 
}) 
0

あなたはBatchWriterクラスをシリアル化することはできません。私はあなたのコードを修正する方法の提案はありませんが、そのクラスを直列化しようとするのが適切な方法ではないと言うことができます。