2015-10-21 8 views

答えて

7

カフカトピックのレコードからRDDを作成する場合は、タプルの静的なセットを使用します。その後、カフカブローカーの辞書を全て輸入

from pyspark.streaming.kafka import KafkaUtils, OffsetRange 

利用可能

メイクを作成

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 

次に、あなたのオフセットが最後にあなたがRDDを作成

start = 0 
until = 10 
partition = 0 
topic = 'topic'  
offset = OffsetRange(topic,partition,start,until) 
offsets = [offset] 

オブジェクト作成します。

あなたが次のことを行う必要があるオフセットでストリームを作成するには
kafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets) 

from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition 
from pyspark.streaming import StreamingContext 

次に、あなたのsparkcontext

ssc = StreamingContext(sc, 1) 

を使用してsparkstreamingコンテキストを作成する次の私たちはすべてを設定パラメータ

kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"} 
start = 0 
partition = 0 
topic = 'topic'  

その後、我々は最後に、我々はストリームあなたが行うことができます

directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams, 
fromOffsets=fromOffset) 
+0

を私はエラーが発生します "TypeError:unhashable型: 'TopicAndPartition'" – pangpang

+1

これはKのために古いですafka 0.8とSpark 2.0+ :( – rjurney

1

作成

topicPartion = TopicAndPartition(topic,partition) 
fromOffset = {topicPartion: long(start)} 
//notice that we must cast the int to long 

当社fromOffset辞書を作成します。

from pyspark.streaming.kafka import TopicAndPartition 
topic = "test" 
brokers = "localhost:9092" 
partition = 0 
start = 0 
topicpartion = TopicAndPartition(topic, partition) 
fromoffset = {topicpartion: int(start)} 
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \ 
     {"metadata.broker.list": brokers}, fromOffsets = fromoffset) 

注:スパーク2.2.0、Pythonの3.6

関連する問題