2016-07-05 13 views
1

私はZookeeperで書かれた設定情報を持っています。私はApache Curatorを使用して設定を読み込みます(それを読むためのより良いソリューションがあれば、それをCurve Watcherと一緒に使えます)。Zookeeperで設定が変更されたら、新しいものを受け取ります。私はSparkでこの設定を使用しています。同じアプリケーションのすべてのエグゼキュータをどのように共有することができますか?複数のSpark ExecutorでZookeeper設定を共有する

ありがとうございました!

LE:

はあなたがウォッチャーの実装を行うだろうDikei、次のコードで

を、ありがとう!私はスパークするために新しいですし、私は確かに各作業員に何が行くのか分からない。

ありがとうございました!私はこのような場合にはどうなるのか

final JavaDStream<ElementMessage> nodeMessageStream = mapWithStateDistinctAndFiltered.flatMap(pair -> pair._2.buildElementMessages()) 
      .filter(f -> f != null); 

    nodeMessageStream.foreachRDD(rdd -> { 
     rdd.foreachPartition(r -> { 
      final ElementRecordRestClient rest = new ElementRecordRestClient(
        startProps.getProperty(InputPropertyKey.WEPAPP_URL.toString())); 
      r.forEachRemaining(message -> { 
       rest.createObject(message.toElementRecord()); 
      }); 
     }); 
    }); 
+0

ありがとうございます。私は最初の投稿を編集しました。 – Vlad

答えて

0

は、マスターノードでキュレーターウォッチャーを実行し、スパークの放送変数を使用して、すべてのエグゼキュータに設定をブロードキャストすることです。設定が変更されると、現在のストリーミングコンテキストを停止し、新しい設定で新しいストリーミングコンテキストを開始します。これにより、結果が常に一貫していることが保証されます。

もう1つの方法は、foreachPartitionラムダ機能の中で飼いょう動物園の設定を読むことです。しかし、構成は各パーティションで独立して読み込まれるため、同じRDDの異なるパーティションで異なる構成が得られる可能性があります。

+0

興味深い解決策、最初のものがストリームの停止と開始が私を悩ます。これは正確にどういう意味ですか、ストリームを停止して開始するにはどうすればよいですか(手動以外)?ありがとうございました! – Vlad

+0

処理を停止するには、現在のコンテキストオブジェクトで 'stop'を呼び出すことによって手動で行う必要があります。次に、新しいストリーミングコンテキストを作成し、 'start'を呼び出して起動します。入力ソースが永続キューのように機能する場合、新しいストリーミングコンテキストが開始されると、以前のコンテキストが停止した場所から再開します。 – Dikei

+0

ありがとうございます。 – Vlad

関連する問題