2016-05-29 13 views
0

SomeObjectをいくつかの外部システムに委譲するSomeObjectHandlerからhandle()メソッドを実装する必要があります(下記参照)。 SomeObjectは正しいhashCodeとequalsメソッドを持ちます。メソッドハンドル(SomeObject someObject)は、多くのスレッド(たとえば10)から呼び出すことができます。外部システムは同時にsomeObjectと等しくない場合もありますが、システムが等しいsomeObjectで同時に動作しようとすると破損します。 equal someObjectの同時処理を防ぐために、このクラスを実装する必要があります。 someObjectが等しい場合でも、それらのすべてが処理される必要があります。Javaの多くのスレッドからメッセージを送信する

今、私は、並行ライブラリからQueueのようなものを使用する必要があると思っていますが、どちらがわからないのですか。

UPD:標準のJavaライブラリのみを使用する必要があります。最大スループットを達成することが可能ならば。

答えて

0

私は完全にあなたの質問を得た場合私は100%ではないが、私はこの問題に近づくための複数の方法があると思う。あなたはすでにあなたがオブジェクトを挿入するためのキューを使用して、あなたが言ったように、あなたの外側のシステムが同時に同じオブジェクトを処理することができないので、外側のシステムが同期してオブジェクトを処理して確認することができます述べたように

1)。

2)送信者コード自体でこれを処理します。私はこれを複数回試しました。ここにコードスニペットがあります。このアプローチのプロは、等しいオブジェクトだけを同期させるということです。最後にブロックする際にも削除部分を処理するようにしてください。コードがきちんと整っていなければ、申し訳ありません。私はこれで新しいです:)

Map<SomeObject,Integer> objMap=new ConcurrentHashMap<SomeObject,Integer>(); 

public void handle(SomeObject someObject) { 
synchronized(this.class) 
{ 
Integer count=objMap.get(someObject); 
if(count==null) 
{ 
    count=0; 
} 
objMap.put(someObject,++count); 
} 

synchronized(objectMap.get(someObject) 
{ 
    outerSystem.process(someObject); 

    Integer count=objMap.get(someObject); 
    if(count>1) 
{ 
    objMap.put(someObject,--count); 
} 
else 
{ 
    objectMap.remove(someObject); 
} 
} 

} 
+0

あなたの試していただきありがとうございます、このようなものは仕事になる可能性があります。しかし、私はそれがより最適な解決策を持っていると思います。最大のスループットを達成するのは良いことです。我々は、すべてのアクションFO /ロックを同期しないだろう場合は、それを大幅にスループット – AskProgram

0

RxJavaはここで助けることができます。これは、特に、非同期変換を含む、必要なときにカバーの下の話をしているキューイングんデータの取り扱いストリームで非常に良いことだ(同期修飾子を経由してブロックせずに!)。あなたの問題を解決するために、私はこのようなものだろう:私が正しくあなたの問題を理解していれば、「不平等のオブジェクトが」parallellで処理することができる一方で

public class SomeHandler{ 

    private final OuterSystem outerSystem; 

    private final PublishSubject<SomeObject> subject; 

    public SomeHandler() { 
     subject 
      // handle calls from multiple threads (non-blocking) 
      .serialized() 
      // buffer in memory if not keeping up  
      .onBackpressureBuffer() 
      // use equals/hashCode to order messages (the queues you referred to) 
      .groupBy(x -> x) 
      .flatMap(g -> 
       g.doOnNext(x -> outerSystem.process(x)) 
       // process groups in parallel 
       .subscribeOn(Schedulers.computation())) 
      // do something if an error occurs 
      .doOnError(e -> e.printStackTrace()) 
      // start consuming data when arrives 
      .subscribe(); 
    } 

    public void handle(SomeObject someObject) { 
     subject.doOnNext(someObject); 
    } 
} 
+0

は、あなたの答えをありがとう、それは本当に仕事ができる減らすことができます。しかし、私は唯一のスタンダールJavaライブラリを使用する必要が – AskProgram

0

を、あなたは、「同じオブジェクト」のシリアル実行を保証する必要があります。これを実現する1つの方法は、N個のプロセッサを配置し、オブジェクトの決定論的特性に応じてワークロードを分散させることです。 2つのオブジェクトが等しい場合hasCode() modulo N各エグゼキュータは、単一のスレッドが含まれているNエグゼキュータ、上に負荷を分散するために使用することができるので、それらのハッシュコードは、等しくなければならないあなたのケースで

public class SomeHandler { 
    static int N = ...; 
    // Each executor is an Executors.newSingleThreadScheduledExecutor() 
    Executor[N] executors = ....; 
    OuterSystem system; 

    public void handle(SomeObject so) { 
     executors[so.hashCode() % N].execute(() -> system.process(so)); 
    } 
} 
+0

ナイスアプローチを(それは私のミスだ、私はそれについて書かなければなりませんでした)。 )経由で]であるべきで、OPは 'handle'が複数のスレッドから呼び出されたことを言ったように私は' SomeHandler'決勝でこれらすべてのフィールドを作ると思います。 –

関連する問題