2017-04-06 4 views
0

そう仮定する断続時間をセッション時間を決定するX分/秒どのようにカフカの低レベルのプロセッサのAPIで、MS

{ 
props.put("group.max.session.timeout.ms", X*2); 
props.put("session.timeout.ms", x); 
props.put("request.timeout.ms", X*2); 
} 

を超えていることはカフカのセッションタイムアウトを設定する正しい方法では、低レベルのプロセッサのAPIをストリーム?

+0

org.apache.kafka.clients.consumer.CommitFailedException:グループが既に再調整されて割り当て済みであるため、コミットを完了できませんパーティションを別のメンバーに割り当てます。これは、後でpoll()を呼び出す間の時間が、設定されたsession.timeout.msよりも長いことを意味します。これは、通常、ポーリングループが過度の時間のメッセージ処理に費やしていることを意味します。これは、セッションタイムアウトを増やすか、poll()で返されたバッチの最大サイズをmax.poll.recordsで減らすことで対処できます。 –

+0

kafkaのどのバージョンを使用しているのですか?彼らは独立した0.10.2.0 – katsharp

+0

@katsharpで別々のスレッドにポーリングと処理を分割します - まだ 'max.poll.interval.ms'があります –

答えて

0
  • group.max.session.timeout.msは、ストリーム内の消費者の設定のためのブローカーの設定(参照http://kafka.apache.org/documentation/#brokerconfigs
  • あり、接頭辞consumer.を使用することをお勧めします:props.put("consumer.session.timeout.ms", X)
  • ます(ストリームはこのためのデフォルト値を変更しますInteger.MAX_VALUEmax.poll.interval.msを設定する必要がありますInteger.MAX_VALUEも)
関連する問題