2017-08-22 11 views
1

私は、マシンをホストするVMからのデータのストリームを送信すると、以下のように私は方法writeToSocket()を使用しています:ここでソケットを使用してFlinkでDataStreamを送信する。シリアル化の問題

joinedStreamEventDataStream.writeToSocket("192.168.1.10", 6998) ; 

joinedStreamEventDataStreamはタイプDataStream<Integer,Integer>です。

誰かが上記の方法にシリアライザをどのように渡すべきか教えてください。ソケットのホストとポートともあなたのデータをシリアル化するために使用さSerializationSchemaインタフェースの実装:

おかげでアドバンス

答えて

1

writeToSocket()方法は3つの引数を取ります。だからあなたの実装では、多分、このように:joinedStreamEventDataStreamDataStream<Integer>タイプを持っている場合

joinedStreamEventDataStream.writeToSocket(
    "192.168.1.10", // host name 
    6998, // port 
    new SerializationSchema<Integer>() { 

     @Override 
     public byte[] serialize(Integer element) { 
      return ByteBuffer.allocate(4).putInt(element).array(); 
     } 
    } 
); 

それは本当です。

3

ソケットからデータをどのように読み取るかは、少し異なります。あなたはそれがデータの文字列表現であることが予想される場合、その後、あなたが経由でそれを行うことができます:

joinedStreamEventDataStream.writeToSocket(
    hostname, 
    port, 
    new TypeInformationSerializationSchema<>(
     joinedStreamEventDataStream.getType(), 
     env.getConfig())); 

の場合:

joinedStreamEventDataStream.map(new MapFunction<Type, String>() { 
    @Override 
    public String map(Type value) throws Exception { 
     return value.toString(); 
    } 
}).writeToSocket(hostname, port, new SimpleStringSchema()); 

あなたはFLINKのシリアル化形式を維持したい場合は、書き込みを行うことができますあなた自身のシリアライゼーション形式で出力したい場合は、Alexによって指摘されたように独自のSerializationSchemaを実装する必要があります。

関連する問題