Apache Kafkaのバージョン0.8.2.1から0.9.0.0へのアプリケーションのコードの移行に関連する問題が発生しています。カフカ0.8.2.1からカフカ0.9.0への移植アプリケーション。オフセットオフセット問題
私たちは、Clouderaのリリースによってカフカのバージョンに、この場合には、参照している:
kafka_2.10-0.8.2.0-カフカ-1.3.2
kafka_2.11-0.9.0- kafka-2.0.2
__consumer_offsetsメタデータのトピックでオフセットを読み書きするときに問題が検出されました。 特に、BlockingChannelを使用してKafkaブローカに接続し、receive()メソッド呼び出し時にEOFExceptionを取得します。特に
:
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83)
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129)
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120)
1つの原因は、カフカのAPIの2つのバージョン間の違いである可能性があります。
def receive(): Receive = {
if(!connected)
throw new ClosedChannelException()
val response = new BoundedByteBufferReceive()
response.readCompletely(readChannel)
response
}
を次のように0.8.2
は、我々のアプリでは、我々は
ConsumerMetadataResponse.readFrom(channel.receive().buffer())
受け取るメソッドを呼び出すカフカ
は、我々はそれがkafka.networkを返す見ることができるようです.Receiveは、特性kafka.network.Transmissionを拡張する特性です。これで バッファ法が存在し、受信
0.9.0我々は
GroupCoordinatorResponse.readFrom(channel.receive().payload())
に前の行を変更カフカ
def buffer: ByteBuffer = {
expectComplete()
contentBuffer
}
kafka.network.BoundedByteBufferReceiveでオーバーライドされ、受信しますこのバージョンのAPIのメソッドは、次のとおりです。
def receive(): NetworkReceive = {
if(!connected)
throw new ClosedChannelException()
val response = readCompletely(readChannel)
response.payload().rewind()
response
}
private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
val response = new NetworkReceive
while (!response.complete())
response.readFromReadableChannel(channel)
response
}
ここからわかるように、これはkafka.network.NetworkReceiveを返します。これは、kafka.network.Receiveというインターフェースを実装したクラスで、javaで書かれ、以前のものとまったく異なっています。 は、ここにはバッファ法が、どのように我々は解決することができ
private ByteBuffer buffer;
の内容を返すだけのペイロード方法はありませんか? 事前に感謝します