2013-02-11 11 views
11

ジョブを処理するワーカースレッドを持つJavaアプリケーションがあります。労働者は、結果オブジェクトを生成し、のようなものを言う:あるスレッドから別のスレッドにコレクションオブジェクトをJavaスレッドで安全に渡す

class WorkerResult{ 
    private final Set<ResultItems> items; 
    public Worker(Set<ResultItems> pItems){ 
     items = pItems; 
    } 
} 

作業員が終了すると、それはこの操作を行います。

... 
final Set<ResultItems> items = new SomeNonThreadSafeSetImplSet<ResultItems>(); 
for(Item producedItem : ...){ 
     items.add(item); 
} 
passToGatherThread(items); 

itemsセットは「作業単位」ここの一種であります。 passToGatherThreadメソッドは、itemsセットを収集スレッドに渡します。そのうちの1つのみが実行時に存在します。

ここで同期化は必要ありません。これは、1つのスレッド(ギャザースレッド)のみがitemsのセットを読み取るため、競合状態が発生しないためです。 AFAICSでは、ギャザースレッドは、スレッドセーフではないため、すべてのアイテムを表示しない場合があります。

passToGatherThreadは、サードパーティのライブラリであるため、同期させることはできません。私は基本的に恐れているのは、キャッシング、VMの最適化などのために、ギャザースレッドがすべてのアイテムを見ることができないということです。ここでは、スレッドセーフな方法でアイテムを渡す方法です。ギャザースレッドは、適切な項目のセット?

+0

[PipedStreams](http://docs.oracle.com/javase/6/docs/api/java/io/PipedInputStream.html)がお手伝いできますか? – oliholz

+1

'passToGatherthread'の定義は何ですか?私はこれが以下の答えが正しいかどうかを理解する上で重要だと思います。 'items'はどのくらい正確にギャザースレッドに渡されますか? –

+0

オブジェクトの可視性に本当に問題がありますか、またはそのような動作しか疑うことはありませんか?そのような状況では、あなたの懸念はかなり遠いものに見えます。 – Dariusz

答えて

1

私が考えた(と議論)しているこの質問に多く、私は、私は願って、最適なソリューションとなり、別の答え、が出ています。

同期コレクションを渡すことは、コレクションの後続の各操作が同期されるため、効率的ではありません。多くの操作がある場合は、ハンディキャップである可能性があります。ポイントへの

:さんは(私は同意しない)いくつかの仮定をしてみましょう:

  • 言及passToGatherThread方法は確かに安全ではありません、しかし、ありそうもないことが
  • コンパイラは、コード内のイベントを並べ替えることができそうですコレクションは

採集メソッドに渡されたコレクションの準備ができていることを保証するための最も簡単な、クリーンそしておそらく最も効率的な方法を満たし、完全をtされる前passToGatherThreadが呼び出されるようにコレクションが渡される前に、我々はこのようにすべてのオブジェクトが正しく渡されていることを確認して、メモリの同期化と有効起こる-前にシーケンスを確保するその方法

synchronized(items) { 
    passToGatherThread(items); 
} 

:Oこのように、同期ブロックに収集プッシュを置きます。

+1

私はあなたが正しいと思うし、最終的にはこれがまさに私がこの問題に遭遇した方法です。それは少し醜いように見えますか?ほとんどの人には単に愚かに見える新しく作成されたセットを同期する;-)あなたの答えをありがとう。 – xSNRG

1

ここで同期の問題はないようです。 passToGatherThreadごとに新しいSetオブジェクトを作成し、セットを変更した後に行います。オブジェクトは失われません。

セット(およびほとんどのJavaコレクション)は、コレクションに変更を加えない限り、多数のスレッドによって同時にアクセスできます。それはCollections.unmodifiableCollectionのためです。

この方法は他のスレッドと通信するため、ある種の同期を使用しなければならず、各同期によってスレッド間のメモリの一貫性が保証されます。

また、渡されたコレクション内のオブジェクトへのすべての書き込みは、の前に行われ、の前に行われます。他のスレッドに渡されます。メモリがスレッドのローカルキャッシュにコピーされても、他のスレッドと同じ変更されていない値を持ちます。

+1

AFAIK JVMは、プロセッサコアレジスタに可能な限りデータをキャッシュすることができます。 "起こった"セマンティクスはデータを生成するスレッドでのみ有効であるため、他のスレッドは "失効"値を参照することがあります。 – xSNRG

+0

@ xSNRGは段落を追加しました。パスメソッドが有効であれば、心配する必要はありません。 – Dariusz

+0

さらに、これがうまくいかない場合は、大量のアプリケーションがうまくいきません。 – Dariusz

1

WorkerResultにJavaが提供するSetのスレッドセーフ実装のいずれかを使用できます。例えば参照:

別のオプションはCollections.synchronizedSet()を使用することです。

+0

これはなぜ必要なのですか?スレッドセーフでない 'Set'実装を使用すると何が問題になりますか? –

+0

私はOPがキャッシングを恐れていると思います。スレッドは非揮発性のデータをキャッシュすることがあり、書き込みが同期化されていない場合、更新を参照することはありません。 – joergl

+1

はいjoerglは絶対に正しいです。 Dariusz Wawerの答えの下に私のコメントを見てください。 – xSNRG

0

労働者が呼び出し可能な実装とWorkerResultを返します。

class Worker implements Callable<WorkerResult> { 
    private WorkerInput in; 

    public Worker(WorkerInput in) { 
     this.in = in; 
    } 

    public WorkerResult call() { 
     // do work here 
    } 
} 

その後、我々は、スレッドプールを管理するためにExecutorServiceのを使用して、将来を使用して経由で結果を収集。

public class PooledWorkerController { 

    private static final int MAX_THREAD_POOL = 3; 
    private final ExecutorService pool = 
     Executors.newFixedThreadPool(MAX_THREAD_POOL); 

    public Set<ResultItems> process(List<WorkerInput> inputs) 
      throws InterruptedException, ExecutionException{   
     List<Future<WorkerResult>> submitted = new ArrayList<>(); 
     for (WorkerInput in : inputs) { 
      Future<WorkerResult> future = pool.submit(new Worker(in)); 
      submitted.add(future); 
     } 
     Set<ResultItems> results = new HashSet<>(); 
     for (Future<WorkerResult> future : submitted) { 
      results.addAll(future.get().getItems()); 
     } 
     return results; 
    } 
} 
+0

ジョブを開始するたびにスレッドを作成します。それに対して-1。 – Dariusz

+0

あなたの投稿には、一連のジョブが呼び出されるたびにスレッドを作成するという悪い習慣が導入されました。スレッドの作成は非常に高価であり、可能な限り、スレッドを一度作成して再利用する必要があります。あなたのコードは、 'process'で使用される単一の静的に初期化されたスレッドプールを持っていれば、はるかに優れたIMOになります。 'process'呼び出しごとに新しいスレッドプールを作成することはあまり変わりません。 – Dariusz

関連する問題