私は、カフカ(createDstream
を使用)から消費しているストリーミングジョブを持っています。 「ID」Sparkストリーミングジョブ内でユーティリティ(外部)を呼び出す
[id1,id2,id3 ..]
のその流れIは、各ID
[id:t1,id2:t2,id3:t3...]
のための「T」を言うIDの配列を受け取り、いくつかの外部コールを行い、いくつかの情報をバック受信ユーティリティまたはAPIを持っています
DStream
を保持し、Dstreamを保持するユーティリティを呼び出したいとします。 Dstream rddでマップ変換を使用することはできません。これは、各IDの呼び出しを行います。また、ユーティリティはIDのコレクションを受け入れます。
Dstream.map(x=> myutility(x)) -- ruled out
そして、私は
Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray))
を使用する場合、私はDStream
を失います。私は下流処理のためにDStream
を保持する必要があります。
「myutility」を再設計すると、正しく動作するようになりますか?スパークでの単一のローカルコレクションを持つことはできません。 – user7337271
@ user7337271は、以下のDstream.foreachrdd(rdd => myutility(rdd.collect.toarray))によって達成されますが、DStreamは失われます。 –
ここでは並列性はありません。全体 'foreachrdd(rdd => myutility(rdd.collect.toarray))'はドライバ上でローカルに実行されます。あなたは 'rdd => sc.parallelize(myutility(rdd.collect.toarray))')を変換することはできますが、この問題を解決することはできません。 – user7337271