2017-06-18 8 views
0

私はカフカさんにとって非常に新しいので、プロデューサー/コンシューマーモデルで作業を始めました。
私はKafka producerのプログラムを書いています。これはMySQLからテーブルを読み込み、コマンドプロンプトでそれらのメッセージを読んでいます。
問題は、テーブルの最後の列のデータしか見ることができないことです。私のカフカプロデューサークラスのバグ

MySQLのテーブル構造とデータ:

+----------+---------+----------------+ 
| dept  | empName | salary(double) | 
+----------+---------+----------------+ 
| Engg  | Fred | 2000   | 
| Engg  | Bob  | 3000   | 
| Engg  | Joe  | 1000   | 
| Arts  | Jack | 5000   | 
| Commerce | Jill | 2400   | 
| Arts  | James | 3000   | 
| Commerce | Rob  | 8700   | 
+----------+---------+----------------+ 

カフカプロデューサークラス:私の消費者からの

package com.kafka.producer; 

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.util.Properties; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 

@SuppressWarnings("deprecation") 
public class KafkaMySqlProducer { 
    public static void main(String[] args) { 

     Connection con; 
     PreparedStatement pstmnt; 
     ResultSet rs; 

     Properties props = new Properties(); 
     props.put("zk.connect", "localhost:2181"); 
     props.put("serializer.class","kafka.serializer.StringEncoder"); 
     props.put("metadata.broker.list","localhost:9092"); 
     ProducerConfig config = new ProducerConfig(props); 
     Producer producer = new Producer(config); 

     try { 
      Class.forName("com.mysql.jdbc.Driver"); 
      con = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb","root","cloudera"); 
      pstmnt = con.prepareStatement("select * from department"); 
      rs = pstmnt.executeQuery(); 
      while(rs.next()) { 
       String name = rs.getString(1); 
       String department = rs.getString(2); 
       String salary = " " + rs.getDouble(3); 
       producer.send(new KeyedMessage("dbrec", name, department, salary)); 
      } 
      con.close(); 
     } catch (ClassNotFoundException e) { 
      e.printStackTrace(); 
     } catch (SQLException e) { 
      e.printStackTrace(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

    } 
} 

出力:

[[email protected] kafka_2.11-0.10.2.0]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbrec --from-beginning 
[2017-06-18 10:23:22,428] WARN Error while fetching metadata with correlation id 2 : {dbrec=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 
[2017-06-18 10:23:22,628] WARN The following subscribed topics are not assigned to any members in the group console-consumer-33197 : [dbrec] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) 

2000.0 
3000.0 
1000.0 
5000.0 
2400.0 
3000.0 
8700.0 

私は最後列からデータを取得していますテーブルの
私がここでやっている間違いは誰に教えてもらえますか?

+0

新しいプロジェクトをKafkaで開始する場合は、新しいプロデューサークラスとコンシューマークラスの使用を開始することをお勧めします。私はあなたがブローカーに関する情報を入手するためにZookeeperに接続しているプロデューサーと共に古いものを使用していることを知っています。 – ppatierno

答えて

2

4つの引数を持つKeyedMessageコンストラクタの最後のパラメータが実際のメッセージです。

クラスはあなたのすべての値のリストを受け入れるだけではありません。

特に指定したパラメータは​​です。 messageからtopic

+1

これで訂正しました。 producer.send(新しいKeyedMessage( "dbrec"、名前+ "" +部+ "" +給与));今働いています。 – Sidhartha

+0

また、私はこれらの2つの行のために廃止されたメッセージを参照してください。 \t \t ProducerConfig config = new ProducerConfig(props); \t \tプロデューサープロデューサ= newプロデューサ(config); これらのメソッドの新しいオプションが何であるか教えてください。私はそれらのメソッドを実装できます。 – Sidhartha

+0

わかりませんが、JavaDocがおそらくそれに答える可能性があります。 –