2017-01-19 17 views
2

ThreadPoolExecutor内のスレッド間でタスクを分散する方法、 私は問題以下している

は、私はタスクのキューを持っているなど、タスクの種類がたくさんあります。

A, B, C, D, ... 

私は、これらのタスクのスレッドで実行しますプール。

しかし、私はそれゆえ、同じ時間に同じタイプのタスクの実行を制限する必要があり、これは悪いです:タイプAとBの

Thread-1: [A, D, C, B, ...] 
Thread-2: [A, C, D, B, ...] 

のタスクを同時に実行することができます。

しかし、これは良いですが:

Thread-1: [A,B,A,B,...] 
Thread-2: [C,D,D,C,...] 

したがって、同じタイプのタスクは常に順次実行されています。

この機能を実装する最も簡単な方法は何ですか?上記の何が起こっている

+0

あなたの質問はかなり曖昧ですが、わかりません。しかし、2つの別々のスレッドプールを使うことだけを考えてみることもできます。 – Jamie

+0

@Jamieこれは非常に不器用な解決策です。 – corvax

+0

これもOKでしょうか?スレッド-1 [A1、A2、B1、B2]? – john16384

答えて

0
CompletableFuture.supplyAsync(this::doTaskA) 
       .thenAccept(this::useResultFromTaskAinTaskB); 

は、タスクAと関連タスクBが実際に同じスレッド(次々、タスクBの実行を開始するために、新しいスレッドを「取得」する必要はありません)で実行されていることです。あなたはそれからどんな情報を必要としませんが、それはデフォルトでタスクB.

を実行する前に完了するのを待つ必要がない場合は、タスクAのためrunAsyncを使用することができます

あるいは、CompletableFutureのは、共通のスレッドを使用します。 ThreadPoolの使用をより詳細に制御したい場合は、独自のThreadPoolを使用する独自のExecutorを使用して、asyncメソッドに2番目の引数を渡すことができます。

1

この問題は、Akkaのようなアクターフレームワークで簡単に解決できます。

各種類のタスク。アクターを作成します。

それぞれのタスクごとに、メッセージを作成し、対応するタイプのアクタに送信します。メッセージのタイプはRunnableである可能性があります。俳優の反応方法は @Override public void onReceive(Object msg) { ((Runnable)msg).run(); }

です。この方法で、プログラムは任意の数のスレッドに対して正しく実行されます。

+0

はい、これはAkkaによって簡単に解決できますが、この問題のみを解決するためにakkaを統合するのは大きなオーバーヘッドだと私は考えています。 – corvax

1

私はあなた自身のDistributedThreadPoolを実装してスレッドを制御できると思います。これは、ある種のトピックのサブスクライバ/パブリッシャ構造のようなものです。

class DistributeThreadPool { 

Map<String, TypeThread> TypeCenter = new HashMap<String, TypeThread>(); 

public void execute(Worker command) { 
    TypeCenter.get(command.type).accept(command); 
} 

class TypeThread implements Runnable{ 

    Thread t = null; 
    LinkedBlockingDeque<Runnable> lbq = null; 

    public TypeThread() { 
     lbq = new LinkedBlockingDeque<Runnable>(); 
    } 

    public void accept(Runnable inRun) { 
     lbq.add(inRun); 
    } 

    public void start() { 
     t = new Thread(this); 
     t.start(); 
    } 


    @Override 
    public void run() { 
     while (!Thread.interrupted()) { 
      try { 
       lbq.take().run(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 

public DistributeThreadPool(String[] Types) { 
    for (String t : Types) { 
     TypeThread thread = new TypeThread(); 
     TypeCenter.put(t, thread); 
     thread.start(); 
    } 
} 

public static void main(String [] args) { 
     DistributeThreadPool dtp = new DistributeThreadPool(new String[] {"AB","CD"}); 

     Worker w1 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w2 = new Worker("AB",()->System.out.println(Thread.currentThread().getName() +"AB")); 
     Worker w3 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w4 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 
     Worker w5 = new Worker("CD",()->System.out.println(Thread.currentThread().getName() +"CD")); 

     List<Worker> workers = new ArrayList<Worker>(); 
     workers.add(w1); 
     workers.add(w2); 
     workers.add(w3); 
     workers.add(w4); 
     workers.add(w5); 

     workers.forEach(e->dtp.execute(e)); 
    } 
} 
+0

1000種類のタスクがある場合はどうなりますか? – Anton

+0

同時に1000種類のタスクを順番に実行したいと思っていますか? @Anton – Chuck

0

興味深い問題:

は、私は次のように例をしました。

いくつの種類のタスクがありますか?

ほとんどの場合、最も簡単な方法は、各タイプごとに1つのスレッドを作成し、各着信タスクをその種類のスレッドに割り当てることです。タスクがタイプ間でバランスが取れている限り(それは大きな前提です)、利用率は十分に高くなります。

タスクの完了に期待される適時性/遅延時間はどのくらいですか?

はあなたの問題が適時に柔軟である場合、あなたは数または時間間隔によって、各種類のバッチの着信タスクは、あなたがプールに引退各バッチを提出することができ、その後、同じ種類の別を提出するバッチの完了を待ちます。

バッチサイズを1に小さくすることができます。この場合、完成を待つメカニズムが効率化のために重要になります。 CompletableFutureは、ここに法案を収めるだろう。あなたはthenRunAsyncでタスクに「タイプAの次のタスクをポーリングしてプールに送信」アクションを連鎖させ、そのタスクを忘れて忘れることができます。

タスクタイプごとに1つの外部タスクキューを維持する必要があります。 FJプールの作業キューは進行中のタスクのみになります。それでも、この設計は、タスク数とタイプごとの作業負荷の不均衡を合理的に処理する良い機会です。

これが役に立ちます。

0

鍵付き実行プログラムを実装します。各タスクにはキーが必要です。同じキーを持つタスクはキューに入れられ、連続して実行され、異なるキーを持つタスクが並行して実行されます。

Implementation in netty

あなたはそれを自分で作ってみることができますが、それは難しいとエラーが発生しやすくなります。私はそこに提案された答えにはほとんどバグを見ることができません。

関連する問題