Google Cloud DataprocのSparkストリーミングを使用して、複数の連続したパイプラインからなるフレームワーク(Pythonで書かれています)を実行します。 Kafkaのキューから取り出し、変換された出力をBigtableに書き込みます。すべてのパイプラインを組み合わせると、1日あたり数ギガバイトのデータを2つのクラスタ(3つのワーカーノードと4つのノード)で処理します。突発的なソケットタイムアウトが発生するDataprocのデータストリーミングスパイラルストリーム
Dataprocの上にこのSparkストリーミングフレームワークを実行すると、5月正確には):私たちはパイプラインを終了させる頻繁なソケットタイムアウト例外を経験し始めました。それは大幅に増加していないので、クラスタの負荷には関係していないようです。それはまた、一日を通して全く無作為に起こり、おそらく関連するコードの変更をチェックしたが、見つけられなかった。さらに、これは4つのワーカー・ノードを持つクラスタでのみ発生すると思われますが、3つのノードを持つクラスタのパイプラインは非常に似ており、タイムアウトはまったく発生しません。私はすでにクラスタを2回再作成しましたが、問題は残り、このdataprocクラスタ上で実行されているすべてのパイプラインに影響します。 3つのノードを持つクラスタはn1-standard-4
マシンタイプですが、4つのノードを持つ問題のあるクラスタはn1-standard-8
マシンタイプであり、それ以外の構成は同じです。パイプラインのジョブ実行の
出力例は、問題が発生した場合、ジョブは終了します。
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd
.foreach(lambda *args: None)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach
self.mapPartitions(processPartition).count() # Force evaluation
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
vals = self.mapPartitions(func).collect()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
return list(_load_from_socket(port, self._jrdd_deserializer))
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/usr/lib/python2.7/socket.py", line 380, in read
data = self._sock.recv(left)
timeout: timed out
スタックトレースの開始は、私たちのStreamManager
モジュール、メソッドprocess_kafka_rddである:それは、直接内の単一の個別RDDを処理カフカメッセージのストリーム。 KafkaとSparkストリーミングとの統合は、
エラーが発生したときに消費したコンシューマとパーティションの数はいくらですか? –