2017-03-28 16 views
0

この質問は、AmazonDynamoDbClientのスロットルとリトライを管理するための私の他の質問になります。しかし、私はダイナモの呼び出しに到達する前に解決策が存在する可能性があると思います。スパークストリーミングを抑制する方法?

高度なプロセスは次のとおりです。Apache Sparkを使用して大規模なCSVファイルを読み込んで、いくつかの集計を実行してダイナモに書き込むためのscalaアプリケーションがあります。私はこれをEMRに展開してスケーラビリティを提供します。問題は、集計が完了すると、数百万のレコードが発電所に入る準備ができているが、我々はダイナモで書込み能力があるということである。彼らはすぐに挿入する必要はありませんが、ユースケースのために微調整できるように、1秒あたりの数を制御するのが良いでしょう。ここで

は、私がこれまで持っているもののコードサンプルです:

val foreach = new ForeachWriter[Row] { 
    override def process(value: Row): Unit = { 
     //write to dynamo here 
    } 

    override def close(errorOrNull: Throwable): Unit = { 
    } 

    override def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 
    } 

val query = dataGrouped 
    .writeStream 
    .queryName("DynamoOutput") 
    .format("console") 
    .foreach(foreach) 
    .outputMode(OutputMode.Complete()) 
    .start() 
    .awaitTermination() 

誰もがこの問題を解決するための方法を任意の勧告がありますか?

答えて

0

spark.streaming.backpressure.enabledの設定を調べてください。 documentation

最大受信レートの設定 - ストリーミングアプリケーションがデータを受信するのと同じ速さで処理するにはクラスタリソースが十分に大きくない場合、受信者は最大レート制限レコード/秒の点で。レシーバの設定パラメータspark.streaming.receiver.maxRateとDirect Kafkaアプローチのspark.streaming.kafka.maxRatePerPartitionを参照してください。 Spark Streamingはレート制限を自動的に把握し、処理条件が変更された場合に動的に調整するため、Spark 1.5では、背圧と呼ばれる機能を導入しました。このバックプレッシャは、設定パラメータspark.streaming.backpressure.enabledをtrueに設定することで有効にできます。

+0

私はこれを見ましたが、これは受信データのためのものであると仮定しました。さらに、ストリーミング集約が完了するまでは出力が開始されないので、これで終了するまでに時間がかかることになります。 –

+0

それはありますが、各作業者がデータを取得してからDynamoに書き込んでいると思います。書き込みにしばらく時間がかかる場合、バックプレッシャー設定は、作業者が圧倒されてボトルネックにならないようにするのに役立ちます。これを超えて、私はスパークストリーミングを「抑制」する方法を知らない。 – Vidya

+0

クール、心配する必要はありません、私はそれをショットし、報告を返すよ。ありがとうございました –

関連する問題