0

私は、クエリ2キネシスStreamsを使用して1 x Spark Streaming Appで2つのキネシスストリームからデータを解析する方法は?

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id

をやろうとしています。しかし、Stream2には出力がありません。誰かがhbaseやparquetに2つのストリームデータを同時に結合する方法を知っていますか?理論的には私が実行できるようにする必要があり、それらので

val numShards_s1 = kinesisClient.describeStream("stream1").getStreamDescription().getShards().size 
val numShards_s2 = kinesisClient.describeStream("stream2").getStreamDescription().getShards().size 
val numStreams_s1 = numShards_s1 
val numStreams_s2 = numShards_s2 
val batchInterval = Seconds(5) 
val kinesisClient = new AmazonKinesisClient(credentials)kinesisClient.setEndpoint("https://kinesis.us-east-1.amazonaws.com") 
val kinesisCheckpointInterval = batchInterva 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() 
val ssc = new StreamingContext(sc, batchInterval) 
val kinesisStreams_s1 = (0 until numStreams_s1).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream1", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
    val kinesisStreams_s2 = (0 until numShards_s2).map { i => 
    KinesisUtils.createStream(ssc, "stream-demo", "stream2", endpointUrl, regionName,InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2) 
} 
val unionStreams_s1 = ssc.union(kinesisStreams_s1) 
val unionStreams_s2 = ssc.union(kinesisStreams_s2) 
val schemaString_s1 = "user_id,device_id,action,timestamp 
val schemaString_s2= "device_id,domain,timestamp 
val tableSchema_s1 = StructType(schemaString_s1.split(",").map(fieldName => StructField(fieldName, StringType, true))) 
val tableSchema_s2 = StructType(schemaString_s2.split(",").map(fieldName => StructField(fieldName, StringType, true))) 

unionStreams_s1.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output1 = sqlContext.createDataFrame(rowRDD,tableSchema_s1) 
    output1.createOrReplaceTempView("realTimeTable_1")}) 

unionStreams_s2.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { 
    val rowRDD = rdd.map(w => Row.fromSeq(new String(w).split(","))) 
    val output2 = sqlContext.createDataFrame(rowRDD,tableSchema_s2) 
    output1.createOrReplaceTempView("realTimeTable_2")}) 

:ここに私のコードが、私は両方のストリームを処理するためにSparkConf().set("spark.streaming.concurrentJobs", "2")を設定

select a.user_id , b.domain from realTimeTable_1 as a join realTimeTable_2 as b on a.device_id = b.device_id 

は、しかし、でも私が思うに、任意の出力を生成していないselect * from realTimeTable_2をやって私のコードは何かが欠けている、誰もが不足しているロジックを見分けることができる?

+0

私はどちらかのドキュメントをストリーミングスパークごとにJVMに2つのコンテキストを使用することはできません。一つだけStreamingContextが同時にJVMでアクティブにすることができます。 –

答えて

0

Splice Machineでは、デュアルストリームは1つのストリームのみを試したことがなく、SQLを介して永続データに結合されました。

ストリームの開始が見えませんか?ここにあなたのコードに非常によく似ているコードがあります。

SparkのマスターブランチでKinesisWordCountASL.scalaをチェックしてください。

ここには短期間のリンクがあります。

https://github.com/apache/spark/blob/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

+0

Johnさん、ありがとう、2つの異なる入力スキーマを解析しましたか?私は添付のコードを見ましたが、残念ながら2つのストリームを別々のシャードとして扱います。私の問題は、複雑なイベント処理フレームワークを有効にして、2つの異なるsch emasを持つ2つのライブデータセットを同時に分析できるようにすることです。 –

+0

はssc.start()としてストリーミングを開始しました。 –

関連する問題