状態がタイムアウト(state.isTimingout() == true
)の場合、関数は再び例外を発生させる可能性のある状態を更新します。
はい、正しいです。 mapWithState
に明示的なタイムアウトを設定し、タイムアウトが発生したときに状態を更新できないため、例外がスローされる最後のタイムアウト繰返しに状態がある間にstate.update
を呼び出します。つまり、(それはすでに削除されている場合
状態を更新することはできません(つまり、 削除()が既に呼び出されている)またはそれが原因 タイムアウトに削除されようとしている。これは、明示的にin the documentation記載されています、isTimingOut()
がtrue)。あなたの例では
、追加のチェックが順に次のとおりです。
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L)
val output = (key, sum)
if (!state.isTimingOut) state.update(sum)
Some(output)
}
か、タイムアウトが発生するとvalue
だけNone
なければならないので、あなたにもパターンマッチングを使用することができます。
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
value match {
case Some(v) =>
val sum = v.toLong + state.getOption.getOrElse(0L)
state.update(sum)
Some((key, sum))
case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L))
}
}
ステートフルストリーミングについては、this blog post(免責条項:私は著者です)。
こんにちは@Yuval、特定のキーがタイムアウトすると、すべての状態はなくなりましたか?あなたは一から始める必要がありますか? – marios
@mariosはい、タイムアウトが経過すると、キーに削除マークが付きます。 –
タイムアウト後に状態を維持する必要がある場合は、自分で行う必要があると思いますか?ありがとうございました! – marios