2016-08-23 69 views
4

私はカフカを使用してストリーミングするために毎日何千ものファイルを生成しています。 ファイルを読み込もうとすると、各行は別のメッセージとして扱われます。カフカを使用してファイルを転送することはできますか?

私は、カフカのトピックの1つのメッセージとして、そして消費者と一緒に、個々のファイルのコンテンツをカフカのトピックから別のファイルに書き込む方法を知りたいと思います。

+0

あなたはKafka Connectを見ましたか? http://docs.confluent.io/3.0.0/connect/index.html –

+0

はい私はそれを認識しています。ここでどうすれば使えますか?このシナリオでは、ファイルを読み込むと、各行は別々のメッセージとして扱われますが、各ファイルを長い単一のメッセージにしたいのです。 (ファイルのサイズが30〜40行の場合があります) – Nahush

+0

Javaクライアント、コンソールプロデューサ、その他を使用していますか? –

答えて

3

ファイルを処理するための独自のシリアライザ/デシリアライザを作成できます。たとえば :

プロデューサーの小道具:

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, YOUR_FILE_SERIALIZER_URI); 

消費者小道具:

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); 
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, YOUR_FILE_DESERIALIZER_URI); 

シリアライザ

public class FileMapSerializer implements Serializer<Map<?,?>> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map configs, boolean isKey) { 
} 

@Override 
public byte[] serialize(String topic, Map data) { 
    ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
    ObjectOutput out = null; 
    byte[] bytes = null; 
    try { 
     out = new ObjectOutputStream(bos); 
     out.writeObject(data); 
     bytes = bos.toByteArray(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (out != null) { 
       out.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
     try { 
      bos.close(); 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return bytes; 
} 
} 

デシリアライザ

public class MapDeserializer implements Deserializer<Map> { 

@Override 
public void close() { 

} 

@Override 
public void configure(Map config, boolean isKey) { 

} 

@Override 
public Map deserialize(String topic, byte[] message) { 
    ByteArrayInputStream bis = new ByteArrayInputStream(message); 
    ObjectInput in = null; 
    try { 
     in = new ObjectInputStream(bis); 
     Object o = in.readObject(); 
     if (o instanceof Map) { 
      return (Map) o; 
     } else 
      return new HashMap<String, String>(); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      bis.close(); 
     } catch (IOException ex) { 
     } 
     try { 
      if (in != null) { 
       in.close(); 
      } 
     } catch (IOException ex) { 
      // ignore close exception 
     } 
    } 
    return new HashMap<String, String>(); 
} 
} 

final Object kafkaMessage = new ProducerRecord<String, Map>((String) <TOPIC>,Integer.toString(messageId++), messageMap); 

messageMap次の形式で

作曲のメッセージは、キーと値としてファイルの内容としてファイル名が含まれています。 値は直列化可能なオブジェクトにすることができます。 したがって、各メッセージには、File_NameとFileContentマップのMapが含まれます。単一値または複数値にすることができます。

関連する問題