2016-10-19 17 views
1

ストリーミングジョブを実行します。
start-clusted.shとFlink Web Interfaceを使用してローカルで実行しようとすると、問題はありません。
Flink:実行中のジョブをキャンセルできません(ストリーミング)

しかし、私は現在YARN にFLINKを使用して、私のジョブを実行しようとしています(GoogleのDataprocに配備)、私はそれをキャンセルしようとすると、 キャンセル状態が永遠に続くとスロットは に占めるたままタスクマネージャー。

2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Attempting to cancel task Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1) 
2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1) switched to CANCELING 
2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task - 
Triggering cancellation of task code Source: 
pubSubMessageAcknowledgingSource -> TrackingDisplayPushDeduplicater -> 
TrackingDisplayPushDeserializer -> (Sink: 
TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1) (38bf32d9199a0c9383a8b1e8d73a1f65). 
2016-10-18 16:56:34,055 WARN org.apache.flink.runtime.taskmanager.Task - 
Task 'Source: pubSubMessageAcknowledgingSource -> 
TrackingDisplayPushDeduplicater -> TrackingDisplayPushDeserializer -> 
(Sink: TrackingDisplayPushErrorFlumeSink, Map -> Sink: 
TrackingDisplayPushValidFlumeSink) (1/1)' did not react to cancelling 
signal, but is stuck in method: 
java.net.PlainSocketImpl.socketConnect(Native Method) 
java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) 
java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
java.net.Socket.connect(Socket.java:589) 
java.net.Socket.connect(Socket.java:538) 
sun.net.NetworkClient.doConnect(NetworkClient.java:180) 
sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
sun.net.www.http.HttpClient.New(HttpClient.java:308) 
sun.net.www.http.HttpClient.New(HttpClient.java:326) 
sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1169) 
sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1105) 
sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:999) 
sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:933) 
sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1283) 
sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1258) 
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.flush(FlumeSink.java:107) 
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.invoke(FlumeSink.java:80) 
com.accengage.bigdata.flink.streaming.sinks.FlumeSink.invoke(FlumeSink.java:25)l 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) 
org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:126) 
org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:35) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373) 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346) 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329) 
org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:160) 
com.accengage.bigdata.flink.streaming.sources.PubSubAcknowledgingSource.run(PubSubAcknowledgingSource.java:148) 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:80) 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:53) 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266) 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) 
java.lang.Thread.run(Thread.java:745) 

私が間違っているのかの任意のアイデア:

ここで私が得たログがありますか?
私は何ができますか?

ありがとうございました。

答えて

1

Flumeとの通信にいくつかのHTTPライブラリを使用するカスタムシンク(com.accengage.bigdata.flink.streaming.sinks.FlumeSink)を使用しているとします。割り込みが問題を解決するには、スレッド(中断例外は無視されたときに、これは、たとえば起こる)

に送信されたときに

ほとんどの場合、HTTPライブラリがループか何かに打たれてしまった、のいずれかを使用でき割り込みを適切に処理するHTTPライブラリ、またはメインスレッドで割り込みを受け取らない別のスレッドからライブラリを呼び出す。

Flink 1.2には、システムがcancel()呼び出しを打つのを防ぐためのいくつかの追加メカニズムがあります。 FLINK-4715を参照してください。

+1

原因は、接続の試行でタイムアウトが発生しておらず、そこに滞留していたためです。 –

関連する問題