2016-06-19 10 views
2

Spark Streamingプログラムをローカルモードで実行して、TCPソケット接続からバッチごとにJSONメッセージを受信しました。Spark Streamingでパラレルのキー/値パーティションをマップする方法

これらのメッセージのIDはキー/値JavaPairDStreamを作成するために使用され、JavaDStream内のRDDの各パーティションにはキー/値ペアがあり、パーティションごとに1つのメッセージがあります。

ここでの目標は、同じパーティションに同じIDを持つメッセージをグループ化して、各パーティションを別のコアで処理するように、それらを並列にマップできるようにすることです。

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), 
      StorageLevels.MEMORY_AND_DISK_SER); 
JavaDStream<String>streamData1=streamData2.repartition(1); 

JavaPairDStream<String,String> streamGiveKey= streamData1.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() { 
     @Override 
     public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception { 

      ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>(); 

      while (stringIterator.hasNext()){ 
       String c=stringIterator.next(); 
       if(c==null){ 
        return null; 

       } 

       JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class); 
       String key= retMap.getSid(); 
       Tuple2<String,String> b= new Tuple2<String,String>(key,c); 
       a.add(b); 

      } 

      return a; 
     } 
    }); 

ので、このコードの最後に、私はその中にすべてのキー/値のペアで、理由repartition{1}のパーティションを1つだけ持っているRDDとDSTREAMを持っている:

は私のコードです。

同じキーを持つメッセージをグループ化し、別々のパーティションに入れて別々にマップできるようにするにはどうすればよいですか?

+0

他の質問に対する私の前の答えは、このhttp://stackoverflow.com/questions/37908890/howをカバーしていグループ・キー・バリュー・バイ・パーティション・イン・スパーク?そうでない場合は、ここにもっと必要なものをお知らせください。 – javadba

+0

はい、ありがとう、ありがとうございます。私は実際にこれについて別の質問がありますが、私は新しいものを作成します。ありがとうございました。 –

答えて

関連する問題