2017-12-19 23 views
0

私は、バックエンドの永続ストアとしてHBaseを使用するIgniteキャッシュストアを実装しました。キャッシュストアのコードは次のとおりです。

Ignite Cache Store - リソースを解放する方法

/**
 * Ignite {@link CacheStore} that persists {@code Long -> byte[]} cache entries in an HBase table.
 *
 * <p>Every entry is stored as a single cell (fixed column family / qualifier taken from
 * {@code TagDuplicateConstants}) in a row whose key is derived from the cache name and the
 * cache key via {@code TagDuplicateUtil.getPersistentStoreKeyForBitDataCache}.
 *
 * <p>NOTE(review): each operation obtains a {@code Connection} from
 * {@code HBaseConnectionUtil.getHbaseConnection()} and closes it when done. HBase connections
 * are documented as heavyweight objects intended to be created once and shared. Whether the
 * utility returns a fresh or a shared instance is not visible from this class; if it is a
 * fresh one, consider holding a single connection for the lifetime of the store (for example
 * via Ignite's {@code LifecycleAware} start/stop callbacks) instead of reconnecting per call.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {

    /**
     * Injected Ignite instance. The store no longer writes into the cache itself, so this
     * field is currently unused; it is kept so that existing same-package code still compiles.
     */
    @IgniteInstanceResource
    Ignite gridReference;

    /** Name of the cache this store instance serves; injected by Ignite. */
    @CacheNameResource
    private String cacheName;

    /**
     * Reads the value for a single key from HBase.
     *
     * @param key cache key to read
     * @return the stored bytes, or {@code null} when no row (or no cell) exists for the key
     * @throws ROCException if the HBase read fails
     */
    @Override
    public byte[] load(Long key) {
        byte[] row = rowKey(key);

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            // A missing row must map to null (not an empty array), otherwise Ignite would
            // treat the empty array as a real value and load the entry into the cache.
            return extractValue(table.get(new Get(row)));

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Reads the values for several keys from HBase in one batched request.
     *
     * <p>The loaded entries are returned to the caller, as the {@code CacheLoader} contract
     * requires; the store does not put them into the cache itself. Keys without a stored
     * value are simply absent from the returned map.
     *
     * @param keys cache keys to read
     * @return map of the keys that were found to their stored bytes; never {@code null}
     * @throws ROCException if the HBase read fails
     */
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
        long startTime = System.currentTimeMillis();

        // Keep the keys in a list parallel to the Get list so that results can be matched
        // back to their cache keys by position.
        List<Long> requestedKeys = new ArrayList<>();
        List<Get> rowsToBeFetched = new ArrayList<>();
        for (Long key : keys) {
            requestedKeys.add(key);
            rowsToBeFetched.add(new Get(rowKey(key)));
        }

        // Fully qualified to avoid depending on an import this class did not previously need.
        Map<Long, byte[]> loaded = new java.util.HashMap<>();
        if (rowsToBeFetched.isEmpty()) {
            return loaded; // nothing requested: skip the HBase round trip entirely
        }

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            Result[] rowsFetched = table.get(rowsToBeFetched);

            for (int i = 0; i < rowsFetched.length; i++) {
                byte[] value = extractValue(rowsFetched[i]);
                if (value != null) {
                    loaded.put(requestedKeys.get(i), value);
                }
            }

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }

        System.out.println("LoadAll for [ " + requestedKeys.size() + " ] keys of the cache [ " + cacheName
                + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");

        return loaded;
    }

    /**
     * Writes a single cache entry to HBase.
     *
     * @param entry entry whose key determines the row and whose value becomes the cell content
     * @throws ROCException if the HBase write fails
     */
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
        Put rowToBeWritten = toPut(entry);

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            table.put(rowToBeWritten);

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Writes several cache entries to HBase in one batched request.
     *
     * @param entries entries to persist
     * @throws ROCException if the HBase write fails
     */
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
        long startTime = System.currentTimeMillis();

        List<Put> rowsToBeWritten = new ArrayList<>(entries.size());
        for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
            rowsToBeWritten.add(toPut(entryToBeInserted));
        }

        try (Connection con = HBaseConnectionUtil.getHbaseConnection()) {

            try (Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
                table.put(rowsToBeWritten);
            }

            System.out.println("Time taken to write [ " + entries.size() + " entries ] for the cache [ " + cacheName
                    + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
    }

    /**
     * Deletes the row that backs a single cache key.
     *
     * @param key cache key whose row is removed
     * @throws ROCException if the HBase delete fails
     */
    @Override
    public void delete(Object key) {
        String hbaseKey = TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            table.delete(new Delete(Bytes.toBytes(hbaseKey)));

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Deletes the rows that back several cache keys in one batched request.
     *
     * @param keys cache keys whose rows are removed
     * @throws ROCException if the HBase delete fails
     */
    @Override
    public void deleteAll(Collection<?> keys) {
        List<Delete> rowsToBeDeleted = new ArrayList<>(keys.size());
        for (Object keyToBeDeleted : keys) {
            rowsToBeDeleted.add(new Delete(rowKey(keyToBeDeleted)));
        }

        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
                Table table = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {

            table.delete(rowsToBeDeleted);

        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    }

    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }

    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }

    /**
     * Builds the HBase row key bytes for a cache key. Always goes through
     * {@code Bytes.toBytes} so that single-key and batch operations encode the key
     * identically, independent of the platform default charset.
     */
    private byte[] rowKey(Object key) {
        return Bytes.toBytes(TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString()));
    }

    /** Builds the Put that stores the entry's value in the bit-data column of the entry's row. */
    private Put toPut(Entry<? extends Long, ? extends byte[]> entry) {
        Put put = new Put(rowKey(entry.getKey()));
        put.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());
        return put;
    }

    /** Returns the bit-data cell of a fetched row, or {@code null} when the row is absent or empty. */
    private static byte[] extractValue(Result rowFetched) {
        if (rowFetched == null || rowFetched.isEmpty()) {
            return null;
        }
        return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
    }

}

キャッシュモードはPARTITIONEDです。

キャッシュアトミックモードはATOMICです。

ストアの実装から明らかなように、実装した各メソッドでHBaseへの新しい接続を開いています。

データソース固有のリソース(この場合はHBase接続)を、メソッド呼び出しのたびにではなく、よりマクロなレベルでオープン/クローズするように制御する方法があるかどうかを知りたいです。

+0

おそらく接続のプール – GurV

答えて

2

ストア内で接続プールを使用する必要があります。c3p0を確認してみてください。