2016-11-09 1 views
0

私はKfkaとmapwithsta関数を使用してスパークストリーミングアプリケーションを書いています。私はストレージレベルのアプリケーションのスナップショットを紹介しましたenter image description hereメモリからのみ他のモードにmapwithstatedstreamのデフォルトの保存モードをchchching

あなたがKafkaストリームがメモリとディスクの両方で記録されているのを見てもわかるように、私はmapwithste内部ストリームのデフォルトの存在を変更する方法を見つけることができません..このコードのPICE私は巨大になることができ、私のアプリケーションのsatesで

val messages=KafkaUtils.createDirectStream[String, String, (String,String)](ssc, 
    kafkaParams, 
    fromOffsets, 
(r:org.apache.kafka.clients.consumer.ConsumerRecord[String,String]) =>(r.topic(),r.value())) 
    .persist(StorageLevel.MEMORY_AND_DISK_SER) 
.... 
val mapped1=message.map(x=>(x._2.hashCode().toString(),x)).mapWithState(stateSpec1) 

を使用していますので、私はemeoryで内部satesをpresisteする必要があるとdisk..Iは、この上の任意のヘルプをappreciteでしょう。

答えて

0

mapWithStateは、分散メモリ内の状態ストアです。それはOpenHashMapBasedStateMapと呼ばれる内部構造の中にあなたの状態を保存します。あなたが現在守っているのは、KafkaRDDKafkaUtils.createDStreamによって作成されたものです。同じ入力を2回繰り返していない場合は、それを維持する必要はありません。

内部状態が大きい場合でも、クラスタ内に均等に分散する必要があります。これは、すべての卵を1つのバスケットに入れるのではなく、クラスタ全体に広げることを意味します。状態が拡大した場合は、ノードを追加していつでもクラスタをスケールアウトできます。

+0

こんにちは、あなたの応答のおかげで、私はMapWithStaeRDD mapwithState関数の結果としてコンパクトにする方法を探しています。デフォルトでは、既存のモードはメモリのみに基づいているため、状態は大きくなります。私はSpark 1.6.1を使用しています。私の場合、キーが大きく、タイムアウトは何とか無限です。おそらく、内部状態がMemory_Serlizedである代わりにUpdateStaeBykeyを使用する方が良いでしょう。しかし、Mapwithstae RDDをコンパクトにする方法を見つけるのが面白いです(たとえKyroを使用できない.. – mahdi62

+0

また、時にはmapwithsrateRDDがシリアライズ(シリアライズされた1xレプリケート)されることに気付きました。シリアライズされた1xレプリケートされたものは、より少ないスペースを占めます。しかし、ほとんどの場合、これはメモリの逆シリアル化された1xレプリケートです。 ! – mahdi62

関連する問題