2017-09-07 18 views
1

Spark Structured StreamingでCassandraのような外部店舗からKafkaとクエリを読み取るにはどうすればいいですか?Spark Structured StreamingのCassandraのような外部店舗からKafkaとクエリを読み取る方法は?

私はKafkaからのメッセージのストリームを取得しています。私はそれにマップ操作を適用したいと思います。それぞれのキーについて、Cassandraのようなデータストアにクエリを行い、そのキーの詳細情報を取得し、 Spark Structured Streaming 2.2.0を使って、どうすればいいですか?

答えて

1

カフカ構造化ストリームは、静的データフレームと結合できます。あなたがデータフレームのスキーマを提供する必要が詠唱中にデータフレームの構造は、複雑な場合、ストリームとしてカフカから

val spark = SparkSession 
       .builder 
       .appName("kafka-reading") 
       .getOrCreate() 

    val df = spark 
      .readStream 
      .format("kafka") 
      .option("kafka.bootstrap.servers", "localhost:9092") 
      .option("startingOffsets", "latest") 
      .option("subscribe", topicName) 
      .load() 
      .selectExpr("CAST (key AS STRING)", "CAST (value AS STRING)").as[(String, String)] 

を読むには

val staticDf = spark.read. ... // read from Cassandra 
val streamingDf = spark.readStream. ... // read from stream 


// example of join to get information from both Cassandra and stream 
streamingDf.join(staticDf, "type")   // inner equi-join with a static DF 
streamingDf.join(staticDf, "type", "right_join") // right outer join with a static DF 
0

documentationあたりとして、あなたはこれを行うことができます。

操作を実行するには、まず特定の時間、つまり10秒間のデータを蓄積するためにウォーターマークを使用する必要があります。透かしを入れた後、groupByを適用して集約できます。キーと値をリストとして収集します。次に鍵のリストをトラバースすることにより、キー を使用してcassandraからデータをフェッチすることができます。ウォーターマーキングを行い、集約を適用する方法の詳細は、 参照可能Structured Streaming

関連する問題