0
私はバックエンド永続ストアとしてHBaseを使用してIgniteキャッシュストアを実装しました。キャッシュストアのコードは次のとおりです。Ignite Cache Store - リソースを解放する方法
/**
 * Ignite {@link CacheStore} that persists {@code Long -> byte[]} cache entries in an HBase table.
 *
 * <p>Every row key is derived from the cache name and the cache key via
 * {@code TagDuplicateUtil.getPersistentStoreKeyForBitDataCache} and is always encoded with
 * {@code Bytes.toBytes}, so single-entry and bulk operations address the same rows.
 *
 * <p>Each operation obtains a connection from {@code HBaseConnectionUtil.getHbaseConnection()} and
 * closes it (together with the table handle) when the operation finishes.
 * NOTE(review): whether that utility hands out a fresh or a shared connection is not visible from
 * this class; if it creates a new one per call, consider holding a single connection for the
 * lifetime of the store instead — confirm against HBaseConnectionUtil.
 */
public class BitDataCachePersistentStore implements CacheStore<Long, byte[]> {
    @IgniteInstanceResource
    Ignite gridReference;

    @CacheNameResource
    private String cacheName;

    /**
     * Reads the value stored for a single key.
     *
     * @param key cache key to look up
     * @return the stored bytes, or {@code null} when no row (or an empty row) exists for the key
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public byte[] load(Long key) {
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            Result rowFetched = bitDataPersistentTable.get(new Get(rowKey(key)));
            if (rowFetched == null || rowFetched.isEmpty()) {
                // Must be null rather than an empty array, otherwise Ignite would load the entry.
                return null;
            }
            return rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                    TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while performing read operation for the key [ " + key + " ] of the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Reads the values for several keys with one batched HBase request.
     *
     * <p>The loaded entries are returned to the caller instead of being written into the cache
     * from inside the store. Keys without a stored value are left out of the result.
     *
     * @param keys cache keys to look up
     * @return map of the keys that were found to their stored bytes; never {@code null}
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public Map<Long, byte[]> loadAll(Iterable<? extends Long> keys) {
        long startTime = System.currentTimeMillis();
        List<Long> requestedKeys = new ArrayList<>();
        List<Get> rowsToBeFetched = new ArrayList<>();
        for (Long key : keys) {
            requestedKeys.add(key);
            rowsToBeFetched.add(new Get(rowKey(key)));
        }
        // Fully qualified so that no additional import is required by this class.
        Map<Long, byte[]> loadedEntries = new java.util.HashMap<>();
        if (requestedKeys.isEmpty()) {
            return loadedEntries;
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            // Results come back positionally aligned with the list of Gets.
            Result[] rowsFetched = bitDataPersistentTable.get(rowsToBeFetched);
            for (int i = 0; i < rowsFetched.length; i++) {
                Result rowFetched = rowsFetched[i];
                if (rowFetched == null || rowFetched.isEmpty()) {
                    continue;
                }
                byte[] value = rowFetched.getValue(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                        TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES);
                if (value != null) {
                    loadedEntries.put(requestedKeys.get(i), value);
                }
            }
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_READ_ERROR, e,
                    "Error while reading multiple keys for the cache [ " + cacheName + " ] ");
        }
        System.out.println("LoadAll for [ " + loadedEntries.size() + " ] keys of the cache [ " + cacheName
                + " ] took [ " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds ] ");
        return loadedEntries;
    }

    /**
     * Writes a single entry to HBase.
     *
     * @param entry cache entry whose key selects the row and whose value is stored in it
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public void write(Entry<? extends Long, ? extends byte[]> entry) {
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.put(toPut(entry));
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing the entry for the key [ " + entry.getKey() + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Writes several entries to HBase with one batched request.
     *
     * @param entries cache entries to persist; an empty collection is a no-op
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public void writeAll(Collection<Entry<? extends Long, ? extends byte[]>> entries) {
        if (entries.isEmpty()) {
            return; // nothing to persist, so do not open a connection
        }
        long startTime = System.currentTimeMillis();
        List<Put> rowsToBeWritten = new ArrayList<>(entries.size());
        for (Entry<? extends Long, ? extends byte[]> entryToBeInserted : entries) {
            rowsToBeWritten.add(toPut(entryToBeInserted));
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.put(rowsToBeWritten);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_WRITE_ERROR, e,
                    "Error while writing multiple keys for the cache [ " + cacheName + " ] ");
        }
        System.out.println("Time taken to write [ " + entries.size() + " entries ] for the cache [ " + cacheName
                + " ] is " + ((System.currentTimeMillis() - startTime) / 1000.0) + " seconds");
    }

    /**
     * Deletes the row that belongs to a single key.
     *
     * @param key cache key whose row is removed
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public void delete(Object key) {
        String hbaseKey = persistentKey(key);
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(new Delete(Bytes.toBytes(hbaseKey)));
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting the entry for the key [ " + hbaseKey + " ] for the cache [ " + cacheName
                            + " ] ");
        }
    }

    /**
     * Deletes the rows that belong to several keys with one batched request.
     *
     * @param keys cache keys whose rows are removed; an empty collection is a no-op
     * @throws ROCException if HBase reports an I/O failure
     */
    @Override
    public void deleteAll(Collection<?> keys) {
        if (keys.isEmpty()) {
            return; // nothing to remove, so do not open a connection
        }
        List<Delete> rowsToBeDeleted = new ArrayList<>(keys.size());
        for (Object keyToBeDeleted : keys) {
            rowsToBeDeleted.add(new Delete(rowKey(keyToBeDeleted)));
        }
        try (Connection con = HBaseConnectionUtil.getHbaseConnection();
             Table bitDataPersistentTable = con.getTable(TagDuplicateConstants.BIT_DATA_TABLE_NAME)) {
            bitDataPersistentTable.delete(rowsToBeDeleted);
        } catch (IOException e) {
            throw new ROCException(TagDuplicateErrorCodes.CACHE_ENTRY_REMOVAL_ERROR, e,
                    "Error while deleting entries for the cache [ " + cacheName + " ] ");
        }
    }

    /** Intentionally a no-op: bulk pre-loading of the cache is not supported by this store. */
    @Override
    public void loadCache(IgniteBiInClosure<Long, byte[]> clo, Object... args) {
        // No implementation provided
    }

    /** Intentionally a no-op: this store keeps no per-session state. */
    @Override
    public void sessionEnd(boolean commit) {
        // No implementation provided
    }

    /** Builds the textual HBase key for a cache key of this cache. */
    private String persistentKey(Object key) {
        return TagDuplicateUtil.getPersistentStoreKeyForBitDataCache(cacheName, key.toString());
    }

    /** Encodes the HBase row key for a cache key; the single place that turns keys into bytes. */
    private byte[] rowKey(Object key) {
        return Bytes.toBytes(persistentKey(key));
    }

    /** Converts a cache entry into the HBase Put that stores its value in the bit-data column. */
    private Put toPut(Entry<? extends Long, ? extends byte[]> entry) {
        Put rowToBeWritten = new Put(rowKey(entry.getKey()));
        rowToBeWritten.addColumn(TagDuplicateConstants.BIT_DATA_COLUMN_FAMILY_NAME_AS_BYTES,
                TagDuplicateConstants.BIT_DATA_COLUMN_QUALIFIER_NAME_AS_BYTES, entry.getValue());
        return rowToBeWritten;
    }
}
キャッシュモードはPARTITIONEDです。
キャッシュアトミックモードはATOMICです。
ストアの実装から明らかなように、実装した各メソッドの中でHBaseへの新しい接続を毎回開いています。
データソース固有のリソース（この場合はHBase接続）を、メソッド呼び出しのたびに開閉するのではなく、よりマクロなレベルで開閉を制御する方法があるかどうかを知りたいです。
おそらく接続のプール – GurV