私はIgnite 1.7.0を使用しており、Apache Igniteの書き戻し機能をテストしていました。この質問をする動機は、Apache Igniteでwrite write機能が有効になっている場面で何が起こっているのかをよりよく理解することです。内部でのIgnite Write
私はテストキャッシュに20個のエントリを挿入するIgnite Client Programを持っています( "test_cache"と呼んでいます)。
Ignite Serverは、同じマシン上で実行されていますが、別のJVM上で実行されています。
- が、通じ読んライトスルーと有効になっているの背後に書く:
のIgniteキャッシュは、次の構成設定があります。
- フラッシュサイズは13
- フラッシュスレッド数は、他のすべてのプロパティがデフォルトに設定されている1
です。
はまた、これにキャッシュ用に構成されたキャッシュ・ストアがあり、コードは以下の通りである:
Iが意図のThread.sleep writeAll()メソッド()メソッドのために呼び出されたpackage com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
、遅いデータベースの書き込みをシミュレートします。次のように
キャッシュにデータをロードしているのIgniteクライアントのコードは次のとおりです。
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
次のように実行が起こる:
のIgniteサーバーが最初に起動されます。
データをロードするIgnite Clientは、サーバーの後に開始されます。
writeAll()メソッドで定義された60秒のスリープがあるため、Ignite Clientは20番目のエントリを書き込んでいる間にスタックされます。
また、2つのスレッドに対してwriteAll()メソッドが呼び出され、その中でFlushスレッドがストアに書き込む15のエントリを受け取り、システムスレッドが1つのエントリを受け取ったことがわかります。ストアに書き込みます。以下の通りのIgniteサーバーのログは、次のとおり
書き込みに呼び出されるすべての方法[15]フラッシャー-O-スレッド#66%のtest_grid%の
書き込みスレッドSYS [1]のエントリを求め、すべての方法のエントリ - #22%のtest_grid%
キャッシュ後書きがいっぱいになると、すべてのフラッシュのスレッドも忙しい書き込みデータであるので、私はのIgniteクライアントプットはエントリ20を書き込んで立ち往生していることを理解することができます。
次は私が明確に理解を持っている必要のあるポイントです:クライアントは、20日のエントリを挿入でブロックされているのはなぜ14日のエントリを挿入しながら
、それがベース(ブロックされている必要があります13のエントリの最大キャッシュサイズの)
私は512
にバッチサイズとがデフォルトに設定されていないとして、なぜ、唯一の15のエントリではなく、すべての19個のエントリと呼ばれるフラッシュスレッドでしたwriteAll()メソッドで呼び出されたシステムスレッドは、Ignite Clientからの要求を処理していた同じスレッドで20番目のエントリを配置していますか?
キャッシュがライトバックを有効にし、ライト・オーダー・モードがPRIMARY_SYNC(デフォルト)であり、キャッシュにバックアップがないと考えると、プライマリ・ノードが書き込みをコミットできるまで、キャッシュへのput呼び出しをブロックする必要があります。これはまた、Write Behindキャッシュにエントリを置くことができることを意味しますか?
サーバーにエントリを格納する場合、Ignite Serverはストレージ用のエントリとライトバックキャッシュ用のエントリの2つのコピーを作成します。または、同じエントリの参照が使用されています。
ご質問ありがとうございました。質問があまりにも長すぎる場合はお詫び申し上げますが、内容は関心のあるオーディエンスに状況を詳しく説明する上で不可欠でした。
すぐにお返事ありがとうございます。(flushSize * 1.5)は目が離せないものです。エントリの参照はライトバックキャッシュに置かれるため、Ignite Serverのメモリ要件を考慮してライトバックキャッシュのメモリを考慮する必要はありません。また、なぜフラッシャースレッドが19エントリの代わりに15エントリしか持っていないのか理解できません。なぜなら、ライトバックキャッシュが満杯で、バッチサイズが512であったことを考えると最適な選択だったはずです。それを1分に増加させても効果はなかった。 Flush Threadには常に15個のエントリがあります。 –
私は詳細を提供するために答えを更新しました。 – sk0x50