2016-10-04 9 views
2

私はsample of mapWithState function on Databricks websiteに従っています。スパークストリーミングでmapWithStateでタイムアウトを指定する

trackstatefunctionためのコードは以下の通りである:

def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

状態がタイミングアウトされたときに(state.isTimingout()==true)を、私は、関数が再び例外を引き起こす可能性がサテを更新した場合の質問を持っていました。これはサンプルに当てはまりますか?

答えて

3

状態がタイムアウト(state.isTimingout() == true)の場合、関数は再び例外を発生させる可能性のある状態を更新します。

はい、正しいです。 mapWithStateに明示的なタイムアウトを設定し、タイムアウトが発生したときに状態を更新できないため、例外がスローされる最後のタイムアウト繰返しに状態がある間にstate.updateを呼び出します。つまり、(それはすでに削除されている場合

状態を更新することはできません(つまり、 削除()が既に呼び出されている)またはそれが原因 タイムアウトに削除されようとしている。これは、明示的にin the documentation記載されています、isTimingOut()がtrue)。あなたの例では


、追加のチェックが順に次のとおりです。

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    if (!state.isTimingOut) state.update(sum) 
    Some(output) 
} 

か、タイムアウトが発生するとvalueだけNoneなければならないので、あなたにもパターンマッチングを使用することができます。

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    value match { 
    case Some(v) => 
     val sum = v.toLong + state.getOption.getOrElse(0L) 
     state.update(sum) 
     Some((key, sum)) 
    case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L)) 
    } 
} 

ステートフルストリーミングについては、this blog post(免責条項:私は著者です)。

+0

こんにちは@Yuval、特定のキーがタイムアウトすると、すべての状態はなくなりましたか?あなたは一から始める必要がありますか? – marios

+1

@mariosはい、タイムアウトが経過すると、キーに削除マークが付きます。 –

+0

タイムアウト後に状態を維持する必要がある場合は、自分で行う必要があると思いますか?ありがとうございました! – marios