シンプルなフォームにコードを縮小したような感じです(下記参照)。これは、弱い参照ターゲットをあまりにも早く収集させる原因となった問題は、自分のコードのどこかに存在しなければならないということです。だから、私自身の質問に答えるために、弱参照は複数のスレッドから安全にアクセスできるように見えます。ここで
は私のテストコードです:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
namespace Test
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Starting the app");
Test test = new Test();
// uncomment these lines to cause automatic unsubscription from Message1
// test = null;
// GC.Collect();
// GC.WaitForPendingFinalizers();
// publish Message1 on this thread
// MessageBus.Publish<Message1>(new Message1());
// publish Message1 on another thread
ThreadPool.QueueUserWorkItem(delegate
{
MessageBus.Publish<Message1>(new Message1());
});
while (!MessageBus.IamDone)
{
Thread.Sleep(100);
}
Console.WriteLine("Exiting the app");
Console.WriteLine("Press <ENTER> to terminate program.");
Console.WriteLine();
Console.ReadLine();
}
}
public class Test
{
public Test()
{
Console.WriteLine("Subscribing to message 1.");
MessageBus.Subscribe<Message1>(OnMessage1);
Console.WriteLine("Subscribing to message 2.");
MessageBus.Subscribe<Message2>(OnMessage2);
}
public void OnMessage1(Message1 message)
{
Console.WriteLine("Got message 1. Publishing message 2");
MessageBus.Publish<Message2>(new Message2());
}
public void OnMessage2(Message2 message)
{
Console.WriteLine("Got message 2. Closing the app");
MessageBus.IamDone = true;
}
}
public abstract class MessageBase
{
public string Message;
}
public class Message1 : MessageBase
{
}
public class Message2 : MessageBase
{
}
public static class MessageBus
{
// This is here purely for this test
public static bool IamDone = false;
/////////////////////////////////////
/// <summary>
/// A dictionary of lists of handlers of messages by message type
/// </summary>
private static ConcurrentDictionary<string, List<WeakReference>> handlersDict = new ConcurrentDictionary<string, List<WeakReference>>();
/// <summary>
/// Thread synchronization object to use with Publish calls
/// </summary>
private static object _lockPublishing = new object();
/// <summary>
/// Thread synchronization object to use with Subscribe calls
/// </summary>
private static object _lockSubscribing = new object();
/// <summary>
/// Creates a work queue item that encapsulates the provided parameterized message
/// and dispatches it.
/// </summary>
/// <typeparam name="TMessage">Message argument type</typeparam>
/// <param name="message">Message argument</param>
public static void Publish<TMessage>(TMessage message)
where TMessage : MessageBase
{
// create the dictionary key
string key = String.Empty;
key = typeof(TMessage).ToString();
// initialize a queue work item argument as a tuple of the dictionary type key and the message argument
Tuple<string, TMessage, Exception> argument = new Tuple<string, TMessage, Exception>(key, message, null);
// push the message on the worker queue
ThreadPool.QueueUserWorkItem(new WaitCallback(_PublishMessage<TMessage>), argument);
}
/// <summary>
/// Publishes a message to the bus, causing observers to be invoked if appropriate.
/// </summary>
/// <typeparam name="TArg">Message argument type</typeparam>
/// <param name="stateInfo">Queue work item argument</param>
private static void _PublishMessage<TArg>(Object stateInfo)
where TArg : class
{
try
{
// translate the queue work item argument to extract the message type info and
// any arguments
Tuple<string, TArg, Exception> arg = (Tuple<string, TArg, Exception>)stateInfo;
// call all observers that have registered to receive this message type in parallel
Parallel.ForEach(handlersDict.Keys
// find the right dictionary list entry by message type identifier
.Where(handlerKey => handlerKey == arg.Item1)
// dereference the list entry by message type identifier to get a reference to the observer
.Select(handlerKey => handlersDict[handlerKey]), (handlerList, state) =>
{
lock (_lockPublishing)
{
List<int> descopedRefIndexes = new List<int>(handlerList.Count);
// search the list of references and invoke registered observers
foreach (WeakReference weakRef in handlerList)
{
// try to obtain a strong reference to the target
Delegate dlgRef = (weakRef.Target as Delegate);
// check if the underlying delegate reference is still valid
if (dlgRef != null)
{
// yes it is, get the delegate reference via Target property, convert it to Action and invoke the observer
try
{
(dlgRef as Action<TArg>).Invoke(arg.Item2);
}
catch (Exception e)
{
// trouble invoking the target observer's reference, mark it for deletion
descopedRefIndexes.Add(handlerList.IndexOf(weakRef));
Console.WriteLine(String.Format("Error looking up target reference: {0}", e.Message));
}
}
else
{
// the target observer's reference has been descoped, mark it for deletion
descopedRefIndexes.Add(handlerList.IndexOf(weakRef));
Console.WriteLine(String.Format("Message type \"{0}\" has been unsubscribed from.", arg.Item1));
MessageBus.IamDone = true;
}
}
// remove any descoped references
descopedRefIndexes.ForEach(index => handlerList.RemoveAt(index));
}
});
}
// catch all Exceptions
catch (AggregateException e)
{
Console.WriteLine(String.Format("Error dispatching messages: {0}", e.Message));
}
}
/// <summary>
/// Subscribes the specified delegate to handle messages of type TMessage
/// </summary>
/// <typeparam name="TArg">Message argument type</typeparam>
/// <param name="action">WeakReference that represents the handler for this message type to be registered with the bus</param>
public static void Subscribe<TArg>(Action<TArg> action)
where TArg : class
{
// validate input
if (action == null)
throw new ArgumentNullException(String.Format("Error subscribing to message type \"{0}\": Specified action reference is null.", typeof(TArg)));
// build the queue work item key identifier
string key = typeof(TArg).ToString();
// check if a message of this type was already added to the bus
if (!handlersDict.ContainsKey(key))
{
// no, it was not, create a new dictionary entry and add the new observer's reference to it
List<WeakReference> newHandlerList = new List<WeakReference>();
handlersDict.TryAdd(key, newHandlerList);
}
lock (_lockSubscribing)
{
// append this new observer's reference to the list, if it does not exist already
if (!handlersDict[key].Any(existing => (existing.Target as Delegate) != null && (existing.Target as Delegate).Equals(action)))
{
// append the new reference
handlersDict[key].Add(new WeakReference(action, true));
}
}
}
}
}
あなたの質問は? (質問の形でそれを明記してください) – Kiril
また、私たちはここでどの言語/プラットフォームを話していますか? Java? C#? –
ご意見ありがとうございます。私はVS2010で.NET 4.0でC#を使用しています。問題はこれです:適切なスレッドの同期が適切であれば、複数のスレッドからWeakReferenceオブジェクトに確実にアクセスできないはずですか?私には分かりませんが、私には分かりません。それで、私は何を正しくしていないのですか? – user1144037