0

次は私のpythonがkafkaプロデューサをコーディングしているのですが、私はKafka Brokerにメッセージを公開できるかどうかはわかりません。消費者側は何のメッセージも受け取っていないからです。私のコンシューマーPythonプログラムは、プロデューサーコンソールコマンドを使ってテストしていますが、うまく動作しています。Kafka Consumerはプロデューサーから何もメッセージを受け取っていません

from __future__ import print_function 

import sys 
from pyspark import SparkContext 
from kafka import KafkaClient, SimpleProducer 

if __name__ == "__main__": 

if len(sys.argv) != 2: 
    print("Usage:spark-submit producer1.py <input file>", file=sys.stderr) 
    exit(-1) 

sc = SparkContext(appName="PythonRegression") 

def sendkafka(messages): 
    ## Set broker port 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka, async=True, batch_send_every_n=5, 
batch_send_every_t=10) 
    send_counts = 0 
    for message in messages: 
     try: 
      print(message) 
      ## Set topic name and push messages to the Kafka Broker 
      yield producer.send_messages('test', message.encode('utf-8')) 
     except Exception, e: 
      print("Error: %s" % str(e)) 
     else: 
      send_counts += 1 
    print("The count of prediction results which were sent IN THIS PARTITION 
is %d.\n" % send_counts) 

## Connect and read the file.  
rawData = sc.textFile(sys.argv[1]) 

## Find and skip the first row 
dataHeader = rawData.first() 
data = rawData.filter(lambda x: x != dataHeader) 

## Collect the RDDs. 
sentRDD = data.mapPartitions(sendkafka) 
sentRDD.collect() 

## Stop file connection 
sc.stop() 

これは、私は通常、あなたが作成しようとしたトピックから消費するカフカ・コンソール・消費者(Apacheのカフカの一部)を使用して、このような問題をデバッグ

from __future__ import print_function 
import sys 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kafka import KafkaUtils 

if len(sys.argv) < 3: 
print ("Program to pulls the messages from kafka brokers.") 
print("Usage: consume.py <zk> <topic>", file=sys.stderr) 

else: 
## Flow 
## Loads settings from system properties, for launching of spark-submit. 
sc = SparkContext(appName="PythonStreamingKafkaWordCount") 

## Create a StreamingContext using an existing SparkContext. 
ssc = StreamingContext(sc, 10) 

## Get everything after the python script name 
zkQuorum, topic = sys.argv[1:] 

## Create an input stream that pulls messages from Kafka Brokers. 
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", 
{topic: 1}) 

## 
lines = kvs.map(lambda x: x[1]) 

## Print the messages pulled from Kakfa Brokers 
lines.pprint() 

## Save the pulled messages as file 
## lines.saveAsTextFiles("OutputA") 

## Start receiving data and processing it 
ssc.start() 

## Allows the current process to wait for the termination of the context 
ssc.awaitTermination() 

答えて

0

をコーディング私の「消費者」のpythonです。コンソールコンシューマーがメッセージを受け取った場合、彼らはKafkaに到着したことを知っています。

プロデューサを最初に実行して終了させて​​からコンシューマを起動すると、コンシューマがログの最後から開始し、追加のメッセージを待っている可能性があります。最初にコンシューマーを起動していることを確認するか、最初から自動的に開始するように設定してください(残念ながら、Pythonクライアントでこれを行う方法はわかりません)。彼らはプロデュース要求に増加している場合は、トピックのメッセージの数を確認することができます

+0

自分のノートパソコンと一緒に試してみる前にそれを試してみた しかし、別のサーバーで試してみた とにかく、ありがとう –

0

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell \ 
--broker-list <Kafka_broker_hostname>:<broker_port> --topic Que1 \ 
--time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}' 

メッセージの数が増加している場合、それはプロデューサーが正常に動作していることを意味します。

0

申し訳ありませんが、私は完全に動作する別のサーバーでそれをテストするので、私のローカル飼い犬飼い機かカフカに何か問題があると思います。しかし、私に返信してくれた人のおかげで);

関連する問題