2017-01-06 11 views
1

私は、カフカ(createDstreamを使用)から消費しているストリーミングジョブを持っています。 「ID」Sparkストリーミングジョブ内でユーティリティ(外部)を呼び出す

[id1,id2,id3 ..] 

のその流れIは、各ID

[id:t1,id2:t2,id3:t3...] 
のための「T」を言うIDの配列を受け取り、いくつかの外部コールを行い、いくつかの情報をバック受信ユーティリティまたはAPIを持っています

DStreamを保持し、Dstreamを保持するユーティリティを呼び出したいとします。 Dstream rddでマップ変換を使用することはできません。これは、各IDの呼び出しを行います。また、ユーティリティはIDのコレクションを受け入れます。

Dstream.map(x=> myutility(x)) -- ruled out 

そして、私は

Dstream.foreachrdd(rdd=> myutility(rdd.collect.toarray)) 

を使用する場合、私はDStreamを失います。私は下流処理のためにDStreamを保持する必要があります。

+0

「myutility」を再設計すると、正しく動作するようになりますか?スパークでの単一のローカルコレクションを持つことはできません。 – user7337271

+0

@ user7337271は、以下のDstream.foreachrdd(rdd => myutility(rdd.collect.toarray))によって達成されますが、DStreamは失われます。 –

+0

ここでは並列性はありません。全体 'foreachrdd(rdd => myutility(rdd.collect.toarray))'はドライバ上でローカルに実行されます。あなたは 'rdd => sc.parallelize(myutility(rdd.collect.toarray))')を変換することはできますが、この問題を解決することはできません。 – user7337271

答えて

3

外部バルクコールを実現するアプローチは、DStream内のRDDをパーティションレベルで直接変換することです。

パターンは次のようになります

val transformedStream = dstream.transform{rdd => 
    rdd.mapPartitions{iterator => 
     val externalService = Service.instance() // point to reserve local resources or make server connections. 
     val data = iterator.toList // to act in bulk. Need to tune partitioning to avoid huge data loads at this level 
     val resultCollection = externalService(data) 
     resultCollection.iterator 
    } 
} 

このアプローチプロセスクラスタ内の利用可能なリソースを使用して並列に下地RDDの各パーティション。外部システムへの接続は、パーティションごとに(各要素ではなく)インスタンス化する必要があることに注意してください。

+0

ありがとう、私が探していたもの –

関連する問題