私はKfkaとmapwithsta関数を使用してスパークストリーミングアプリケーションを書いています。私はストレージレベルのアプリケーションのスナップショットを紹介しましたメモリからのみ他のモードにmapwithstatedstreamのデフォルトの保存モードをchchching
あなたがKafkaストリームがメモリとディスクの両方で記録されているのを見てもわかるように、私はmapwithste内部ストリームのデフォルトの存在を変更する方法を見つけることができません..このコードのPICE私は巨大になることができ、私のアプリケーションのsatesで
val messages=KafkaUtils.createDirectStream[String, String, (String,String)](ssc,
kafkaParams,
fromOffsets,
(r:org.apache.kafka.clients.consumer.ConsumerRecord[String,String]) =>(r.topic(),r.value()))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
....
val mapped1=message.map(x=>(x._2.hashCode().toString(),x)).mapWithState(stateSpec1)
を使用していますので、私はemeoryで内部satesをpresisteする必要があるとdisk..Iは、この上の任意のヘルプをappreciteでしょう。
こんにちは、あなたの応答のおかげで、私はMapWithStaeRDD mapwithState関数の結果としてコンパクトにする方法を探しています。デフォルトでは、既存のモードはメモリのみに基づいているため、状態は大きくなります。私はSpark 1.6.1を使用しています。私の場合、キーが大きく、タイムアウトは何とか無限です。おそらく、内部状態がMemory_Serlizedである代わりにUpdateStaeBykeyを使用する方が良いでしょう。しかし、Mapwithstae RDDをコンパクトにする方法を見つけるのが面白いです(たとえKyroを使用できない.. – mahdi62
また、時にはmapwithsrateRDDがシリアライズ(シリアライズされた1xレプリケート)されることに気付きました。シリアライズされた1xレプリケートされたものは、より少ないスペースを占めます。しかし、ほとんどの場合、これはメモリの逆シリアル化された1xレプリケートです。 ! – mahdi62