2016-05-24 3 views
3

Google Cloud DataprocのSparkストリーミングを使用して、複数の連続したパイプラインからなるフレームワーク(Pythonで書かれています)を実行します。 Kafkaのキューから取り出し、変換された出力をBigtableに書き込みます。すべてのパイプラインを組み合わせると、1日あたり数ギガバイトのデータを2つのクラスタ(3つのワーカーノードと4つのノード)で処理します。突発的なソケットタイムアウトが発生するDataprocのデータストリーミングスパイラルストリーム

Dataprocの上にこのSparkストリーミングフレームワークを実行すると、5月正確には):私たちはパイプラインを終了させる頻繁なソケットタイムアウト例外を経験し始めました。それは大幅に増加していないので、クラスタの負荷には関係していないようです。それはまた、一日を通して全く無作為に起こり、おそらく関連するコードの変更をチェックしたが、見つけられなかった。さらに、これは4つのワーカー・ノードを持つクラスタでのみ発生すると思われますが、3つのノードを持つクラスタのパイプラインは非常に似ており、タイムアウトはまったく発生しません。私はすでにクラスタを2回再作成しましたが、問題は残り、このdataprocクラスタ上で実行されているすべてのパイプラインに影響します。 3つのノードを持つクラスタはn1-standard-4マシンタイプですが、4つのノードを持つ問題のあるクラスタはn1-standard-8マシンタイプであり、それ以外の構成は同じです。パイプラインのジョブ実行の

出力例は、問題が発生した場合、ジョブは終了します。

java.net.SocketTimeoutException: Accept timed out 
    at java.net.PlainSocketImpl.socketAccept(Native Method) 
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409) 
    at java.net.ServerSocket.implAccept(ServerSocket.java:545) 
    at java.net.ServerSocket.accept(ServerSocket.java:513) 
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645) 
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0 
org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call 
    r = self.func(t, *rdds) 
    File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd 
    .foreach(lambda *args: None) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach 
    self.mapPartitions(processPartition).count() # Force evaluation 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold 
    vals = self.mapPartitions(func).collect() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect 
    return list(_load_from_socket(port, self._jrdd_deserializer)) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket 
    for item in serializer.load_stream(rf): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length 
    length = read_int(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int 
    length = stream.read(4) 
    File "/usr/lib/python2.7/socket.py", line 380, in read 
    data = self._sock.recv(left) 
timeout: timed out 

スタックトレースの開始は、私たちのStreamManagerモジュール、メソッドprocess_kafka_rddである:それは、直接内の単一の個別RDDを処理カフカメッセージのストリーム。 KafkaとSparkストリーミングとの統合は、

+0

エラーが発生したときに消費したコンシューマとパーティションの数はいくらですか? –

答えて

1

私がSparkとソケットのエラーを経験したことは、突然死んでしまったということです。そのときに通信している他のエグゼキュータは、ソケットエラーを発生させます。

私の経験では、予期せぬ実行者の死の原因は、リソースの不足、通常はメモリ不足にぶつかっています。

(これはデフォルトでは通常、あまりにも低いです。メモリエグゼキュータの量を使用することができますチューニングすることが重要です。しかし、私はあなたがすでにこのことを認識している疑いがある。)

私はスパークが糸の上で実行されていると仮定?残念なことに、私の経験では、スパークは糸の腸の中で起こるときに問題の原因を報告する貧しい仕事をしています。残念ながら、実際に突然の死者を引き起こした原因を突き止めるために糸ログを調べなければなりません。エグゼクティブは、それぞれ糸 "コンテナ"で走ります。ヤーンのログのどこかに、落下したコンテナの記録があるはずです。

関連する問題