2017-04-03 23 views
1

スパークストリーミングジョブが常に実行され、ストリーム内のワード数がカウントされ、特定のボキャブラリ内のワードのみがカウントされて返されます。スパークストリーミングジョブのパフォーマンス向上

しかし、この語彙は固定されておらず、その代わりに語彙はredisに格納されており、時間とともに変化する可能性があります。ここでは、このジョブの素朴な実装です:filter(check_if_in_vocabulary)変換がそれはあまりにも時間がかかるだろう、ストリーム内の各要素についてのRedisから語彙を引くため

sc = SparkContext(appName="WordCount") 
ssc = StreamingContext(sc, 10) # batch interval is 10s 

def check_if_in_vocab(word): 
    vocab = redis_client.smembers() # get all vocabulary from redis 
    return word in vocab 

lines = ssc.socketTextStream(host_ip, port) # read data stream from the socket 
words = lines.flatMap(lambda line: line.split(" "))\ 
          .filter(check_if_in_vocab)\ # ANY BETTER SOLUTION HERE??? 
          .map(lambda word: (word, 1)) # create (word, count) pair 
counts = words.reduceByKey(lambda x,y: x+y) 
counts.pprint() 

私の実装では、私が思うに、パフォーマンスの低下があります。

もっと良い解決策はありますか?

フォロー

OK語彙はいつでも変更される可能性があるので、私は非常に頻繁にredisをチェックする必要があるため、上記の問題に、今、それは単純になり、語彙だけで、すべて60秒または1時間に変更するとし上記のコードを改善するには?

答えて

0

私はPythonでSparkを使ったことがありませんが、これがScalaの実装であれば、コールでRedisコールを作成することで改善を探します。 mapPartitionsの中で、私はクライアントとの接続を確立し、vocabを取り出し、メモリ内のvocabを使用してiterableをフィルタリングし、接続を閉じます。

おそらく、あなたはPython APIに類似したことをすることができます。

+0

はい、実際に私はpythonでもmapPartitionsを使うことができますが、私はmapPartitionsのドキュメントで読んでいるように、データ準備コードを実行します(これは私の中でredisからボキャブラリを引き下げることです大文字と小文字の区別は一度だけですが、私の状況は語彙が時間の経過とともに変化することがあるため、各パーティションのために、私は時間の経過とともに語彙を更新できる必要があります。私は正しい? – avocado

関連する問題