2016-05-02 3 views
0

用ソケットエラーを読んでタイムアウトしまし入門 - > {TopicX、TopicY、TopicZ}、AppBを - > {TopicX、TopicZ}、 AppC - > {TopicX、TopicY}。プロデューサーとコンシューマーはアプリ固有のものになります。 3つのブローカーが異なるポートを持つ三つの異なる設定ファイル内のパーティション1,2,3を有する Iセットアップカフカクラスタ。そして、カフカのサーバー(クラスター)が、私はセットアップにAppAのように同じキューを有する三の同様のアプリケーションのためのカフカのクラスタ型のセットアップをしたいカフカクラスタ設定

は、私はだから私は

 $producer->setRequireAck(-1); 
     $producer->setMessages("TopicX", 0, array(json_encode($this->data))); 
     $producer->send(); 

などのアプリケーションAのための生産者コードと

などのアプリケーションBのために使用プロデューサーコードを使用 http://github.com/nmred/kafka-php

でカフカのPHPラッパーを使用しています開始します

 $producer->setRequireAck(-1); 
     $producer->setMessages("TopicX", 1, array(json_encode($this->data))); 
     $producer->send(); 

のように。

は、その後、私は

 $queues = array("TopicX", "TopicY", "TopicZ"); 
     while(true) { 
      foreach($queues as $queue) { 
       $consumer = \Kafka\Consumer::getInstance('localhost:2181'); 
       $consumer->setGroup('testgroup'); 
       $consumer->setPartition($queue, 0); 
       $result = $consumer->fetch(); 
      } 
     } 

のような3つのアプリケーションのための私の消費者のスクリプトを作った。しかし、私は任意のアプリケーションのために、消費者のスクリプトを実行しようとしたとき、私は

のようなエラーが出ます「750437のバイトを読みながらソケットの読み込みタイムアウトしました750323で「

を行くためにバイトが私はちょうど私が

ようないくつかのカフカの設定パラメータを変更しようとした私は、この問題を解決するにはどうすればよいかわかりません210
zookeeper.connection.timeout.ms=24000   # Initially 6000 
replica.socket.timeout.ms=15000      # Not exists in default file 

が、それは働いていません。あなたが実際にパーティションを設定し、結果をフェッチし、ソースを区別するためにサイクルを使用し、サイクルの外に移動し、あなたのforeachのサイクルにそれを宣言することにより、カフカの消費者を破壊している

答えて

1

$queues = array("TopicX", "TopicY", "TopicZ"); 
$consumer = \Kafka\Consumer::getInstance('localhost:2181'); 
$consumer->setGroup('testgroup'); 

foreach($queues as $queue) { 
    $consumer->setPartition($queue, 0); 
} 

while(true) { 
    $result = $consumer->fetch(); 
    foreach ($result as $topicName => $topic) { 
    foreach ($topic as $partId => $partition) { 
     var_dump($partition->getHighOffset()); 
     foreach ($partition as $message) { 
     var_dump((string)$message); 
     } 
    } 
    } 
} 

は、例を参照するにはカフカのPHPプロジェクトのgithubののREADMEを参照してください。

+0

なしエラーなしで働いているおかげで、 –