2017-11-29 7 views
1

私はIgnite 1.7.0を使用しており、Apache Igniteの書き戻し機能をテストしていました。この質問をする動機は、Apache Igniteでwrite write機能が有効になっている場面で何が起こっているのかをよりよく理解することです。内部でのIgnite Write

私はテストキャッシュに20個のエントリを挿入するIgnite Client Programを持っています( "test_cache"と呼んでいます)。

Ignite Serverは、同じマシン上で実行されていますが、別のJVM上で実行されています。

  1. が、通じ読んライトスルーと有効になっているの背後に書く:

    のIgniteキャッシュは、次の構成設定があります。

  2. フラッシュサイズは13
  3. フラッシュスレッド数は、他のすべてのプロパティがデフォルトに設定されている1

です。

はまた、これにキャッシュ用に構成されたキャッシュ・ストアがあり、コードは以下の通りである:

Iが意図のThread.sleep writeAll()メソッド()メソッドのために呼び出された
package com.ignite.genericpoc; 

import java.util.Collection; 
import java.util.Map; 

import javax.cache.Cache.Entry; 
import javax.cache.integration.CacheLoaderException; 
import javax.cache.integration.CacheWriterException; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.cache.store.CacheStore; 
import org.apache.ignite.lang.IgniteBiInClosure; 
import org.apache.ignite.resources.CacheNameResource; 
import org.apache.ignite.resources.IgniteInstanceResource; 

public class IgniteStoreTest implements CacheStore<String, String> { 

@IgniteInstanceResource 
Ignite gridReference; 

@CacheNameResource 
String cacheName; 

@Override 
public String load(String key) throws CacheLoaderException { 
    System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] "); 
    return null; 
} 

@Override 
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException { 

    IgniteCache<String, String> ic = gridReference.cache(cacheName); 

    int currentKeyNo = 0; 

    for (String key : keys) { 
     ic.put(key, "Value:" + currentKeyNo); 
     currentKeyNo++; 
    } 

    System.out.println("Got " + currentKeyNo + " entries"); 

    return null; 
} 

@Override 
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException { 
    System.out.println("Write method called"); 
} 

@Override 
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException { 
    System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread " 
      + Thread.currentThread().getName()); 

    System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString()); 

    try { 
     Thread.sleep(60000); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 

} 

@Override 
public void delete(Object key) throws CacheWriterException { 
    System.out.println("Delete method called"); 
} 

@Override 
public void deleteAll(Collection<?> keys) throws CacheWriterException { 
    System.out.println("Delete All method called"); 
} 

@Override 
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException { 
    System.out.println("Load cache method called with " + args[0].toString()); 
} 

@Override 
public void sessionEnd(boolean commit) throws CacheWriterException { 
    System.out.println("Session End called"); 
} 

} 

、遅いデータベースの書き込みをシミュレートします。次のように

キャッシュにデータをロードしているのIgniteクライアントのコードは次のとおりです。

package com.ignite.genericpoc; 

import java.util.ArrayList; 
import java.util.List; 

import javax.cache.configuration.FactoryBuilder; 

import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.Ignition; 
import org.apache.ignite.configuration.CacheConfiguration; 

public class IgnitePersistentStoreClientTest { 

public static void main(String[] args) throws InterruptedException { 

    List<String> addressess = new ArrayList<>(); 

    addressess.add("*.*.*.*:47500"); // Hiding the IP 

    Ignition.setClientMode(true); 

    Ignite i = IgniteConfigurationUtil.startIgniteServer(
      IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess)); 

    System.out.println("Client Started"); 

    CacheConfiguration<String, String> ccfg = new CacheConfiguration<>(); 

    ccfg.setName("Persistent_Store_Test_Cache"); 

    ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class)); 

    ccfg.setReadThrough(true); 

    ccfg.setWriteThrough(true); 

    ccfg.setWriteBehindEnabled(true); 

    ccfg.setWriteBehindFlushSize(13); 

    ccfg.setWriteBehindFlushThreadCount(1); 

    System.out.println(ccfg.getWriteBehindBatchSize()); 

    IgniteCache<String, String> ic = i.getOrCreateCache(ccfg); 

    System.out.println("Cache Created"); 

    for (int t = 1; t <= 20; t++) { 
     System.out.println("Loading key "+t); 
     ic.put("Key:" + t,"Value: "+t); 
     System.out.println("Key "+ t + " loaded "); 
    } 

    System.out.println("Cache Loaded"); 

    i.close(); 

} 

} 

次のように実行が起こる:

  1. のIgniteサーバーが最初に起動されます。

  2. データをロードするIgnite Clientは、サーバーの後に開始されます。

  3. writeAll()メソッドで定義された60秒のスリープがあるため、Ignite Clientは20番目のエントリを書き込んでいる間にスタックされます。

  4. また、2つのスレッドに対してwriteAll()メソッドが呼び出され、その中でFlushスレッドがストアに書き込む15のエントリを受け取り、システムスレッドが1つのエントリを受け取ったことがわかります。ストアに書き込みます。以下の通りのIgniteサーバーのログは、次のとおり

    書き込みに呼び出されるすべての方法[15]フラッシャー-O-スレッド#66%のtest_grid%の

    書き込みスレッドSYS [1]のエントリを求め、すべての方法のエントリ - #22%のtest_grid%

キャッシュ後書きがいっぱいになると、すべてのフラッシュのスレッドも忙しい書き込みデータであるので、私はのIgniteクライアントプットはエントリ20を書き込んで立ち往生していることを理解することができます。

次は私が明確に理解を持っている必要のあるポイントです:クライアントは、20日のエントリを挿入でブロックされているのはなぜ14日のエントリを挿入しながら

  • 、それがベース(ブロックされている必要があります13のエントリの最大キャッシュサイズの)

  • 私は512

  • にバッチサイズとがデフォルトに設定されていないとして、なぜ、唯一の15のエントリではなく、すべての19個のエントリと呼ばれるフラッシュスレッドでしたwriteAll()メソッドで呼び出されたシステムスレッドは、Ignite Clientからの要求を処理していた同じスレッドで20番目のエントリを配置していますか?

  • キャッシュがライトバックを有効にし、ライト・オーダー・モードがPRIMARY_SYNC(デフォルト)であり、キャッシュにバックアップがないと考えると、プライマリ・ノードが書き込みをコミットできるまで、キャッシュへのput呼び出しをブロックする必要があります。これはまた、Write Behindキャッシュにエントリを置くことができることを意味しますか?

  • サーバーにエントリを格納する場合、Ignite Serverはストレージ用のエントリとライトバックキャッシュ用のエントリの2つのコピーを作成します。または、同じエントリの参照が使用されています。

ご質問ありがとうございました。質問があまりにも長すぎる場合はお詫び申し上げますが、内容は関心のあるオーディエンスに状況を詳しく説明する上で不可欠でした。

答えて

2

後書きストアは、ボンネットの背圧制御を備えています。これは、システムがすべてを処理できない場合、非同期操作をオンザフライで同期に変換できることを意味します。
重要なライトビハインドキャッシュのサイズがクリティカルサイズ(flushSize * 1.5)を超える場合、書き込み操作を実行しているスレッドが、flusherThreadの代わりに使用されます。
あなたのログにこれらのスレッドを参照してください理由です:

  • フラッシャー、0〜#66%のtest_gridの%(通常のフラッシャースレッド)
  • sys-#22%のtest_grid%(背圧制御が実行されている、操作は、クライアント・スレッドを使用しています)

私のキャッシュが有効になって背後に書いて、注文モードを書くいる考慮 PRIMARY_SYNC(デフォルト)で、何のバックアップがキャッシュに存在しない、のいずれかの プット・コールキャッチプライマリノードが に書き込みをコミットできるようになるまで、eをブロックする必要があります。また、これは、キャッシュの後ろに書き込みを行うことができることを意味しますか?

はい、あります。サーバーのエントリを格納する場合には

、のIgniteサーバーは、エントリの 二つのコピー保管用と キャッシュの背後にある書き込みのための1つを作るん。または、同じエントリの参照が使用されています。

同じエントリの参照を使用する必要があります。

の手順で、このシナリオのステップを考えてみましょう:

  • クライアントスレッドは、14個のエントリをアップロードしました。 GridCacheWriteBehindStoreは、基礎となるキャッシュへのエントリの数がフラッシュサイズを超えていることを検出し、フラッシャースレッドを起動するための信号を送信します。 GridCacheWriteBehindStore#updateCache()

  • フラッシャスレッドが目を覚ますとwrite-behind-cache.entrySet().iterator()経由ライトビハインドキャッシュ(ConcurrentLinkedHashMapである)からデータを取得しようと参照してください。 このイテレータは弱く一貫性のあるトラバーサルを提供します。つまり、構築後の変更が反映されているとは限りません。 重要なことは、クライアントスレッドが新しいエントリを並行して置くことです。

  • クライアントスレッドは最後の値を入れます[key=Key:20, val=Value: 20]。同時に、フラッシャースレッドは、メソッドでThread.sleep()によってブロックされます。 GridCacheWriteBehindStoreは、ライトバックキャッシュの現在のサイズがクリティカルサイズ(フラッシュサイズ* 1.5)を超えていることを検出するため、バックプレッシャーメカニズムを使用する必要があります。 GridCacheWriteBehindStoreflushSingleValue()メソッドを呼び出すことで、最も古い値をライトビハインドキャッシュからフラッシュすることができます(もちろん、前のフラッシャースレッドではこの値を取得しないでください)。 flushSingleValue()メソッドは、クライアントスレッドのコンテキストで呼び出されます。

  • その後、フラッシャースレッドがウェイクアップして残りのエントリを処理します。

私は、後書きストアの実装について理解していただければ幸いです。

ありがとうございます!

+0

すぐにお返事ありがとうございます。(flushSize * 1.5)は目が離せないものです。エントリの参照はライトバックキャッシュに置かれるため、Ignite Serverのメモリ要件を考慮してライトバックキャッシュのメモリを考慮する必要はありません。また、なぜフラッシャースレッドが19エントリの代わりに15エントリしか持っていないのか理解できません。なぜなら、ライトバックキャッシュが満杯で、バッチサイズが512であったことを考えると最適な選択だったはずです。それを1分に増加させても効果はなかった。 Flush Threadには常に15個のエントリがあります。 –

+0

私は詳細を提供するために答えを更新しました。 – sk0x50

関連する問題