スパークストリーミングジョブが常に実行され、ストリーム内のワード数がカウントされ、特定のボキャブラリ内のワードのみがカウントされて返されます。スパークストリーミングジョブのパフォーマンス向上
しかし、この語彙は固定されておらず、その代わりに語彙はredis
に格納されており、時間とともに変化する可能性があります。ここでは、このジョブの素朴な実装です:filter(check_if_in_vocabulary)
変換がそれはあまりにも時間がかかるだろう、ストリーム内の各要素についてのRedisから語彙を引くため
sc = SparkContext(appName="WordCount")
ssc = StreamingContext(sc, 10) # batch interval is 10s
def check_if_in_vocab(word):
vocab = redis_client.smembers() # get all vocabulary from redis
return word in vocab
lines = ssc.socketTextStream(host_ip, port) # read data stream from the socket
words = lines.flatMap(lambda line: line.split(" "))\
.filter(check_if_in_vocab)\ # ANY BETTER SOLUTION HERE???
.map(lambda word: (word, 1)) # create (word, count) pair
counts = words.reduceByKey(lambda x,y: x+y)
counts.pprint()
私の実装では、私が思うに、パフォーマンスの低下があります。
もっと良い解決策はありますか?
フォロー
OK語彙はいつでも変更される可能性があるので、私は非常に頻繁にredis
をチェックする必要があるため、上記の問題に、今、それは単純になり、語彙だけで、すべて60秒または1時間に変更するとし上記のコードを改善するには?
はい、実際に私はpythonでもmapPartitionsを使うことができますが、私はmapPartitionsのドキュメントで読んでいるように、データ準備コードを実行します(これは私の中でredisからボキャブラリを引き下げることです大文字と小文字の区別は一度だけですが、私の状況は語彙が時間の経過とともに変化することがあるため、各パーティションのために、私は時間の経過とともに語彙を更新できる必要があります。私は正しい? – avocado