ヘイルキャストScheduledExecutorServiceを使用して定期的なタスクを実行しようとしています。私はヘイルキャストを使用しています。3.8.1。hazelcast ScheduledExecutorServiceノードシャットダウン後にタスクが失われる
私はあるノードを起動し、次にもう一方のノードを起動し、両方のノード間でタスクが分散されて正しく実行されます。
最初のノードをシャットダウンすると、2番目のノードが以前に最初のノードにあった定期的なタスクの実行を開始します。
問題は、最初のノードの代わりに2番目のノードを停止すると、そのタスクは最初のノードに再スケジュールされません。これは、ノードが増えても発生します。最後のノードをシャットダウンしてタスクを受信すると、それらのタスクは失われます。
シャットダウンは常にCtrl + C
で行われ、私はhazelcast例からいくつかのサンプルコードで、私はウェブ上で見つけたコードのいくつかの作品で、テストアプリケーションを作成しました。私はこのアプリの2つのインスタンスを開始します。
public class MasterMember {
/**
* The constant LOG.
*/
final static Logger logger = LoggerFactory.getLogger(MasterMember.class);
public static void main(String[] args) throws Exception {
Config config = new Config();
config.setProperty("hazelcast.logging.type", "slf4j");
config.getScheduledExecutorConfig("scheduler").
setPoolSize(16).setCapacity(100).setDurability(1);
final HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
Runtime.getRuntime().addShutdownHook(new Thread() {
HazelcastInstance threadInstance = instance;
@Override
public void run() {
logger.info("Application shutdown");
for (int i = 0; i < 12; i++) {
logger.info("Verifying whether it is safe to close this instance");
boolean isSafe = getResultsForAllInstances(hzi -> {
if (hzi.getLifecycleService().isRunning()) {
return hzi.getPartitionService().forceLocalMemberToBeSafe(10, TimeUnit.SECONDS);
}
return true;
});
if (isSafe) {
logger.info("Verifying whether cluster is safe.");
isSafe = getResultsForAllInstances(hzi -> {
if (hzi.getLifecycleService().isRunning()) {
return hzi.getPartitionService().isClusterSafe();
}
return true;
});
if (isSafe) {
System.out.println("is safe.");
break;
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
threadInstance.shutdown();
}
private boolean getResultsForAllInstances(
Function<HazelcastInstance, Boolean> hazelcastInstanceBooleanFunction) {
return Hazelcast.getAllHazelcastInstances().stream().map(hazelcastInstanceBooleanFunction).reduce(true,
(old, next) -> old && next);
}
});
new Thread(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
scheduler.scheduleAtFixedRate(named("1", new EchoTask("1")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("2", new EchoTask("2")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("3", new EchoTask("3")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("4", new EchoTask("4")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("5", new EchoTask("5")), 5, 10, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(named("6", new EchoTask("6")), 5, 10, TimeUnit.SECONDS);
}).start();
new Thread(() -> {
try {
// delays init
Thread.sleep(20000);
while (true) {
IScheduledExecutorService scheduler = instance.getScheduledExecutorService("scheduler");
final Map<Member, List<IScheduledFuture<Object>>> allScheduledFutures =
scheduler.getAllScheduledFutures();
// check if the subscription already exists as a task, if so, stop it
for (final List<IScheduledFuture<Object>> entry : allScheduledFutures.values()) {
for (final IScheduledFuture<Object> objectIScheduledFuture : entry) {
logger.info(
"TaskStats: name {} isDone() {} isCanceled() {} total runs {} delay (sec) {} other statistics {} ",
objectIScheduledFuture.getHandler().getTaskName(), objectIScheduledFuture.isDone(),
objectIScheduledFuture.isCancelled(),
objectIScheduledFuture.getStats().getTotalRuns(),
objectIScheduledFuture.getDelay(TimeUnit.SECONDS),
objectIScheduledFuture.getStats());
}
}
Thread.sleep(15000);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}).start();
while (true) {
Thread.sleep(1000);
}
// Hazelcast.shutdownAll();
}
}
、タスク、私は私が何か間違ったことをやっている
public class EchoTask implements Runnable, Serializable {
/**
* serialVersionUID
*/
private static final long serialVersionUID = 5505122140975508363L;
final Logger logger = LoggerFactory.getLogger(EchoTask.class);
private final String msg;
public EchoTask(String msg) {
this.msg = msg;
}
@Override
public void run() {
logger.info("--> " + msg);
}
}
?予め
で
おかげ - EDIT -
修飾(及び上記更新)ログの代わりにSystem.outを使用するコード。タスク統計情報のロギングとConfigオブジェクトの固定使用法を追加しました。
ログ:
は、すべてのタスクが二番目を開始する前に、最初のノードで実行されてまで、私は待つことに言及し忘れました。
ありがとうございます。 3.9が出たときにこの修正が利用可能になるのでしょうか?リリースの予定日はありますか? –
これを3.8.3にバックポートしますが、いずれのリリースの日付もありません。彼らはすぐに両方とも期待されています。 –