2017-07-31 8 views
1

Redisアダプタを使用してGeodeに書き込んだ後、Geodeクライアントを使用してイベントに読み書きするにはどうすればよいですか?私は、次の操作を実行しようとしている

  1. データはのGeodeクライアントを使用していることCacheListener
  2. 読むを使用して、いくつかのキーを作成/更新イベントに反応Redisのコネクタ
  3. を使用してジオードにいくつかのデータを入れて地域の方法はentrySetです。

私のGeodeクライアントからのredisデータにはすでにアクセスできませんでした。

region.get(Coder.stringToByteArrayWrapper("key")); 

は私もregion.entrySet()作品を作るのトラブルの多くを持っていた:私は、次の操作を行う必要がありました。まず、それがすべてでClientRegionShortcut.PROXYでは動作しません、それはClientRegionShortcut.CACHING_PROXYと時間の50%のみが働くようです。

ここで(私はRedisのクライアントとしてlettuceを使用していますことに注意してください)私はこれをテストするために使用しているコードです:

@Test 
public void test_subscribe() throws InterruptedException, ExecutionException { 
    ClientCache cache = new ClientCacheFactory() 
     .addPoolLocator(HOST, LOCATOR_PORT) 
     .create(); 

    @SuppressWarnings({ "rawtypes", "unchecked" }) 
    CacheListener<ByteArrayWrapper, ByteArrayWrapper> cl = new CacheListenerAdapter() { 
     @Override 
     public void afterCreate(EntryEvent event) { 
      System.out.println("Created: " + event.getKey() + " = " + event.getNewValue()); 
     } 

     @Override 
     public void afterUpdate(EntryEvent event) { 
      System.out.println("Updated: " + event.getKey() + " replacing " + event.getOldValue() + "with" + event.getNewValue()); 
     } 
    }; 

    Region<ByteArrayWrapper, ByteArrayWrapper> region = cache 
     .<ByteArrayWrapper, ByteArrayWrapper> createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) 
     .addCacheListener(cl) 
     .setKeyConstraint(ByteArrayWrapper.class) 
     .setValueConstraint(ByteArrayWrapper.class) 
     .create(GeodeRedisServer.STRING_REGION); 

    RedisClient client = RedisClient.create("redis://" + HOST); 
    StatefulRedisConnection<String, String> connection = client.connect(); 
    RedisAsyncCommands<String, String> cmd = connection.async(); 

    cmd.set("1", "HelloGeodeRedis").get(); 
    cmd.set("2", "WorldGeodeRedis" + System.currentTimeMillis()).get(); 

    System.out.println("FromRedis: " + cmd.get("2").get()); 
    System.out.println("FromGeode: " + region.get(Coder.stringToByteArrayWrapper("2"))); 

    for (Map.Entry<?, ?> entry : region.entrySet()) { 
     System.out.format("key = %s, value = %s\n", entry.getKey(), entry.getValue()); 
    } 

    cache.close(); 
} 

50%のブツは、サーバに関係している場合、私は思ったんだけど私が実行しているよ:

gfsh>describe region --name=ReDiS_StRiNgS 
.......................................................... 
Name   : ReDiS_StRiNgS 
Data Policy  : persistent partition 
Hosting Members : my-redis 

Non-Default Attributes Shared By Hosting Members 

Type | Name  | Value 
------ | ----------- | -------------------- 
Region | size  | 2 
     | data-policy | PERSISTENT_PARTITION 

gfsh>describe region --name my-region 
.......................................................... 
Name   : my-region 
Data Policy  : persistent replicate 
Hosting Members : my-server 
        my-redis 

Non-Default Attributes Shared By Hosting Members 

Type | Name  | Value 
------ | ----------- | -------------------- 
Region | data-policy | PERSISTENT_REPLICATE 
     | size  | 2 
     | scope  | distributed-ack 

gfsh>list members 
     Name  | Id 
---------------- | ----------------------------------------------------------- 
my-locator | 172.16.202.245(my-locator:21234:locator)<ec><v0>:1024 
my-server | 172.16.202.245(my-server:22154)<v1>:1025 
my-redis | 172.16.202.245(my-redis:24890)<v2>:1026 

あなたは私が手動で作成した領域は、両方のサーバーでホストされているが、Redisのによって作成されたものが唯一のRedisのサーバーでホストされて見ることができるように。

私は時間の50%を取得していますエラーは以下の通りです:

org.apache.geode.cache.client.ServerOperationException: remote server on My-Computer(4352:loner):64103:58d54999: While performing a remote get 
    at org.apache.geode.cache.client.internal.AbstractOp.processObjResponse(AbstractOp.java:285) 
    at org.apache.geode.cache.client.internal.GetOp$GetOpImpl.processResponse(GetOp.java:143) 
    at org.apache.geode.cache.client.internal.AbstractOp.attemptReadResponse(AbstractOp.java:171) 
    at org.apache.geode.cache.client.internal.AbstractOp.attempt(AbstractOp.java:382) 
    at org.apache.geode.cache.client.internal.ConnectionImpl.execute(ConnectionImpl.java:275) 
    at org.apache.geode.cache.client.internal.pooling.PooledConnection.execute(PooledConnection.java:332) 
    at org.apache.geode.cache.client.internal.OpExecutorImpl.executeWithPossibleReAuthentication(OpExecutorImpl.java:900) 
    at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:158) 
    at org.apache.geode.cache.client.internal.OpExecutorImpl.execute(OpExecutorImpl.java:115) 
    at org.apache.geode.cache.client.internal.PoolImpl.execute(PoolImpl.java:763) 
    at org.apache.geode.cache.client.internal.GetOp.execute(GetOp.java:91) 
    at org.apache.geode.cache.client.internal.ServerRegionProxy.get(ServerRegionProxy.java:116) 
    at org.apache.geode.internal.cache.LocalRegion.findObjectInSystem(LocalRegion.java:2776) 
    at org.apache.geode.internal.cache.LocalRegion.nonTxnFindObject(LocalRegion.java:1488) 
    at org.apache.geode.internal.cache.LocalRegionDataView.findObject(LocalRegionDataView.java:175) 
    at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1377) 
    at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1310) 
    at org.apache.geode.internal.cache.LocalRegion.get(LocalRegion.java:1295) 
    at org.apache.geode.internal.cache.AbstractRegion.get(AbstractRegion.java:320) 
    at trial.GeodeTest.test_subscribe(GeodeTest.java:112) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) 
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) 
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) 
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) 
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) 
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) 
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86) 
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382) 
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192) 
Caused by: org.apache.geode.cache.RegionDestroyedException: Server connection from [identity(192.168.64.106(4352:loner):64103:58d54999,connection=1; port=64103]: Region named /ReDiS_StRiNgS/ReDiS_StRiNgS was not found during get request 
    at org.apache.geode.internal.cache.tier.sockets.BaseCommand.writeRegionDestroyedEx(BaseCommand.java:615) 
    at org.apache.geode.internal.cache.tier.sockets.command.Get70.cmdExecute(Get70.java:126) 
    at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:165) 
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:780) 
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:911) 
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1166) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$1$1.run(AcceptorImpl.java:523) 
    at java.lang.Thread.run(Thread.java:745) 

、私はこれをテストしていた理由は、私はカフカからのデータを入れたいということですので、あなたは完全な免責条項を持っていますジオコードにkafka-connect-redisを使用してトピックにトピックを追加すると、ジオードkafkaコネクタを自分でコーディングする必要がなくなります。

EDIT:50%問題が@Swapnilのおかげで修正されましたが、今私は戻ってトラブルentrySetを作り、イベント通知の仕事を持つことです。私が強制的に私がredisクライアントを使用してキーを取得しない限り、私は何もEntryEvent通知を取得していないようだ。

答えて

1

Geodeサーバーの起動方法と思われます。

gfsh>start locator --name=loc1 
gfsh>start server --name=serv1 --redis-port=1111 --redis-bind-address=localhost 
gfsh>start server --name=serv2 --server-port=40405 --redis-port=2222 --redis-bind-address=localhost 

領域を記述するRedisStringsは、両方のサーバー上に作成されたことを私に示しています:次に

 
gfsh>describe region --name=/ReDiS_StRiNgS 
.......................................................... 
Name   : ReDiS_StRiNgS 
Data Policy  : partition 
Hosting Members : serv2 
        serv1 

Non-Default Attributes Shared By Hosting Members 

Type | Name  | Value 
------ | ----------- | --------- 
Region | size  | 0 
     | data-policy | PARTITION 

私のテストプログラムから、私は通じ使用してデータを挿入し、私のセットアップ私のGeodeクラスタそうは以下のようにするとき、私は問題はなかったですRedisの(Jedisクライアント)と、その後のGeodeからデータを正常に読み戻す:

public void putRedis() { 
    Jedis jedis = new Jedis("localhost", 1111); 
    for (int i=0; i<10; i++) { 
    jedis.set("foo"+i, "bar"+i); 
    } 
} 

public void readGeode() { 
    ClientCacheFactory clientCacheFactory = new ClientCacheFactory(); 
    clientCacheFactory.addPoolLocator("localhost", 10334); 
    ClientCache client = clientCacheFactory.create(); 
    Region redisStrings = client.createClientRegionFactory(ClientRegionShortcut.PROXY).create("ReDiS_StRiNgS"); 
    for (int i=0; i<10; i++) { 
    System.out.println(redisStrings.get(Coder.stringToByteArrayWrapper("foo"+i))); 
    } 
} 
+0

本当にありがとうございました。私は他のサーバーを停止し、それ以上のエラーはありません。私は、各サーバーが多くの場合、赤いアダプターをホストしていることを意味しています。 – Crystark

+0

50%の問題は修正されているようですが、今私は他のプレブレムに直面しているようです。ジオードクライアントを使用してキーを強制的に取得しない限り、イベント通知も取得もしません。 – Crystark

2

私はへの関心を登録しなかったので、私はイベントをして受信していないとしていた問題私が興味を持ったキー

region.registerInterestRegex(".*");を追加すると、すべてのキーに関心を示すことができます。 ClientCache.setPoolSubscriptionEnabled(true)を設定する必要があります。

ClientCache cache = new ClientCacheFactory() 
     .addPoolLocator(HOST, LOCATOR_PORT) 
     .setPoolSubscriptionEnabled(true) 
     .create(); 

    // ... 

    Region<ByteArrayWrapper, ByteArrayWrapper> region = cache 
     .<ByteArrayWrapper, ByteArrayWrapper> createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY) 
     .addCacheListener(cl) 
     .setKeyConstraint(ByteArrayWrapper.class) 
     .setValueConstraint(ByteArrayWrapper.class) 
     .create(GeodeRedisServer.STRING_REGION); 

    region.registerInterestRegex(".*"); 

    // ... 
+0

私は[同じ問題があった](https://stackoverflow.com/questions/45286249/apache-geode-cachelistener-not-executing) – rupweb

関連する問題