カフカソースから読み込もうとしています。私は構造化スパークストリーミングを行うために受信したメッセージからタイムスタンプを抽出したい。 カフカ(バージョン0.10.0.0) スパークストリーミング(バージョン2.0.1)スパークストリーミングのカフカメッセージからタイムスタンプを抽出しますか?
2
A
答えて
0
私はカップルの事をお勧めしたい:
あなたは、例えば、最新のKafka Streaming Api (0.10 Kafka)
経由でストリームを作成するとあなたは、ストリームを作成したより上記のドキュメントによると、
"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
:あなたは依存関係を使用
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "broker1:9092,broker2:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[ByteArrayDeserializer], "group.id" -> "spark-streaming-test", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val sparkConf = new SparkConf() // suppose you have 60 second window val ssc = new StreamingContext(sparkConf, Seconds(60)) ssc.checkpoint("checkpoint") val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
はあなたのストリームがConsumerRecord[String,Array[Byte]]のDSTREAMとなり、あなたにシンプルにタイムスタンプと、キーと値を取得することができます:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
お役に立てば幸いです。
0
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "your.server.com:9092")
.option("subscribe", "your-topic")
.load()
.select($"timestamp", $"value")
フィールド「タイムスタンプ」は、あなたが探しているものです。タイプ - java.sql.Timestamp。 0.10 Kafkaサーバーに接続していることを確認してください。以前のバージョンにはタイムスタンプはありません。 ここに記載されているフィールドの完全なリスト - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
関連する問題
- 1. ビデオクリップからタイムスタンプを抽出する
- 2. postgresqlのDBテーブルからEPOCH cloumnのタイムスタンプを抽出します
- 3. タイムスタンプからのSQL抽出時間
- 4. ライトニングからのタイムスタンプ抽出.sqllight
- 5. 一意のチェックでタイムスタンプから月と年を抽出しますか?
- 6. JSONArrayから各JSONオブジェクトを抽出し、スパークストリーミングでcassandraに保存する方法
- 7. パンダのデータフレームのタイムスタンプから月を抽出する方法は?
- 8. standardSQL:タイムスタンプからDAYを抽出できません
- 9. postgresのmongodb objectidからタイムスタンプを抽出する方法
- 10. ブタのタイムスタンプから日付を抽出する
- 11. r内のtsオブジェクトからタイムスタンプを抽出する方法
- 12. Javaのタイムスタンプから要素を抽出する
- 13. タイムゾーン付きのタイムスタンプから現地時間を抽出します
- 14. Bigqueryのタイムスタンプから日付を抽出する:好ましい方法
- 15. タイムゾーンでタイムスタンプから日付を抽出する方法
- 16. タイムスタンプから時間部分を抽出する
- 17. BigQuery Standart SQL:タイムスタンプから曜日名を抽出
- 18. BigQueryでタイムスタンプから日付を抽出できない
- 19. タイムスタンプからの抽象的な日数
- 20. ログファイルから値を抽出します
- 21. HTTPからデータを抽出します。
- 22. アプリケーションサンドボックスからデータベースを抽出します
- 23. mhtからテキストを抽出します
- 24. HttpResponseMessageからコンテンツを抽出します
- 25. テキストからJSONを抽出します。
- 26. USBカメラで取得した各フレームからタイムスタンプを抽出する方法は?
- 27. HTMLから抽出して特定の行を抽出する
- 28. Bouncy Castleのタイムスタンプ応答から証明書を抽出する方法
- 29. スパークストリーミングKafkaからコンソール上に出力を生成しない
- 30. C#の特定の列からテキストを抽出しますか?
現在のコードのスニペットを表示できますか? – vanekjar
@vanekjarヴァルのDS1 =スパーク .readStream .format( "カフカ") .OPTION( "kafka.bootstrap.servers"、 "ローカルホスト:9092") .OPTION( "購読"、 "TOPICA") 。負荷() – shivali