私はKafka(@ 1minバッチ)から読み込み、いくつかの操作の後にHTTPエンドポイントにPOSTするストリーミングジョブを持っています。スパークストリーミングカフカジョブが「処理」ステージでスタックしました
(APP-UIのページで)実行されている「エグゼキュー」を調べた後、私はそれだけ 1 6つのエグゼキュータのを発見:それは「処理」の段階で立ち往生し、その後、ジョブキューイングを開始なってき数時間ごとに2つの「活動的な仕事」を示していました。
そのためのスレッドダンプをやったら、それは「キュータのタスクの起動ワーカー」スレッドプール(source)のための2つのスレッドを示しました。これらのスレッドは、すべて同じエラーで立ち往生した。
全読めるエラー:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
java.net.InetAddress.checkLookupTable(InetAddress.java:1393)
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1310)
java.net.InetAddress.getAllByName0(InetAddress.java:1276)
java.net.InetAddress.getAllByName(InetAddress.java:1192)
java.net.InetAddress.getAllByName(InetAddress.java:1126)
java.net.InetAddress.getByName(InetAddress.java:1076)
java.net.InetSocketAddress.<init>(InetSocketAddress.java:220)
kafka.network.BlockingChannel.liftedTree1$1(BlockingChannel.scala:59)
kafka.network.BlockingChannel.connect(BlockingChannel.scala:49)
kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:44)
kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:151)
kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:69)
kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:193)
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:209)
org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
を。これは、JDK 7で固定されている必要がありますJDK bugのようだ - 私は確かに私と判断しました'1.8.0_101(Oracle Corporation)'を使用しています。私は(提案hereなど)コマンドラインで、次の追加しようとしたが、それは問題を解決しませんでした:
-Djava.net.preferIPv4Stack=true -Dnetworkaddress.cache.ttl=60
誰でもデバッグへのアプローチ上の任意のアイデアを持っています/この問題を解決しますか?
*編集:混乱JDKの理由に
ホスト名解決でハングします。 heapdumpを使って解決しようとする名前を確認してください。 '-Dsun.net.spi.nameservice.provider.1 = dns、sun' – apangin
カフカブローカーホストを解決するのが遅れてしまいます。興味深いことに、常に2時間に1回、常に失敗するわけではありません。 –
ある程度時間がかかるクラウドプロバイダのDNSに依存するホストの解決に時間がかかる InetAddressアドレス= InetAddress.getByName( "www.example.com")を使用していたのと同じ問題が発生したため、Http接続にタイムアウトを設定しましたDNSルックアップのために別のサービスを使用 また、すべてのクラスタにIPベースの設定があり、プライベートIPを置くとクラスタの円滑な実行に役立ちます – vipin