2016-09-20 13 views
2

spark-kafkaコンシューマのトピックリストを動的に更新できますか?spark kafkaコンシューマのトピックリストを動的に更新

私はspark-kafkaコンシューマを使用するSparkストリーミングアプリケーションを使用しています。 私はspark-kakfaの消費者がトピックを聞いていると言って、["test"]しばらくしてトピックリストが["test"、 "testNew"]に更新されました。今火花カフカ消費者のトピックのリストを更新し、sparkStreamingアプリケーションまたはsparkStreamingコンテキスト

答えて

1

を停止することなく、話題の最新リストについては、データを消費するために火花カフカの消費者を依頼するが方法は、それを動的にスパークのトピックのリストを更新することが可能です-kafka consumer

いいえ受信機と受信機の両方のアプローチは、KafkaUtilsを使用してカフカストリームを初期化すると固定されます。 DAGが修正されたときに新しいトピックを渡す方法はありません。

動的に読みたい場合は、反復的にスケジュールされたバッチk ジョブを考えてみて、トピックを動的に読み取り、そこからRDDを作成することができます。

さらに、Akka Streamsのような消費に対して柔軟性を提供するテクノロジを使用する方法もあります。

0

Yuvalが言ったように、あなたはカフカからあなたが扱っているデータの構造/形式が何であるか知っていれば回避することができます。例えば

  • ストリーミングアプリケーションは、[「テスト」、「testNew」]トピックを聴いている場合は
  • Downlラインあなたのように、[TEST4]という名前の新しいトピックを追加します回避するには、それに含まれている固有のキーを追加して既存のトピックに渡すだけです。
  • あなたは、任意のデータ構造を使用してキャッシュを定義
    1.スレッドベースのアプローチを使用することができますが、そのTEST2データに
0

を追加したキーに基づいてデータをフィルタリング/認識するようにあなたのストリーミングアプリケーションを設計トピックのリストが含まれています
2.このキャッシュに要素を追加する方法
3. Bはすべてのスパーク関連ロジックを持つクラスAとBでなければなりません
4クラスAは長時間実行中のジョブであり、Aから呼び出していますB、新しいトピックがあるときはいつでもB

01で新しいスレッドを生成する
+0

私は現在、同様のアプローチを使用していますが、これは優雅に私がトピックのリストを更新する必要があるたびに、コンテキストのストリーミングを停止する必要があるような多くの合併症を持っています。これは非同期プロセスであり、停止するのにかかる時間の点では予測できません。私はデータのストリームを処理することができませんが、ストリーミングコンテキストは計算を停止、開始、再開する必要があります。 –

+0

@ rohith-yeravothulaあなたは他の解決策を見つけましたか、私は俳優システムでAkkaストリームを使うことしか考えられません。私はSubscribePatternを試しましたが、それはDAGの間にトピックを追加しないようにスタートアップ中のフィルタトピックの種類だけであり、ストリームはスケジュールされています。 – ASe

-1

Spark-Kafka integration (0.10) APIバージョンのConsumerStrategies.SubscribePatternを試してみることをおすすめします。以下のようになります

KafkaUtils.createDirectStream(
mySparkStreamingContext, 
PreferConsistent, 
SubscribePattern("test.*".r.pattern, myKafkaParamsMap)) 
+0

私は同じことを試みましたが、トピックを動的に選択しないということは、ストリームを開始するときに、正規表現を使用してすべてのトピック(フィルタのようなもの)を一致させ、ストリームを作成することを意味します。私たちは、ストリームが既に稼動している間に新しいトピックを動的に追加できる解決策を探しています。クラスタを起動する方法がジョブを再スケジューリングしてDAGをストリームできるため、不可能なように見えます。 – ASe

関連する問題