Spark StreamingでTCP/IPソケット接続から1秒あたりの整数を受け取るJavaDStreamReceiverがあるとします。 それから私は100の整数があるまでリストにそれを保存します。 その後、私はそのRDDを4つのパーティションに分割したいと思います。私のPCのコアごとに1つずつ、これらのパーティションをパラレルでマップします。だからこのようなもの:RDD内のすべてのパーティションを処理した後で、Spark Streamingで入力を受け取る方法は?
public final class sparkstreaminggetjson {
private static final Pattern SPACE = Pattern.compile(" ");
private static Integer N=100;
private static List<Integer> allInputValues= new List<Integer>();
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("sparkstreaminggetjson");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
JavaReceiverInputDStream<Integer> receivedStream = ssc.socketTextStream(
args[0],Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<List<Integer>> storeValuesInList=receivedStream.map(// if N<100, keeps inserting integers; if N>100, inserts the new value into the List and removes the oldest value);
JavaDStream<List<Integer>> partitionedList=storeValuesInList.repartition(4);
JavaDStream<List<Integer>> someCalculations=partionedList.map(//some calculations)
JavaDStream<List<Integer>> otherCalculations=someCalculations.map(//other calculations)
...
finalStream.print();
これは私の質問です。私は新しい入力を受け取り、自分のRDDの最初のパーティションに配置し、RDDの最後のパーティションから最後の要素を削除する、FILOモデルを実装したいと思います。だから基本的に私は、元のサイズを維持し、私のリストから整数をポーリングします。その後、私はいつものように各パーティションを並列に処理します。
私の問題は次のとおりです。私のパーティションの処理が終了すると、アプリケーションはpartitionedList
ではなくreceivedStream
に戻ります。つまり、パーティションごとに新しい入力が得られますが、これは私が望むものではありません。私は各パーティションを処理したいだけで、新しい入力を得るためにはreceivedStream
に戻ってください。
どうすればいいですか? receivedStream
の後にmap()
を置き換える必要がありますか?
ありがとうございました。
私は答えを出そうとしましたが、私はあなたが尋ねたことを誤解している可能性があることを認識しています。 – Vale
@Valeあなたの答えをありがとうが、あなたが正しいです、それは私が知りたかったものではありません。今は、4つのパーティション、それぞれ25のintを持つRDDを持っています。私が望むのは、すべてのパーティションを処理し、新しい入力のみを受け取ることです。私のコードが今、パーティションが処理されるたびに、私は新しい入力を受け取ります(それは受信側、map()メソッドの最初に移動します) –
理想的には、パラレルの各パーティションを同時に、コアだけを入力してから入力してください。混乱していないことを願っています。 –