私は、クエリ2キネシスStreamsを使用して1 x Spark Streaming Appで2つのキネシスストリームからデータを解析する方法は?
select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id
をやろうとしています。しかし、Stream2には出力がありません。誰かがhbaseやparquetに2つのストリームデータを同時に結合する方法を知っていますか?理論的には私が実行できるようにする必要があり、それらので
val numShards_s1 = kinesisClient.describeStream("stream1").getStreamDescription().getShards().size
val numShards_s2 = kinesisClient.describeStream("stream2").getStreamDescription().getShards().size
val numStreams_s1 = numShards_s1
val numStreams_s2 = numShards_s2
val batchInterval = Seconds(5)
val kinesisClient = new AmazonKinesisClient(credentials)kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com")
val kinesisCheckpointInterval = batchInterva
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val ssc = new StreamingContext(sc, batchInterval)
val kinesisStreams_s1 = (0 until numStreams_s1).map { i =>
KinesisUtils.createStream(ssc, "stream-demo", "stream1", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val kinesisStreams_s2 = (0 until numShards_s2).map { i =>
KinesisUtils.createStream(ssc, "stream-demo", "stream2", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
}
val unionStreams_s1 = ssc.union(kinesisStreams_s1)
val unionStreams_s2 = ssc.union(kinesisStreams_s2)
val schemaString_s1 = "user_id,device_id,action,timestamp
val schemaString_s2= "device_id,domain,timestamp
val tableSchema_s1 = StructType(schemaString_s1.split(",").map(fieldName => StructField(fieldName, StringType, true)))
val tableSchema_s2 = StructType(schemaString_s2.split(",").map(fieldName => StructField(fieldName, StringType, true)))
unionStreams_s1.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(",")))
val output1 = sqlContext.createDataFrame(rowRDD,tableSchema_s1)
output1.createOrReplaceTempView("realTimeTable_1")})
unionStreams_s2.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => {
val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(",")))
val output2 = sqlContext.createDataFrame(rowRDD,tableSchema_s2)
output1.createOrReplaceTempView("realTimeTable_2")})
:ここに私のコードが、私は両方のストリームを処理するためにSparkConf().set("spark.streaming.concurrentJobs", "2")
を設定
select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id
は、しかし、でも私が思うに、任意の出力を生成していないselect * from realTimeTable_2
をやって私のコードは何かが欠けている、誰もが不足しているロジックを見分けることができる?
私はどちらかのドキュメントをストリーミングスパークごとにJVMに2つのコンテキストを使用することはできません。一つだけStreamingContextが同時にJVMでアクティブにすることができます。 –