2017-05-15 3 views
0

ヘイルキャストScheduledExecutorServiceを使用して定期的なタスクを実行しようとしています。私はヘイルキャストを使用しています。3.8.1。hazelcast ScheduledExecutorServiceノードシャットダウン後にタスクが失われる

私はあるノードを起動し、次にもう一方のノードを起動し、両方のノード間でタスクが分散されて正しく実行されます。

最初のノードをシャットダウンすると、2番目のノードが以前に最初のノードにあった定期的なタスクの実行を開始します。

問題は、最初のノードの代わりに2番目のノードを停止すると、そのタスクは最初のノードに再スケジュールされません。これは、ノードが増えても発生します。最後のノードをシャットダウンしてタスクを受信すると、それらのタスクは失われます。

シャットダウンは常にCtrl + C

で行われ、私はhazelcast例からいくつかのサンプルコードで、私はウェブ上で見つけたコードのいくつかの作品で、テストアプリケーションを作成しました。私はこのアプリの2つのインスタンスを開始します。

public class MasterMember { 

/** 
* The constant LOG. 
*/ 
final static Logger logger = LoggerFactory.getLogger(MasterMember.class); 

public static void main(String[] args) throws Exception { 

    Config config = new Config(); 
    config.setProperty("hazelcast.logging.type", "slf4j"); 
    config.getScheduledExecutorConfig("scheduler"). 
    setPoolSize(16).setCapacity(100).setDurability(1); 

    final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config); 

    Runtime.getRuntime().addShutdownHook(new Thread() { 

     HazelcastInstance threadInstance = instance; 

     @Override 
     public void run() { 
      logger.info("Application shutdown"); 

      for (int i = 0; i < 12; i++) { 
       logger.info("Verifying whether it is safe to close this instance"); 
       boolean isSafe = getResultsForAllInstances(hzi -> { 
        if (hzi.getLifecycleService().isRunning()) { 
         return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS); 
        } 
        return true; 
       }); 

       if (isSafe) { 
        logger.info("Verifying whether cluster is safe."); 
        isSafe = getResultsForAllInstances(hzi -> { 
         if (hzi.getLifecycleService().isRunning()) { 
          return hzi.getPartitionService().isClusterSafe(); 
         } 
         return true; 
        }); 

        if (isSafe) { 
         System.out.println("is safe."); 
         break; 
        } 
       } 

       try { 
        Thread.sleep(1000); 
       } catch (InterruptedException e) { 
        // TODO Auto-generated catch block 
        e.printStackTrace(); 
       } 
      } 

      threadInstance.shutdown(); 

     } 

     private boolean getResultsForAllInstances(
       Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) { 

      return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true, 
        (old, next) -> old && next); 
     } 
    }); 

    new Thread(() -> { 

     try { 
      Thread.sleep(10000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler"); 
     scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS); 
     scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS); 
    }).start(); 

    new Thread(() -> { 

     try { 
      // delays init 
      Thread.sleep(20000); 

      while (true) { 

       IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler"); 
       final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures = 
         scheduler.getAllScheduledFutures(); 

       // check if the subscription already exists as a task, if so, stop it 
       for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) { 
        for (final IScheduledFuture<Object> objectIScheduledFuture : entry) { 
         logger.info(
           "TaskStats: name {} isDone() {} isCanceled() {} total runs {} delay (sec) {} other statistics {} ", 
           objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(), 
           objectIScheduledFuture.isCancelled(), 
           objectIScheduledFuture.getStats().getTotalRuns(), 
           objectIScheduledFuture.getDelay(TimeUnit.SECONDS), 
           objectIScheduledFuture.getStats()); 
        } 
       } 

       Thread.sleep(15000); 

      } 

     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

    }).start(); 

    while (true) { 
     Thread.sleep(1000); 
    } 
    // Hazelcast.shutdownAll(); 
} 
} 

、タスク、私は私が何か間違ったことをやっている

public class EchoTask implements Runnable, Serializable { 

/** 
* serialVersionUID 
*/ 
private static final long serialVersionUID = 5505122140975508363L; 

final Logger logger = LoggerFactory.getLogger(EchoTask.class); 

private final String msg; 

public EchoTask(String msg) { 
    this.msg = msg; 
} 

@Override 
public void run() { 
    logger.info("--> " + msg); 
} 
} 

?予め


おかげ - EDIT -

修飾(及び上記更新)ログの代わりにSystem.outを使用するコード。タスク統計情報のロギングとConfigオブジェクトの固定使用法を追加しました。

ログ:

Node1_log

Node2_log

は、すべてのタスクが二番目を開始する前に、最初のノードで実行されてまで、私は待つことに言及し忘れました。

答えて

0

これを報告してくれてありがとう、本当にバグです。残念ながら、複数のノードではそれほど明白ではありませんでした。あなたが答えによって分かったように、タスクを失うことなく、移行後にキャンセルしてください。ただし、あなたの修正は安全ではありません。なぜなら、Taskはキャンセルされ、同時にnullのFutureがあるからです。あなたがマスターレプリカを取り消すと、決して未来を持っていないバックアップが結果を得るだけです。修正はあなたが行ったことに非常に近いので、prepareForReplication()の場合migrationModeに結果を設定することは避けてください。私はすぐに修正を加える予定で、もう少しテストを実行するだけです。これは、マスターとそれ以降のバージョンで利用可能になります。

あなたの発見に問題が記録されましたが、あなたが気にしない場合は、https://github.com/hazelcast/hazelcast/issues/10603のステータスを追跡することができます。

+0

ありがとうございます。 3.9が出たときにこの修正が利用可能になるのでしょうか?リリースの予定日はありますか? –

+0

これを3.8.3にバックポートしますが、いずれのリリースの日付もありません。彼らはすぐに両方とも期待されています。 –

0

hazelcastプロジェクト(3.8.1ソースコードで使用)のScheduledExecutorContainerクラス、つまり、promoteStash()メソッドを変更することで、この問題を迅速に解決できました。基本的に私は以前のデータの移行でタスクがキャンセルされたというケースの条件を追加しました。 私は今この変更の副作用の可能性はありませんか、これが最善の方法であるかどうかは分かりません。

void promoteStash() { 
    for (ScheduledTaskDescriptor descriptor : tasks.values()) { 
     try { 
      if (logger.isFinestEnabled()) { 
       logger.finest("[Partition: " + partitionId + "] " + "Attempt to promote stashed " + descriptor); 
      } 

      if (descriptor.shouldSchedule()) { 
       doSchedule(descriptor); 
      } else if (descriptor.getTaskResult() != null && descriptor.getTaskResult().isCancelled() 
        && descriptor.getScheduledFuture() == null) { 
       // tasks that were already present in this node, once they get sent back to this node, since they 
       // have been cancelled when migrating the task to other node, are not rescheduled... 
       logger.fine("[Partition: " + partitionId + "] " + "Attempt to promote stashed canceled task " 
         + descriptor); 

       descriptor.setTaskResult(null); 
       doSchedule(descriptor); 
      } 

      descriptor.setTaskOwner(true); 
     } catch (Exception e) { 
      throw rethrow(e); 
     } 
    } 
} 
関連する問題