特定のディレクトリからxmlファイルを読み込んでインポートするファイル処理サービスを作成しました。マルチスレッドlinq2sqlアプリケーションTransactionScopeの問題
サービスは、新しいファイルのファイルキューをポーリングし、linac2sqlをデータアクセスに使用する複数のワーカーを開始します。各ワーカースレッドには独自のデータコンテキストがあります。処理されている
ファイルには、いくつかの注文を含み、各注文は、私は、各ファイルの取り扱い周りのTransactionScopeを定義した複数のアドレス(顧客/契約/下請け)
が含まれています。この方法では、私は例外が発生したときに、ファイル全体が正しく処理されていること、またはファイル全体がロールバックされていることを確認したい:サービスの要件の
try
{
using (var tx = new TransactionScope(TransactionScopeOption.RequiresNew))
{
foreach (var order in orders)
{
HandleType1Order(order);
}
tx.Complete();
}
}
catch (SqlException ex)
{
if (ex.Number == SqlErrorNumbers.Deadlock)
{
throw new FileHandlerException("File Caused a Deadlock, retrying later", ex, true);
}
else
throw;
}
一つは、それがあるのは、作成または更新は、のアドレスを発見しましたxmlファイルそこで、アドレス管理を担当するアドレスサービスを作成しました。次のコードは、xmlインポートファイル()の各注文(メソッドHandleType1Order()
内)で実行され、ファイル全体のTransactionScopeの一部です)。
using (var tx = new TransactionScope())
{
address = GetAddressByReference(number);
if (address != null) //address is already known
{
Log.Debug("Found address {0} - {1}. Updating...", address.Code, address.Name);
UpdateAddress(address, name, number, isContractor, isSubContractor, isCustomer);
}
else
{
//address not known, so create it
Log.Debug("Address {0} not known, creating address", number);
address = CreateAddress(name, number, sourceSystemId, isContractor, isSubContractor,
isCustomer);
_addressRepository.Save(address);
}
_addressRepository.Flush();
tx.Complete();
}
私がここでやろうとしているのは、番号が一意であるアドレスを作成または更新することです。
メソッドGetAddressByReference(string number)
は、既知のアドレスを返します。アドレスが見つからない場合はnullを返します。
public virtual Address GetAddressByReference(string reference)
{
return _addressRepository.GetAll().SingleOrDefault(a=>a.Code==reference);
}
サービスを実行すると、同じ番号の複数のアドレスが作成されます。メソッドGetAddressByReference()
getは同時に呼び出され、2番目のスレッドが同じアドレス番号を持つメソッドを実行するときに既知のアドレスを返す必要がありますが、nullを返します。私のトランザクションの境界や分離レベルには何らかの問題がありますが、それを動作させることはできません。
誰かが私を正しい方向に向けることができますか?ヘルプは大変感謝しています!
p.s.デッドロックが発生したときにトランザクションがデッドロックされ、ロールバックが発生しても問題はありません。デッドロックが発生したときにファイルが再試行されます。
編集1スレッディングコード:
public void Work()
{
_isRunning = true;
while (true)
{
ImportFileTask task = _queue.Dequeue(); //dequeue blocks on empty queue
if (task == null)
break; //Shutdown worker when a null task is read from the queue
IFileImporter importer = null;
try
{
using (new LockFile(task.FilePath).Acquire()) //create a filelock to sync access accross all processes to the file
{
importer = _kernel.Resolve<IFileImporter>();
Log.DebugFormat("Processing file {0}", task.FilePath);
importer.Import(task.FilePath);
Log.DebugFormat("Done Processing file {0}", task.FilePath);
}
}
catch(Exception ex)
{
Log.Fatal(
"A Fatal exception occured while handling {0} --> {1}".FormatWith(task.FilePath, ex.Message), ex);
}
finally
{
if (importer != null)
_kernel.ReleaseComponent(importer);
}
}
_isRunning = false;
}
上記の方法は、私たちのワーカースレッドのすべてで実行されます。これは、Castle Windsorを使用して、一時的なライフスタイルを持つFileImporterを解決します(したがって、スレッド間で共有されません)。
各トランザクションは一つだけのSQLConnectionを使用していますので、トランザクションは分散トランザクションに昇格されていません。lockキーワードに関しては、同期を使用していますが、トランザクションを使用してデータベースアクセスの同期化を実現しようとしています(これは正しい方法です) – Rik