2016-04-03 2 views
0

曖昧なタイトルは申し訳ありませんが、どのように問題が簡潔に記述されているか分かりません。ScheduledExecutorServiceで変わったこと

ScheduledExecutorServiceを使用して、私はRunnableを初期遅延なく5秒ごとに実行するようにスケジュールします。私は、60秒後にScheduledExecutorServiceでshutdonw()を呼び出すshceduledタスクを持っています。シャットダウンが発生すると、メインスレッドが停止しているように見えます。終了しません。

以下のコードでは、 "Ping"が5秒ごとにブロッキングキューから正しくピックアップされます。 ScheduledExecutorServiceがシャットダウンされると、「Ping」の印刷が停止し、isDone()チェックの下の行は実行されず、テストの最後にロガーも実行されませんが、Eclipseはまだテストが実行中であることを示します。

"テスト"

package scheduledExecutorTest; 

import org.junit.Test; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.paul.scheduledexecutortest.service.HeartBeatService; 
import com.paul.scheduledexecutortest.service.MasterScheduler; 

public class SchedulerTest { 

private Logger LOGGER = LoggerFactory.getLogger(SchedulerTest.class); 
    @Test 
    public void testScheduleTasks() throws InterruptedException { 

     MasterScheduler.scheduleToRunOnceWithInitialDelay(new Runnable() { 

      @Override 
      public void run() { 
       MasterScheduler.shutDown(); 
      } 
     }, 60L); 

     HeartBeatService heartbeatService = new HeartBeatService(); 
     heartbeatService.doStart(); 
     LOGGER.debug("doStart() returned"); //THIS NEVER GETS HIT 
    } 
} 

ハートビート・サービス(だけではなく、メイン()のそれを使用して、何をテストしていません)というスケジュールハートビート:

package com.paul.scheduledexecutortest.service; 

import java.util.concurrent.ScheduledFuture; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.paul.scheduledexecutortest.HeartBeat; 

public class HeartBeatService { 

    private Logger LOGGER = LoggerFactory.getLogger(HeartBeatService.class); 

    public void doStart() throws InterruptedException { 

     Scheduler<String> scheduler = new Scheduler<String>(); 
     ScheduledFuture<String> taskStatus = scheduler.scheduleToRunPeriodically(new HeartBeat(), 5L); 

     try { 
      while (taskStatus.isDone() == false) { 
       LOGGER.debug(scheduler.getTaskOutput()); 
      } 
     } 
     catch (Exception ex) { 
      LOGGER.error("Something happened"); 
     } 

     LOGGER.debug("COMPLETE"); //THIS NEVER GETS HIT 

    } 
} 

スケジューラ:

package com.paul.scheduledexecutortest.service; 

import java.util.concurrent.ArrayBlockingQueue; 
import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.ScheduledFuture; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import com.paul.scheduledexecutortest.ScheduledTask; 

public class Scheduler<T> { 

    private Logger LOGGER = LoggerFactory.getLogger(Scheduler.class); 
    private BlockingQueue<T> queue = new ArrayBlockingQueue<T>(1); 

    @SuppressWarnings("unchecked") 
    public ScheduledFuture<T> scheduleToRunPeriodically(ScheduledTask<T> scheduledTask, 
      long timeIntervalSeconds) { 

     Runnable task = transformIntoRunnable(scheduledTask); 
     return (ScheduledFuture<T>) MasterScheduler.scheduleToRunPeriodically(task, 
       timeIntervalSeconds); 
    } 

    @SuppressWarnings("unchecked") 
    public ScheduledFuture<T> scheduleToRunPeriodicallyWithInitialDelay(
      ScheduledTask<T> scheduledTask, long repeatTimeIntervalSeconds, 
      long initalDelaySeconds) { 

     Runnable task = transformIntoRunnable(scheduledTask); 
     return (ScheduledFuture<T>) MasterScheduler.scheduleToRunPeriodicallyWithInitialDelay(task, 
       repeatTimeIntervalSeconds, initalDelaySeconds); 
    } 

    public void scheduleToRunOnceWithInitialDelay(ScheduledTask<T> scheduledTask, 
      long timeIntervalSeconds) { 

     Runnable task = transformIntoRunnable(scheduledTask); 
     MasterScheduler.scheduleToRunOnceWithInitialDelay(task, timeIntervalSeconds); 
    } 

    private Runnable transformIntoRunnable(final ScheduledTask<T> scheduledTask) { 
     LOGGER.debug("Converting ScheduledTask into Runnable"); 
     return() -> queue.add(scheduledTask.invoke()); 
    } 

    public T getTaskOutput() throws InterruptedException { 
     return queue.take(); 
    } 

} 

シングルトンマスタースケジューラ

package com.paul.scheduledexecutortest.service; 

import java.util.concurrent.Executors; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.ScheduledFuture; 
import java.util.concurrent.TimeUnit; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

public class MasterScheduler { 

    private static Logger LOGGER = LoggerFactory.getLogger(MasterScheduler.class); 

    public static final int THREAD_POOL_SIZE = 10; 

    private static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(THREAD_POOL_SIZE); 

    public static ScheduledFuture<?> scheduleToRunPeriodically(Runnable task, long timeIntervalSeconds) { 

     LOGGER.debug("Scheduling task to run async every " + timeIntervalSeconds + " seconds without delay"); 
     return scheduler.scheduleAtFixedRate(task, 0, timeIntervalSeconds, TimeUnit.SECONDS); 
    } 

    public static ScheduledFuture<?> scheduleToRunPeriodicallyWithInitialDelay(Runnable task, 
      long repeatTimeIntervalSeconds, long initalDelaySeconds) { 

     LOGGER.debug("Seceduling task to run every " + repeatTimeIntervalSeconds + " seconds after initial delay of " 
       + initalDelaySeconds + " seconds"); 

     return scheduler.scheduleAtFixedRate(task, initalDelaySeconds, repeatTimeIntervalSeconds, TimeUnit.SECONDS); 
    } 

    public static void scheduleToRunOnceWithInitialDelay(Runnable task, long timeIntervalSeconds) { 

     scheduler.schedule(task, timeIntervalSeconds, TimeUnit.SECONDS); 
    } 

    public static void shutDown() { 
     System.err.println("SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED"); 
     scheduler.shutdown(); 
    } 
} 

Heatbeat(予定されたタスク)

package com.paul.scheduledexecutortest; 

public class HeartBeat implements ScheduledTask<String> { 

    @Override 
    public String invoke() { 
     return "Ping"; 
    } 

} 

出力:

13:48:41.721 [main] DEBUG com.paul.scheduledexecutortest.service.Scheduler - Converting ScheduledTask into Runnable 
13:48:41.780 [main] DEBUG com.paul.scheduledexecutortest.service.MasterScheduler - Scheduling task to run async every 5 without delay 
13:48:41.781 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:48:46.807 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:48:51.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:48:56.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:01.782 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:06.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:11.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:16.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:21.784 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:26.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:31.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
13:49:36.783 [main] DEBUG com.paul.scheduledexecutortest.service.HeartBeatService - Ping 
SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED 

答えて

1

それはと思われますtaskStatus.isDone()はほぼ常にfalseであるため、doStart()メソッドがブロックしています。thisを参照してください。

あなたのScheduler.javaクラスでは、なぜ1つの要素しか含んでいないときにブロックキューが必要ですか?

Test.java:

MasterScheduler.scheduleToRunOnceWithInitialDelay(() -> MasterScheduler.shutDown(), 60L); 
HeartBeatService heartbeatService = new HeartBeatService(); 
heartbeatService.doStart(); 

HeartBeatService:

public void doStart() throws InterruptedException { 
    Scheduler<String> scheduler = new Scheduler<String>(); 
    scheduler.scheduleToRunPeriodically(new HeartBeat(), 5L); 
} 

Scheduler.java:

は、私は、以下の変更を実行することをお勧め

private Runnable transformIntoRunnable(final ScheduledTask<T> scheduledTask) { 
    LOGGER.debug("Converting ScheduledTask into Runnable"); 
    return() -> LOGGER.debug((String)scheduledTask.invoke()); 
} 

MasterScheduler.java:

public static void shutDown() { 
    System.err.println("SCHEDULER SHUTTING DOWN GRACEFULLY. NO NEW TASKS ALLOWED"); 
    scheduler.shutdown(); 
    try { 
     if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { 
     scheduler.shutdownNow(); 
     } 
    } catch (InterruptedException e) { 
    } 
} 
関連する問題