2016-09-05 5 views
3

新しいファイルのディレクトリを監視し、binay Avro形式でKafkaトピックに送信する小さなJavaプログラムを作成しました。 私はAvroを初めて、私はAvroのドキュメントとオンラインの例を使用してこれを書きました。 監視部分はうまく動作しますが、実行時にAvroのシリアル化に失敗するとプログラムが失敗します。このエラースタックが発生します:Bはavroレコードをシリアル化しようとするとjava.nio.ByteBufferにキャストできません

Exception in thread "main" java.lang.ClassCastException: [B cannot be cast to java.nio.ByteBuffer 
    at org.apache.avro.generic.GenericDatumWriter.writeBytes(GenericDatumWriter.java:260) 
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:116) 
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) 
    at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:153) 
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:143) 
    at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:105) 
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:73) 
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:60) 
    at producers.AvroBinaryProducer.buildAvroData(AvroBinaryProducer.java:90) 
    at producers.AvroBinaryProducer.start(AvroBinaryProducer.java:120) 
    at producers.AvroBinaryProducer.main(AvroBinaryProducer.java:140) 
C:\Users\guys\AppData\Local\NetBeans\Cache\8.1\executor-snippets\run.xml:53: Java returned: 1 
BUILD FAILED (total time: 7 seconds) 

この行は失敗しています:writer.write(datum、encoder);

ByteBufferが期待されているようですが、ドキュメントや例ではGenericRecordを渡すべきです。私は間違って何をしていますか?

はここに私のコードです(そこファイルから設定のparamsを読み込みconfigという別のユーティリティクラスがあるが、私はここでそれを含んでいませんでした):

package producers; 

import java.io.ByteArrayOutputStream; 
import java.io.File; 
import java.io.IOException; 
import java.nio.ByteBuffer; 
import java.nio.file.FileSystems; 
import java.nio.file.Files; 
import java.nio.file.Path; 
import java.nio.file.WatchService; 
import java.util.Properties; 
import org.apache.avro.Schema; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.Producer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import static java.nio.file.StandardWatchEventKinds.*; 
import java.nio.file.WatchEvent; 
import java.nio.file.WatchKey; 
import java.util.logging.Level; 
import java.util.logging.Logger; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericDatumWriter; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.avro.io.BinaryEncoder; 
import org.apache.avro.io.DatumWriter; 
import org.apache.avro.io.EncoderFactory; 


/** 
* 
* @author guys 
*/ 
public class AvroBinaryProducer { 
    String mySchema; 
    Schema avroSchema; 
    Config myConf; 
    Producer<String, byte[]> producer; 
    String topic, bootstrapServers, watchDir; 
    Path path; 
    ByteArrayOutputStream out; 
    BinaryEncoder encoder; 


    public AvroBinaryProducer(String configPath) throws IOException 
    { 
     // Read initial configuration 
     myConf=new Config(configPath); 

     // first setting the kafka producer stuff 
     Properties props = new Properties(); 
     props.put("bootstrap.servers",myConf.get("bootstrap.servers"));   
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
     producer = new KafkaProducer<>(props); 
     topic=myConf.get("topic"); 
     watchDir=myConf.get("watchdir"); 
     path=FileSystems.getDefault().getPath(watchDir); 

     // Now define the Avro schema 
     mySchema="{\n" + 
     " \"type\": \"record\",\n" + 
     " \"name\": \"photo\",\n" + 
     " \"fields\": [\n" + 
     "  {\"name\": \"name\", \"type\": \"string\"},\n" + 
     "  {\"name\": \"data\", \"type\": \"bytes\"}\n" + 
     " ]\n" + 
     "}"; 

     Schema.Parser parser = new Schema.Parser(); 
     avroSchema=parser.parse(mySchema); 

     out = new ByteArrayOutputStream(); 
     encoder = EncoderFactory.get().binaryEncoder(out, null); 


    } 

    private byte[] buildAvroData(String name, byte[] data) throws IOException 
    {  
     out.reset();      
     GenericRecord datum=new GenericData.Record(avroSchema);   
     datum.put("name", name); 
     datum.put("data",data); 
     DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);   
     writer.write(datum,encoder); 
    encoder.flush(); 
     return out.toByteArray();   
    } 

    private void start() throws IOException, InterruptedException 
    { 
     String fileName; 
     byte[] fileData;  

     WatchService watcher = FileSystems.getDefault().newWatchService(); 
     WatchKey key=path.register(watcher, ENTRY_CREATE); 

     while (true) 
     { 
      key = watcher.take(); 
      // The code gets beyond this point only when a filesystem event occurs 

      for (WatchEvent<?> event: key.pollEvents()) 
      { 
       WatchEvent.Kind<?> kind = event.kind(); 
       if (kind==ENTRY_CREATE) 
       { 
        WatchEvent<Path> ev = (WatchEvent<Path>)event; 
        Path filename = ev.context(); 
        fileName=filename.toString(); 
        System.out.println("New file "+fileName+" found !"); 
        // We need this little delay to make sure the file is closed before we read it 
        Thread.sleep(500); 
        fileData=Files.readAllBytes(FileSystems.getDefault().getPath(watchDir+File.separator+fileName)); 
        publishMessage(buildAvroData(fileName,fileData)); 
       } 
      } 
      key.reset(); 
     } 
    } 

    private void publishMessage(byte[] bytes) 
    {   
     ProducerRecord <String, byte[]> data =new ProducerRecord<>(topic, bytes); 
     producer.send(data); 

    } 

    public static void main (String args[]) 
    { 
     AvroBinaryProducer abp; 
     try { 
      abp=new AvroBinaryProducer(args[0]); 
      try { 
       abp.start(); 
      } catch (InterruptedException ex) { 
       Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex); 
      } 
     } catch (IOException ex) { 
      Logger.getLogger(AvroBinaryProducer.class.getName()).log(Level.SEVERE, null, ex); 
     } 
    } 
} 

ありがとうございます!

+0

同じエラーが発生しました。 –

答えて

3

これは私がそれを解決した方法です。 ByteBufferは、それがByteBufferを与えることを期待する場合。 私はに機能を変更:私はちょうどのByteBufferを使用してデータを包み、これは働いていた

private byte[] buildAvroData(String name, byte[] data) throws IOException 
{  
    out.reset(); 
    GenericRecord datum=new GenericData.Record(avroSchema);   
    datum.put("name", name); 
    datum.put("data",ByteBuffer.wrap(data)); 
    DatumWriter<GenericRecord> writer=new GenericDatumWriter<>(avroSchema);   
    writer.write(datum,encoder); 
encoder.flush(); 
    return out.toByteArray(); 

。 消費者側のByteBufferからバイト配列を抽出することを忘れないでください。

+0

あなた自身の答えを受け入れることをお勧めします:) –

+0

Worked!今後の読者は、Scalaソリューションをご希望の場合は私にpingしてください。 –

関連する問題