私はCassandraには比較的新しいので、Executor Poolを介して実行されるプリペアドステートメントを使用してデータを取得しようとしています。私が受け取っているデータが一貫していないように見えます。Cassandraがスレッドプールを介してデータを取得する
私はこのuser_connectionsテーブルを持っています。ここで、user_idは行キー、friends_idリストはセットカラムです。 私はこの別のテーブル、friends_infoテーブルを持っています。ここでfriend_idは行キーで、他のすべての情報はカラムです。
ユーザーAAのフレンドリストを取得しようとしているときに、フレンドリストBBB、CCC、DDDを取得しています。どちらが完璧です。
プリペアドステートメントを使用して、実行プログラムプール経由でBBB、CCC、DDDを取得しようとしています。データに矛盾があります。すべての3つのレコードがBBBで、時には3つのレコードがすべてBBBで、1つがCCCなどです。
私が使用しているメソッドと関連するクラスを提供しました。これで私。準備されたステートメントはスレッドセーフであり、期待どおりに動作することが期待されています。
public Set<User> listUserConnections(String userId) {
Session session = client.getSession();
Set<String> listUserConnectionIds = listUserConnections(userId, session);
if (listUserConnectionIds.size() == 0)
return new HashSet<User>();
Set<User> listConnectionUserDetails = retrieveUserConnectionProfileInfo(
listUserConnectionIds, session);
return listConnectionUserDetails;
}
private Set<User> retrieveUserConnectionProfileInfo(Set<String> listUserConnectionIds,
Session session) {
Set<Callable<User>> callables = new HashSet<Callable<User>>();
for(String key:listUserConnectionIds){
logger.info("about to add callable" + key);
Callable<User> callable = new QueryTask(key);
callables.add(callable);
}
// invoke in parallel
List<Future<User>> futures;
Set<User> users = new HashSet<User>();
// TODO Revisit this
ExecutorService executorPool = Executors.newFixedThreadPool(100);
try {
futures = executorPool.invokeAll(callables);
for (Future<User> f : futures) {
User user = f.get();
users.add(user);
logger.info("User from future"+user.getUserId());
}
} catch (ExecutionException e) {
logger.error("Error in retrieving the stores in parallel ", e);
} catch (InterruptedException e) {
logger.error("Error in retrieving the stores in parallel as it was interrupted ", e);
} finally {
executorPool.shutdown();
}
return users;
}
// Executorのプールクラス
class QueryTask implements Callable<User> {
private String userName;
// final PreparedStatement statement =
// client.getSession().prepare(QueryConstants.GET_ALL_STORE_BRANDS);
QueryTask(String name) {
this.userName = name;
}
@Override
public User call() throws Exception {
// -------------I am seeing the userName is correct------------- for example BBB
logger.info("inside call processing queries for " + userName);
//------------This is a prepared statement, userPreparedStatement.getbStUserInfo()
BoundStatement bStUserInfo = userPreparedStatement.getbStUserInfo();
bStUserInfo.bind(userName);
Session session = client.getSession();
ResultSet rs = session.execute(bStUserInfo);
User user = new User();
Iterator<Row> rowIterator = rs.iterator();
while (rowIterator.hasNext())
{
Row row = rowIterator.next();
//-------This user id is not right
logger.info("Inside the callable after retrieval"+row.getString(TableConstants.Users.COLUMN_NAME_USER_ID));
user.setUserId(row.getString(TableConstants.Users.COLUMN_NAME_USER_ID));
return user;
}
logger.info("outside the while loop");
return user;
}
}
あなたの 'BoundStatement'インスタンスがどこに' new'しているのかわかりません。 – Ralf
補足として、ドライバはすでに非同期APIを提供しているので、ほとんどすべてのQueryTask/Executorコードを取り除き、テストに集中することができます。 –