2016-11-24 8 views
0

Sparkアプリケーションで、ワーカーコードで使用される変数を定期的に更新する必要があるという要件があります。Sparkワーカーで定期的にオブジェクトを更新するには?

より具体的に理解するために、私は等しい分割で私のデータをキネシスの断片にする必要があります。キネシスの破片の数が増加または減少するいずれかの時間を、ので、私のパーティションキーは

System.currentTimeMillis % shardSize

ですが、私はいつもこれを照会したいので、java.utilのような何かいけないので、私はいつも破片の正しい数を取得する必要がありますすることができます.TimerTaskが役立ちます。これは私が労働者にブロードキャストすることができます。

これは定期的に実行し、その値をスレーブにブロードキャストするために必要なコードです。

def fetchNumberOfShards(): Integer = { 
    val describeStreamRequest = new DescribeStreamRequest() 
    describeStreamRequest.setStreamName(streamName) 
    val describeStreamResult = kinesisClient.describeStream(describeStreamRequest) 
    val shards = describeStreamResult.getStreamDescription().getShards() 
    return shards.size() 
} 

答えて

0

私は、均等な分布を持つランダムなパーティションキーを使用する必要があると思います。たとえば、System.currentTimeMillisです。実際に使用するシャードを指定する必要はないので、シャードの数を追跡する必要はありません。あなたはそれで運を持っていなかったことを考えると

EDIT

、放送変数はあなたのためにこれを解決します:

放送変数は、プログラマが各マシン上で読み取り専用の変数キャッシュしておくことを可能

// executes on driver 
scala> var broadcastVar = sc.broadcast(numShards) 
broadcastVar: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(0) 

// retrieve value on worker 
scala> broadcastVar.value 
res0: Int = 5 

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

ドライバーは、必要に応じてbroadcastVarを定期的に更新できます。ワーカーはbroadcastVar.valueしか読むことができません。 broadcastVar.valueを使って、あなたのシャードハッシュ関数にプラグインすることができます。

+0

私はそれを試しましたが、どういうわけかKinesisはパーティションキーを一様に配布しません。特に再払いが発生したとき。それはなぜこのコードです。 – cmbendre

+0

ブロードキャスト変数を含めるように答えを更新しました。これにより解決されます – ImDarrenG

関連する問題