2016-11-14 15 views
2

カフカソースから読み込もうとしています。私は構造化スパークストリーミングを行うために受信したメッセージからタイムスタンプを抽出したい。 カフカ(バージョン0.10.0.0) スパークストリーミング(バージョン2.0.1)スパークストリーミングのカフカメッセージからタイムスタンプを抽出しますか?

+0

現在のコードのスニペットを表示できますか? – vanekjar

+0

@vanekjarヴァルのDS1 =スパーク .readStream .format( "カフカ") .OPTION( "kafka.bootstrap.servers"、 "ローカルホスト:9092") .OPTION( "購読"、 "TOPICA") 。負荷() – shivali

答えて

0

私はカップルの事をお勧めしたい:

  1. あなたは、例えば、最新のKafka Streaming Api (0.10 Kafka)

    経由でストリームを作成するとあなたは、ストリームを作成したより上記のドキュメントによると、"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1

    :あなたは依存関係を使用

    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "broker1:9092,broker2:9092", 
        "key.deserializer" -> classOf[StringDeserializer], 
        "value.deserializer" -> classOf[ByteArrayDeserializer], 
        "group.id" -> "spark-streaming-test", 
        "auto.offset.reset" -> "latest", 
        "enable.auto.commit" -> (false: java.lang.Boolean)) 
    
    val sparkConf = new SparkConf() 
    // suppose you have 60 second window 
    val ssc = new StreamingContext(sparkConf, Seconds(60)) 
    ssc.checkpoint("checkpoint") 
    
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, 
    Subscribe[String, Array[Byte]](topics, kafkaParams)) 
    
  2. はあなたのストリームがConsumerRecord[String,Array[Byte]]のDSTREAMとなり、あなたにシンプルにタイムスタンプと、キーと値を取得することができます:

    stream.map { record => (record.timestamp(), record.key(), record.value()) } 
    

お役に立てば幸いです。

0
spark.read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "your.server.com:9092") 
    .option("subscribe", "your-topic") 
    .load() 
    .select($"timestamp", $"value") 

フィールド「タイムスタンプ」は、あなたが探しているものです。タイプ - java.sql.Timestamp。 0.10 Kafkaサーバーに接続していることを確認してください。以前のバージョンにはタイムスタンプはありません。 ここに記載されているフィールドの完全なリスト - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries

関連する問題