2017-01-18 15 views
3

Apache Kafkaのバージョン0.8.2.1から0.9.0.0へのアプリケーションのコードの移行に関連する問題が発生しています。カフカ0.8.2.1からカフカ0.9.0への移植アプリケーション。オフセットオフセット問題

私たちは、Clouderaのリリースによってカフカのバージョンに、この場合には、参照している:

kafka_2.10-0.8.2.0-カフカ-1.3.2

kafka_2.11-0.9.0- kafka-2.0.2

__consumer_offsetsメタデータのトピックでオフセットを読み書きするときに問題が検出されました。 特に、BlockingChannelを使用してKafkaブローカに接続し、receive()メソッド呼び出し時にEOFExceptionを取得します。特に

java.io.EOFException 
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel (NetworkReceive.java:83) 
at kafka.network.BlockingChannel.readCompletely (BlockingChannel.scala: 129) 
at kafka.network.BlockingChannel.receive (BlockingChannel.scala: 120) 

1つの原因は、カフカのAPIの2つのバージョン間の違いである可能性があります。

def receive(): Receive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = new BoundedByteBufferReceive() 
    response.readCompletely(readChannel) 

    response 
} 

を次のように0.8.2

は、我々のアプリでは、我々は

ConsumerMetadataResponse.readFrom(channel.receive().buffer()) 

受け取るメソッドを呼び出すカフカ

は、我々はそれがkafka.networkを返す見ることができるようです.Receiveは、特性kafka.network.Transmissionを拡張する特性です。これで バッファ法が存在し、受信

0.9.0我々は

GroupCoordinatorResponse.readFrom(channel.receive().payload()) 

に前の行を変更カフカ

def buffer: ByteBuffer = { 
    expectComplete() 
    contentBuffer 
    } 

kafka.network.BoundedByteBufferReceiveでオーバーライドされ、受信しますこのバージョンのAPIのメソッドは、次のとおりです。

def receive(): NetworkReceive = { 
    if(!connected) 
     throw new ClosedChannelException() 

    val response = readCompletely(readChannel) 
    response.payload().rewind() 

    response 
    } 

    private def readCompletely(channel: ReadableByteChannel): NetworkReceive =  { 
    val response = new NetworkReceive 
    while (!response.complete()) 
     response.readFromReadableChannel(channel) 
    response 
    } 

ここからわかるように、これはkafka.network.NetworkReceiveを返します。これは、kafka.network.Receiveというインターフェースを実装したクラスで、javaで書かれ、以前のものとまったく異なっています。 は、ここにはバッファ法が、どのように我々は解決することができ

    private ByteBuffer buffer; 

の内容を返すだけのペイロード方法はありませんか? 事前に感謝します

答えて

0

カフカ0.9は、カフカ0.8.2ブローカーとの下位互換性を達成するために古いカフカ消費者を維持します。 あなたはKafka 0.9のメッセージを読むためにKafka 0.9にまだ存在する古い消費者を使用しています。 Kafka 0.9ブローカーからデータを読み取るには、Kafka 0.9の新しいコンシューマーAPIを使用するようにしてください。

これが役に立ちます。

関連する問題