これは、一日と言うために通ってくるしているどのように多くのエラーメッセージ/警告メッセージの数を保つためにPyspark - スパークセッションのうち転送制御(SC)
Pyspark filter operation on Dstream
のフォローアップの質問です、時間 - どのように仕事をデザインするのか。
私が試してみました:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
def counts():
counter += 1
print(counter.value)
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
counter = sc.accumulator(0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.foreachRDD(lambda e : e.foreach(counts))
errors.pprint()
ssc.start()
ssc.awaitTermination()
これはしかし、印刷で開始する、複数の問題があります動作しません(出力はstdoutにない、私はそれについて読んだことが、私が使用することができる最高はここにありますロギング)。その関数の出力をテキストファイルに保存し、代わりにそのファイルをテールできますか?
私はプログラムがちょうど出てくる理由は、エラーがないことを確認していない/
どのように1は状態を保持ん(火花1.6.2)に、さらに見てどこかダンプ?
foreachRDD(Dstream):
if RDD.contains("keyword1 | keyword2 | keyword3"):
dictionary[keyword] = dictionary.get(keyword,0) + 1 //add the keyword if not present and increase the counter
print dictionary //or send this dictionary to else where
:私は何をしようとしていますが、サーバーおよび重症度によりログを集約することで、別のユースケースは、特定のキーワードのために私が試してみたいもののため
擬似コードを見ることによって処理されたどのように多くのトランザクション数えることです送信または印刷辞書の最後の部分は、スパークストリーミングコンテキストの切り替えを必要とする - 誰かがその概念を説明できますか?
何かの理由で、何らかの理由でプログラムのどこかで "print"を使うとエラーが出なくなります。 – GreenThumb