2016-05-08 10 views
1

sc().emptyRDD()のような新しいDStramを作成する方法。空のDStreamを作成する

空のJavaPairDStream<String, String>を作成して、別のJavaPairDStreamと結合する必要があります。新しいバッチで組合で使用するバッチの最後に、このdstreamを埋めます。

完全な要件は次のとおりです。 ステージ間のリストを更新したいとします。

  1. updateStateBykey()によって更新されたRDD(キーの)があります。いくつかの新しい キーがすべてのバッチでそれに追加されます。
  2. このRDDにいくつかの入力イベントが加わる(デカルト積)。
  3. 私は結果を計算し、鍵の新しいRDDを作成します。

ですが、次のマイクロバッチではこの更新されたRDDが必要です。 updateStateByKeyを使用することは、この要件のためには機能しません。だから私はバッチの始めにnewKeysとこの更新されたリストに参加したい。空DStreamを作成する

+0

空の 'DStream'は必要ないと思います。私はあなたが必要とするのは、反復ごとに更新される空のRDDへの変更可能な参照だと思います。私はこの答えで与えた例を見てください:http://stackoverflow.com/questions/36944976/in-spark-streaming-how-to-reload-a-lookup-non-stream-rdd-after-n-batches/ 37001685#37001685 – maasg

答えて

2

一つの方法は、JavaStreamingContext.queueStreamを使用することです:

val inputStream = ssc.queueStream(new mutable.Queue[RDD[String]]) 

しかし、あなたは@ zの星で述べたように、DStreamのバッチ間で状態を維持する必要がある場合、1はステートフルを使用することができますSpark 1.6.x以上の場合はPairDStreamFunctions.mapWithState、下位バージョンの場合はPairDStreamFunctions.updateStateByKeyの形式のDStream。

+0

ありがとうございますが、私は 'InputStream 'ではなく' DStream'にする必要があります。 'InputDStream'はすべてのバッチで更新されます。 –

+0

@majidhajibaba 'DStream'は抽象ですが、インスタンスを作成することはできません。 'InputDStream'は' DStream'を継承します。 –

関連する問題