2016-05-09 9 views
2

私は、汎用レコード型の2つのDataStreamに結合演算子を適用しています。Apache Flink結合演算子が間違った応答を返す

package com.gslab.com.dataSets; 
import java.io.File; 
import java.util.ArrayList; 
import java.util.List; 
import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; 
import org.apache.avro.generic.GenericData.Record; 
import org.apache.avro.generic.GenericRecord; 
import org.apache.flink.api.common.functions.MapFunction; 
import org.apache.flink.streaming.api.datastream.DataStream; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 

public class FlinkBroadcast { 
    public static void main(String[] args) throws Exception { 

     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
     env.setParallelism(2); 

     List<String> controlMessageList = new ArrayList<String>(); 
     controlMessageList.add("controlMessage1"); 
     controlMessageList.add("controlMessage2"); 

     List<String> dataMessageList = new ArrayList<String>(); 
     dataMessageList.add("Person1"); 
     dataMessageList.add("Person2"); 
     dataMessageList.add("Person3"); 
     dataMessageList.add("Person4"); 

     DataStream<String> controlMessageStream = env.fromCollection(controlMessageList); 
     DataStream<String> dataMessageStream = env.fromCollection(dataMessageList); 

     DataStream<GenericRecord> controlMessageGenericRecordStream = controlMessageStream.map(new MapFunction<String, GenericRecord>() { 
      @Override 
      public GenericRecord map(String value) throws Exception { 
       Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/controlMessageSchema.avsc"))); 
       gr.put("TYPE", value); 
       return gr; 
      } 
     }); 

     DataStream<GenericRecord> dataMessageGenericRecordStream = dataMessageStream.map(new MapFunction<String, GenericRecord>() { 
      @Override 
      public GenericRecord map(String value) throws Exception { 
       Record gr = new GenericData.Record(new Schema.Parser().parse(new File("src/main/resources/dataMessageSchema.avsc"))); 
       gr.put("FIRSTNAME", value); 
       gr.put("LASTNAME", value+": lastname"); 
       return gr; 
      } 
     }); 

     //Displaying Generic records 
     dataMessageGenericRecordStream.map(new MapFunction<GenericRecord, GenericRecord>() { 
      @Override 
      public GenericRecord map(GenericRecord value) throws Exception { 
       System.out.println("data before union: "+ value); 
       return value; 
      } 
     }); 

     controlMessageGenericRecordStream.broadcast().union(dataMessageGenericRecordStream).map(new MapFunction<GenericRecord, GenericRecord>() { 
      @Override 
      public GenericRecord map(GenericRecord value) throws Exception { 
       System.out.println("data after union: " + value); 
       return value; 
      } 
     }); 
     env.execute("stream"); 
    } 
} 

出力:あなたが見ることができるように

05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
data after union: {"TYPE": "controlMessage1"} 
data before union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2: lastname"} 
data after union: {"TYPE": "controlMessage1"} 
data before union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1: lastname"} 
data after union: {"TYPE": "controlMessage2"} 
data after union: {"TYPE": "controlMessage2"} 
data after union: {"FIRSTNAME": "Person1", "LASTNAME": "Person1"} 
data before union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4: lastname"} 
data before union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3: lastname"} 
data after union: {"FIRSTNAME": "Person2", "LASTNAME": "Person2"} 
data after union: {"FIRSTNAME": "Person3", "LASTNAME": "Person3"} 
05/09/2016 13:02:13 Map -> Map(2/2) switched to FINISHED 
data after union: {"FIRSTNAME": "Person4", "LASTNAME": "Person4"} 
05/09/2016 13:02:13 Map -> Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(1/2) switched to FINISHED 
05/09/2016 13:02:13 Map(2/2) switched to FINISHED 
05/09/2016 13:02:13 Job execution switched to status FINISHED. 

dataMessageGenericRecordStream内のレコードは、労働組合の後に正しくありません。すべてのフィールド値が最初のフィールド値に置き換えられています。

+0

他の質問にも投稿しました。各DataStreamの 'TypeInformation'を印刷してください。 'DataStream.getType()'、つまり 'System.out.println(dataMessageGenericRecordStream.getType())'を使って取得できます。 – aljoscha

+0

印刷dataMessageGenericRecordStream.getType():GenericType 印刷controlMessageGenericRecordStream.getType():GenericType

+0

とき、私はこれだけGenericRecordための再現性がありますそれを変更してその作業をマップします。どのようなワークアラウンドを提案することができますか –

答えて

1

DataSet APIで同様の問題が発生しました。私はGenericRecordsとしていくつかのAvroファイルを読んでいて、この奇妙な動作を見ました。この回避策をGenericRecordsとして読み込むのではなく、特定のレコード(MyAvroObjectなど)として読み込んだ後、マップを使用してGenericRecordsとして変換/型変換します。

私は、DataSetのAPIを使用して、ユースケースをテストするためにいくつかのコードを書き、それが上記workaround-で動作します

public static void maintest(String[] args) throws Exception { 
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
    env.setParallelism(2); 

    List<String> queryList1 = new ArrayList<String>(); 
    queryList1.add("query1"); 
    queryList1.add("query2"); 

    List<String> queryList2 = new ArrayList<String>(); 
    queryList2.add("QUERY1"); 
    queryList2.add("QUERY2"); 
    queryList2.add("QUERY3"); 
    queryList2.add("QUERY4"); 

    DataSet<String> dataset1 = env.fromCollection(queryList1); 
    DataSet<String> dataset2 = env.fromCollection(queryList2); 

    DataSet<GenericRecord> genericDS1 = dataset1.map(new MapFunction<String, GenericRecord>() { 
     @Override 
     public GenericRecord map(String value) throws Exception { 
      Query query = Query.newBuilder().setQuery(value).build(); 
      return (GenericRecord) query; 
     } 
    }); 

    DataSet<GenericRecord> genericDS2 = dataset2.map(new MapFunction<String, GenericRecord>() { 
     @Override 
     public GenericRecord map(String value) throws Exception { 
      SearchEngineQuery searchEngineQuery = SearchEngineQuery.newBuilder().setSeQuery(value).build(); 
      return (GenericRecord) searchEngineQuery; 
     } 
    }); 

    genericDS2.map(new MapFunction<GenericRecord, GenericRecord>() { 
     @Override 
     public GenericRecord map(GenericRecord value) throws Exception { 
      System.out.println("DEBUG: data before union: " + value); 
      return value; 
     } 
    }); 

    genericDS1.union(genericDS2).map(new MapFunction<GenericRecord, GenericRecord>() { 
     @Override 
     public GenericRecord map(GenericRecord value) throws Exception { 
      System.out.println("DEBUG: data after union: " + value); 
      return value; 
     } 
    }).print(); 
} 
クエリとSEARCHENGINEクエリがあなたのコントロールメッセージリストとデータメッセージに似た私のアブロオブジェクト(ある

リスト)。

出力:

{"query": "query1"} 
{"se_query": "QUERY1"} 
{"se_query": "QUERY3"} 
{"query": "query2"} 
{"se_query": "QUERY2"} 
{"se_query": "QUERY4"} 
2

私は別の問題のためにこれを調査し(まだGenericRecordを含む)数日を費やしてきたし、根本的な原因と解決策を発見しました。

根本原因:Apacheのアブロ「Schema.class」内の「フィールド」の位置は一時的なものでFLINKパイプライン内直列化復元時にKryoによってシリアライズされませんので、位置「0」に初期化されます。

「JIRA AVRO-1476」を参照してください。具体的には、カイロのシリアル化について説明しています。

この

はソリューション

アブロ1.7.7で修正されました:FLINK(またはそれ以降)アブロ1.7.7を使用する必要があります。私はflink-dist_2.11-1.1.3.jar内のAvroクラスを置き換えることで、ローカルマシンの修正を確認し、問題を修正しました。今、このためのPRがあるhttps://issues.apache.org/jira/browse/FLINK-5039

私はこのためにJIRAの問題を更新しhttps://github.com/apache/flink/pull/2953

が、私はそれが1.1.4と1.2.0のビルドFLINKに含まれることを期待しています。

関連する問題