2016-06-11 14 views
1

Spark StreamingでTCP/IPソケット接続から1秒あたりの整数を受け取るJavaDStreamReceiverがあるとします。 それから私は100の整数があるまでリストにそれを保存します。 その後、私はそのRDDを4つのパーティションに分割したいと思います。私のPCのコアごとに1つずつ、これらのパーティションをパラレルでマップします。だからこのようなもの:RDD内のすべてのパーティションを処理した後で、Spark Streamingで入力を受け取る方法は?

public final class sparkstreaminggetjson { 
private static final Pattern SPACE = Pattern.compile(" "); 
private static Integer N=100; 
private static List<Integer> allInputValues= new List<Integer>(); 

public static void main(String[] args) throws Exception { 

    SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson"); 


    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); 

    JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
     args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); 

    JavaDStream<List<Integer>> storeValuesInList=receivedStream.map(// if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value); 

    JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4); 


    JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations) 

    JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations) 

... 

finalStream.print(); 

これは私の質問です。私は新しい入力を受け取り、自分のRDDの最初のパーティションに配置し、RDDの最後のパーティションから最後の要素を削除する、FILOモデルを実装したいと思います。だから基本的に私は、元のサイズを維持し、私のリストから整数をポーリングします。その後、私はいつものように各パーティションを並列に処理します。

私の問題は次のとおりです。私のパーティションの処理が終了すると、アプリケーションはpartitionedListではなくreceivedStreamに戻ります。つまり、パーティションごとに新しい入力が得られますが、これは私が望むものではありません。私は各パーティションを処理したいだけで、新しい入力を得るためにはreceivedStreamに戻ってください。

どうすればいいですか? receivedStreamの後にmap()を置き換える必要がありますか?

ありがとうございました。

+0

私は答えを出そうとしましたが、私はあなたが尋ねたことを誤解している可能性があることを認識しています。 – Vale

+0

@Valeあなたの答えをありがとうが、あなたが正しいです、それは私が知りたかったものではありません。今は、4つのパーティション、それぞれ25のintを持つRDDを持っています。私が望むのは、すべてのパーティションを処理し、新しい入力のみを受け取ることです。私のコードが今、パーティションが処理されるたびに、私は新しい入力を受け取ります(それは受信側、map()メソッドの最初に移動します) –

+0

理想的には、パラレルの各パーティションを同時に、コアだけを入力してから入力してください。混乱していないことを願っています。 –

答えて

1

は、私の知る限り理解しているとして、あなたは窓を使用することができます:毎秒1つの整数はあなたがRDDあたり100 int型を持つことになります

JavaDstream integers = your stream; 
JavaDstream hundredInt = integers.window(Seconds(100)); 

この方法を使用することができますを意味します。バッファリングを1として

newInt ->[1...25][26...50][51...75][76...100] ->lastInt
これは私が理解しているものですので、あなたが最後の計算を維持したい場合は、あなたの新しい100個のint型をrdd.cache()し、そこから手の込んだことができます。そのかrdd.checkpointです。

関連する問題