2016-04-22 8 views
2

私はusginライブラリclj-kafkaを試しています。clj-kafka - 消費者が空

ここに私のコード

(use [clj-kafka.producer] 
     [clj-kafka.zk] 
     [clj-kafka.consumer.zk] 
     [clj-kafka.core])) 

(brokers {"zookeeper.connect" "localhost:2181"}) 

(def p (producer {"metadata.broker.list" "localhost:9092" 
        "serializer.class" "kafka.serializer.DefaultEncoder" 
        "partitioner.class" "kafka.producer.DefaultPartitioner"})) 

(send-message p (message "test" (.getBytes "this is my message"))) 

(def config {"zookeeper.connect" "localhost:2181" 
      "group.id" "clj-kafka.consumer" 
      "auto.offset.reset" "smallest" 
      "auto.commit.enable" "false"}) 

(with-resource [c (consumer config)] 
    shutdown 
    (take 2 (messages c "test"))) ;; return() 

私は設定/ zookepper.properties

bin/zookeeper-server-start.sh config/zookeeper.properties 
bin/kafka-server-start.sh config/server.properties 

でzookepper-serverとカフカ自身を起動します。

dataDir=/tmp/zookeeper 
clientPort=2181 
maxClientCnxns=0 

とのconfig/server.properties :

broker.id=0 
listeners=PLAINTEXT://:9092 
num.network.threads=3 
num.io.threads=8 
socket.send.buffer.bytes=102400 
zocket.receive.buffer.bytes=102400 
socket.request.max.bytes=104857600 
log.dirs=/tmp/kafka-logs 
num.partitions=1 
num.recovery.threads.per.data.dir=1 
log.retention.hours=168 
log.segment.bytes=1073741824 
log.retention.check.interval.ms=300000 
zookeeper.connect=localhost:2181 
zookeeper.connection.timeout.ms=6000 

私は実行時に '問題は' です:

(with-resource [c (consumer config)] 
    shutdown 
    (take 2 (messages c "test"))) ;; return empty() 

ここに任意のアイデア?事前

答えて

1

おかげで、このgithubのissueを参照してください。ドキュメントが素晴らしいとは思わない。あなたはdoallでシーケンス(これは怠惰です)の実現を強制しなければなりません。これを試してください:

(with-resource [c (consumer config)] 
    shutdown 
    (doall (take 2 (messages c "test")))) 
+0

こんにちは@leeorあなたは正しいです。私はここでこの詳細を逃した。そして、はい、ドキュメントはこの時点で失敗します。お返事ありがとうございます。 – elf

+0

こんにちは、 トピックに関するすべてのメッセージを取りたいのですか?それは怠惰なシーケンスです、それは可能ですか? ''(doseq [msg doall(messages(consumer config) "logs" ) '' – elf

関連する問題