KafkaUtils.createDirectStream
の使用方法は、Pysparkの特定のTopic
のオフセットを使用しますか?PySparkでオフセット付きのInputDStreamを作成するにはどうすればいいですか?(KafkaUtils.createDirectStreamを使用して)
7
A
答えて
7
カフカトピックのレコードからRDDを作成する場合は、タプルの静的なセットを使用します。その後、カフカブローカーの辞書を全て輸入
from pyspark.streaming.kafka import KafkaUtils, OffsetRange
利用可能
メイクを作成
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
次に、あなたのオフセットが最後にあなたがRDDを作成
start = 0
until = 10
partition = 0
topic = 'topic'
offset = OffsetRange(topic,partition,start,until)
offsets = [offset]
オブジェクト作成します。
あなたが次のことを行う必要があるオフセットでストリームを作成するにはkafkaRDD = KafkaUtils.createRDD(sc, kafkaParams,offsets)
:
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
from pyspark.streaming import StreamingContext
次に、あなたのsparkcontext
ssc = StreamingContext(sc, 1)
を使用してsparkstreamingコンテキストを作成する次の私たちはすべてを設定パラメータ
kafkaParams = {"metadata.broker.list": "host1:9092,host2:9092,host3:9092"}
start = 0
partition = 0
topic = 'topic'
その後、我々は最後に、我々はストリームあなたが行うことができます
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic],kafkaParams,
fromOffsets=fromOffset)
1
作成
topicPartion = TopicAndPartition(topic,partition)
fromOffset = {topicPartion: long(start)}
//notice that we must cast the int to long
当社fromOffset辞書を作成します。
from pyspark.streaming.kafka import TopicAndPartition
topic = "test"
brokers = "localhost:9092"
partition = 0
start = 0
topicpartion = TopicAndPartition(topic, partition)
fromoffset = {topicpartion: int(start)}
kafkaDStream = KafkaUtils.createDirectStream(spark_streaming,[topic], \
{"metadata.broker.list": brokers}, fromOffsets = fromoffset)
注:スパーク2.2.0、Pythonの3.6
関連する問題
- 1. zeppelinでpysparkを使用するにはどうすればいいですか?
- 2. アップロードキーを使用して署名付きapkを作成するにはどうすればよいですか?
- 3. Kafka Direct InputDstreamとステートフルストリーム変換を使用しているときのチェックポイントの再認識を理解するにはどうすればよいですか?
- 4. Firebaseサーバのタイムスタンプを使用して日付を生成するにはどうすればいいですか?
- 5. このプロジェクトでjqueryを使用してタブシステムを作成するにはどうすればいいですか?
- 6. ユニティを使用してモバイル用の2Dサイドスクロールゲームを作成するにはどうすればいいですか
- 7. 複数のプロジェクトに使用できるギャラリーページを作成するにはどうすればいいですか?
- 8. pysparkでdf.write.csvを使用してcsvファイルに追加するにはどうすればよいですか?
- 9. DataFrameを使用してJinjaを使用して条件付きでドキュメントを作成するにはどうすればよいですか?
- 10. Pythonクライアントを使用してVoltDBで関係を作成するにはどうすればいいですか?
- 11. groovy jsonbuilderを.eachで使用して配列を作成するにはどうすればいいですか?
- 12. api-modeでレール5を使用してカスタムミドルウェアを作成するにはどうすればいいですか?
- 13. xamarin.formsを使用してポップアップでフォームを作成するにはどうすればいいですか?
- 14. jsでオプションを使用してモーダルウィンドウを作成するにはどうすればいいですか?
- 15. wpfでdatavalidationを使用してカスタム動作を作成するにはどうすればよいですか?
- 16. selenium node.jsを使用してラベル付きのチェックボックスをクリックするにはどうすればいいですか?
- 17. 作成しているフレームワークでFirebaseを使用するにはどうすればよいですか?
- 18. オフセットを使用してPolyCollectionを正しく配置するにはどうすればよいですか?
- 19. cssを使用して縦書きテキストを作成するにはどうすればよいですか?
- 20. スムーススクロールにJavascriptを使用してオフセット位置を調整するにはどうすればよいですか?
- 21. Objective-Cを使用して新しいNSWindowを作成するにはどうすればよいですか?
- 22. WinAPIを使用して「はい」ボタンを作成するにはどうすればよいですか?
- 23. AspectJ - ajcを使用して2つのクラスファイルを作成するにはどうすればいいですか?
- 24. 既存のHashSetを使用してTreeSetを作成するにはどうすればいいですか?
- 25. パンダを使用してハイパーリンクの1列を作成するにはどうすればいいですか?
- 26. - コードのみを使用してウィンドウを作成するにはどうすればいいですか?
- 27. ログフォーマットを使用して独自のコンソールログを作成するにはどうすればいいですか?
- 28. Log4jの設定を使用してGuiceログを作成するにはどうすればいいですか
- 29. iOSでカスタムボタンを作成して何度も使用するにはどうすればいいですか?
- 30. javascript変数を作成してjQueryで使用するにはどうすればいいですか
を私はエラーが発生します "TypeError:unhashable型: 'TopicAndPartition'" – pangpang
これはKのために古いですafka 0.8とSpark 2.0+ :( – rjurney