11

私は作業単位の流れを持っています。これは順次処理される "作業項目"と呼ばれます。私はマルチスレッドの仕事をして処理をスピードアップしたいと思います。完成した作業項目の順序が保存されるマルチスレッド実行

制約:これらの作業項目は処理中に特定の順序になりますが、順序は関係ありませんが、処理が終了するとその順序を復元する必要があります。

このような何か:

|.| 
    |.| 
    |4| 
    |3| 
    |2| <- incoming queue 
    |1| 
/| \ 
2 1 3 <- worker threads 
    \ |/
    |3| 
    |2| <- outgoing queue 
    |1| 

私は(、好ましくはエグゼキュータサービスずに、Javaでこの問題を解決したいと先物などが、待ち時間などの基本的な並行処理の方法と()、通知します)、など

理由:私のワークアイテムは非常に小さく細かく、約0.2ミリ秒で処理が完了します。ですから、私はjava.util.concurrent。*のものを使うことを恐れています。*オーバーヘッドが大きくなり、コードが遅くなる可能性があります。

私が今までに見つけた例は、すべて処理中に順序を保持しています(これは私の場合は関係ありません)。処理後の順序は気にしませんでした。

+0

どのように:スレッドAは、着信キューからアイテムを取り出し、ワーカースレッドを作成し、開始し、キューにプッシュします。スレッドBは、そのキューからスレッドを1つずつ取り出し、それぞれのスレッドが完了するのを待ってから、結果を送信キューにプッシュします。それはあなたの制約を満たすものですか? – RealSkeptic

+2

ここでの解決法のベストはすべて、java.util.concurrentの少なくともいくつかの要素を使用しています。理由があります。オーバーヘッドに基づいて 'java.util.concurrent'を拒否する前に、あなた自身またはこれらのソリューションの1つを使って、パフォーマンスを許容するかどうかを調べることをお勧めします。早すぎる最適化を避け、簡単な方法で最初に十分速く動作するかどうかを確認してください。 –

+1

@sparc_spreadをエコーすると、並行ライブラリが思うように悪くないことがあります。実際のオーバーヘッドはスレッドの作成です。スレッドプールが固定されているため、オーバーヘッドは作業アイテムの数に対して一定の時間で増加します(5つの作業アイテムまたは5kの作業アイテムがある場合、同じオーバーヘッドが発生します)。あなたが本当に本当に本当に、コンカレントライブラリなしで答えが必要な場合は、これについてコメントしてください。 – kag0

答えて

0

すべてのWorkItemに対してDoTaskスレッドを起動できます。このスレッドは作業を処理します。 作業が完了したら、コントロールオブジェクトに同期させてアイテムを投稿しようとします。アイテムが適切なIDであるかどうかをチェックし、そうでない場合は待機します。

ポストの実装は次のようになりますあなたがBlockingQueueを許可する場合は、なぜあなたはJavaで同時実行utilsパッケージの残りの部分を無視する

synchronized(controllingObject) { 
try { 
while(workItem.id != nextId) controllingObject.wait(); 
} catch (Exception e) {} 
//Post the workItem 
nextId++; 
object.notifyAll(); 
} 
+0

可能な限りスレッドの繰り返し作成を避け、代わりにスレッドプールを使用したいと思います。 – Frizz

+0

スレッドプールで同じことを使うことができます... –

+0

スレッドプールを使うつもりなら、あなた自身で書くか、他の3番目のものを使わない限り、あなたはすでに 'java.util.concurrent'に浸っています。 (Java 5以降にメンテナンスされているかどうかはわかりません)。 –

4

? 上記のためStream(あなたは、Java 1.8を持っている場合):

List<Type> data = ...; 
List<Other> out = data.parallelStream() 
    .map(t -> doSomeWork(t)) 
    .collect(Collectors.toList()); 

ご注文CollectionList)から開始し、Listにも収集しているので、あなたは、入力と同じ順序で結果を持っています。

+0

私は入力データの固定リストを持っていないので、これは私の問題を解決しません - 私はストリーム(キュー)があります。ここではStreamsを使用するのは好きではありません。なぜなら、それは私にコントロールを与えないからです(例えば、作成されるスレッドの数について)。 – Frizz

+1

スレッド数を制御できます。そのためには、ForkJoinPoolを使用する必要があります。http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPoolhtml –

3

BlockingQueueですべての先物を払います。 、

public class SequentialProcessor implements Consumer<Task> { 
    private final ExecutorService executor = Executors.newCachedThreadPool(); 
    private final BlockingDeque<Future<Result>> queue = new LinkedBlockingDeque<>(); 

    public SequentialProcessor(Consumer<Result> listener) { 
     new Thread(() -> { 
      while (true) { 
       try { 
        listener.accept(queue.take().get()); 
       } catch (InterruptedException | ExecutionException e) { 
        // handle the exception however you want, perhaps just logging it 
       } 
      } 
     }).start(); 
    } 

    public void accept(Task task) { 
     queue.add(executor.submit(callableFromTask(task))); 
    } 

    private Callable<Result> callableFromTask(Task task) { 
     return <how to create a Result from a Task>; // implement this however 
    } 
} 

その後、使用する(1回)SequentialProcessorを作成します:ここではあなたが必要なすべてのコードです

それに
SequentialProcessor processor = new SequentialProcessor(whatToDoWithResults); 

とポンプタスク:

Stream<Task> tasks; // given this 

tasks.forEach(processor); // simply this 

私はcallableFromTask()メソッドを作成しましたただし、TaskからResultを取得するのが簡単な場合は、代わりにラムダまたはメソッド参照を使用してください。

queue.add(executor.submit(task::getResult)); 

をしたり、(ラムダ)の発現を必要とする場合:

例えば、TaskgetResult()メソッドを持っていた場合、これを行う

queue.add(executor.submit(() -> task.getValue() + "foo")); // or whatever 
+0

私は入力データの固定リストを持っていないので、これは私の問題を解決しません - 私は作業項目のストリームを持っています。 – Frizz

+0

@Frizz私は自分の答えを変えて、連続した仕事の流れを処理しました。あなたの状況にタイプを指定していないので、私は 'Task'と' Result'を使用しました。 – Bohemian

3

あなたは3入力3出力キューを持つことができます - 各ワーカースレッドの各タイプの1つ。

入力キューに何かを挿入する場合は、3つの入力キューのうちの1つに入れるだけです。ラウンドロビン方式で入力キューを変更します。同じことが出力にも当てはまります。出力から何かを取り出し、最初に出力キューを選択し、要素を取得すると次のキューに切り替わります。

すべてのキューをブロックする必要があります。

4

これは私が以前のプロジェクト(しかし java.util.concurrentのと)であなたの問題を解決し方法です:

(1)作業項目のクラスは、実際の作業/処理を行います。

public class WorkItem implements Callable<WorkItem> { 
    Object content; 
    public WorkItem(Object content) { 
     super(); 
     this.content = content; 
    } 

    public WorkItem call() throws Exception { 
     // getContent() + do your processing 
     return this; 
    } 
} 

(2)このクラスは、作業項目を待ち行列に入れ、処理を開始します。

public class Producer { 
    ... 
    public Producer() { 
     super(); 
     workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE); 
     completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE)); 
     workerThread = new Thread(new Worker(workerQueue)); 
     workerThread.start(); 
    } 

    public void send(Object o) throws Exception { 
     WorkItem workItem = new WorkItem(o); 
     Future<WorkItem> future = completionService.submit(workItem); 
     workerQueue.put(future); 
    } 
} 

(3)処理が終了すると、作業項目はデキューされます

public class MainApp { 
    public static void main(String[] args) throws Exception { 
     Producer p = new Producer(); 
     for (int i = 0; i < 10000; i++) 
      p.send(i); 
    } 
} 
4

ジャストIDは、処理のためのオブジェクトのそれぞれ、どのだろうプロキシを作成:

public class Worker implements Runnable { 
    private ArrayBlockingQueue<Future<WorkItem>> workerQueue = null; 

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) { 
     super(); 
     this.workerQueue = workerQueue; 
    } 

    public void run() { 
     while (true) { 
      Future<WorkItem> fwi = workerQueue.take(); // deqeueue it 
      fwi.get(); // wait for it till it has finished processing 
     } 
    } 
} 

(4)これは、あなたのコード内のものを使用して、新しい作品を提出する方法です。ここ編完了した作業を受け入れ、プッシュされたIDが連続している場合にのみ返すことを許可します。以下のサンプルコード。同期されていない自動ソートコレクションとAPIとしての2つの単純なメソッドを利用して、それがどれほどシンプルであるかに注意してください。

public class SequentialPushingProxy { 

    static class OrderedJob implements Comparable<OrderedJob>{ 
     static AtomicInteger idSource = new AtomicInteger(); 
     int id; 

     public OrderedJob() { 
      id = idSource.incrementAndGet(); 
     } 

     public int getId() { 
      return id; 
     } 

     @Override 
     public int compareTo(OrderedJob o) { 
      return Integer.compare(id, o.getId()); 
     } 
    } 

    int lastId = OrderedJob.idSource.get(); 

    public Queue<OrderedJob> queue; 

    public SequentialPushingProxy() { 
     queue = new PriorityQueue<OrderedJob>(); 
    } 

    public synchronized void pushResult(OrderedJob job) { 
     queue.add(job); 
    } 

    List<OrderedJob> jobsToReturn = new ArrayList<OrderedJob>(); 
    public synchronized List<OrderedJob> getFinishedJobs() { 
     while (queue.peek() != null) { 
      // only one consumer at a time, will be safe 
      if (queue.peek().getId() == lastId+1) { 
       jobsToReturn.add(queue.poll()); 
       lastId++; 
      } else { 
       break; 
      } 
     } 
     if (jobsToReturn.size() != 0) { 
      List<OrderedJob> toRet = jobsToReturn; 
      jobsToReturn = new ArrayList<OrderedJob>(); 
      return toRet; 
     } 
     return Collections.emptyList(); 
    } 

    public static void main(String[] args) { 
     final SequentialPushingProxy proxy = new SequentialPushingProxy(); 

     int numProducerThreads = 5; 

     for (int i=0; i<numProducerThreads; i++) { 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        while(true) { 
         proxy.pushResult(new OrderedJob()); 
        } 
       } 
      }).start(); 
     } 


     int numConsumerThreads = 1; 

     for (int i=0; i<numConsumerThreads; i++) { 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        while(true) { 
         List<OrderedJob> ret = proxy.getFinishedJobs(); 
         System.out.println("got "+ret.size()+" finished jobs"); 
         try { 
          Thread.sleep(200); 
         } catch (InterruptedException e) { 
          // TODO Auto-generated catch block 
          e.printStackTrace(); 
         } 
        } 
       } 
      }).start(); 
     } 


     try { 
      Thread.sleep(5000); 
     } catch (InterruptedException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     System.exit(0); 
    } 

} 

このコードは簡単に

  • に改善することができ、同期コスト
  • が小さい中で仕事を成し遂げるために返されたコレクションに制限を導入減らすために、一度に複数のジョブの結果をプッシュすることができチャンク
  • これらの2つのパブリックメソッドのインターフェイスを抽出し、テストを実行するための実装を切り替えます。
+0

プライオリティキューにはコンシューマが1つしかありません。ドキュメントから: "いずれかのスレッドがキューを変更した場合、複数のスレッドが同時にPriorityQueueインスタンスにアクセスすべきではありません。代わりに、スレッドセーフなPriorityBlockingQueueクラスを使用してください。 – Bampfer

+1

プロキシのメソッドの 'synchronized'に注意してください。安全です。 – Dariusz

+0

良い点。それからあなたは「一人の消費者、安全でなければならない」というコメントは必要ありません。 :-) – Bampfer

0

私は、着信オーダーを保持するために余分なキューが必要だと思います。 IncomingOrderQueue。

IncomingOrderQueueから消費する別のスレッドからオブジェクトを消費した場合、オブジェクトのID(ハッシュ)を選択してこのHashMapから収集します。

このソリューションは、実行サービスなしで簡単に実装できます。

0

前処理:各アイテムに順序値を追加し、割り当てられていない場合は配列を準備します。

入力:キュー(順番で同時サンプリングが1,2,3,4の値が、サンプルなったトレッドは問題does notの)

出力:配列(すべてのを待つために同期ポイントを使用して、インデックス付きの要素への書き込みを最後のスレッドは、スレッドごとに異なる位置を書き込むため、衝突チェックは必要ありません)。

後処理:配列をキューに変換します。

n個のスレッドに対してn個の要素配列が必要です。または、後処理を1回だけ行うにはnの倍数。

1

反応的なプログラミングが役立ちます。 RxJavaでの私の短い経験の中で、私は、Futureなどのようなコア言語の機能よりも直感的で使いやすいと感じました。ここにいくつかの役立つ出発点があります。https://www.youtube.com/watch?v=_t06LRX0DV0

添付の例にもこれを実行する方法が示されています。以下の例では、処理が必要なパケットがあります。それらは簡単な情報を通し、ひとつのリストに統合されます。このメッセージに添付出力は、彼らが

import static java.time.Instant.now; 
import static rx.schedulers.Schedulers.io; 

import java.time.Instant; 
import java.util.List; 
import java.util.Random; 

import rx.Observable; 
import rx.Subscriber; 

public class RxApp { 

    public static void main(String... args) throws InterruptedException { 

    List<ProcessedPacket> processedPackets = Observable.range(0, 10) // 
     .flatMap(i -> { 
      return getPacket(i).subscribeOn(io()); 
     }) // 
     .map(Packet::transform) // 
     .toSortedList() // 
     .toBlocking() // 
     .single(); 

    System.out.println("===== RESULTS ====="); 
    processedPackets.stream().forEach(System.out::println); 
    } 

    static Observable<Packet> getPacket(Integer i) { 
    return Observable.create((Subscriber<? super Packet> s) -> { 
     // simulate latency 
     try { 
     Thread.sleep(new Random().nextInt(5000)); 
     } catch (Exception e) { 
     e.printStackTrace(); 
     } 
     System.out.println("packet requested for " + i); 
     s.onNext(new Packet(i.toString(), now())); 
     s.onCompleted(); 
    }); 
    } 

} 


class Packet { 
    String aString; 
    Instant createdOn; 

    public Packet(String aString, Instant time) { 
    this.aString = aString; 
    this.createdOn = time; 
    } 

    public ProcessedPacket transform() { 
    System.out.println("       Packet being transformed " + aString); 
    try { 
     Thread.sleep(new Random().nextInt(5000)); 
    } catch (Exception e) { 
     e.printStackTrace(); 
    } 
    ProcessedPacket newPacket = new ProcessedPacket(this, now()); 
    return newPacket; 
    } 

    @Override 
    public String toString() { 
    return "Packet [aString=" + aString + ", createdOn=" + createdOn + "]"; 
    } 
} 


class ProcessedPacket implements Comparable<ProcessedPacket> { 
    Packet p; 
    Instant processedOn; 

    public ProcessedPacket(Packet p, Instant now) { 
    this.p = p; 
    this.processedOn = now; 
    } 

    @Override 
    public int compareTo(ProcessedPacket o) { 
    return p.createdOn.compareTo(o.p.createdOn); 
    } 

    @Override 
    public String toString() { 
    return "ProcessedPacket [p=" + p + ", processedOn=" + processedOn + "]"; 
    } 

} 

解体

Observable.range(0, 10) // 
    .flatMap(i -> { 
     return getPacket(i).subscribeOn(io()); 
    }) // source the input as observables on multiple threads 


    .map(Packet::transform) // processing the input data 

    .toSortedList() // sorting to sequence the processed inputs; 
    .toBlocking() // 
    .single(); 

を受信した順に出力されたパケットを受信した時間ではなく、最終的に異なる点に変換されることを示しています特定の実行では、パケットは2,6,0,1,8,7,5,9,4,3の順で受信され、2,6,0,1,3,4,5,7,8 、9つの異なるスレッドで

packet requested for 2 
          Packet being transformed 2 
packet requested for 6 
          Packet being transformed 6 
packet requested for 0 
packet requested for 1 
          Packet being transformed 0 
packet requested for 8 
packet requested for 7 
packet requested for 5 
packet requested for 9 
          Packet being transformed 1 
packet requested for 4 
packet requested for 3 
          Packet being transformed 3 
          Packet being transformed 4 
          Packet being transformed 5 
          Packet being transformed 7 
          Packet being transformed 8 
          Packet being transformed 9 
===== RESULTS ===== 
ProcessedPacket [p=Packet [aString=2, createdOn=2016-04-14T13:48:52.060Z], processedOn=2016-04-14T13:48:53.247Z] 
ProcessedPacket [p=Packet [aString=6, createdOn=2016-04-14T13:48:52.130Z], processedOn=2016-04-14T13:48:54.208Z] 
ProcessedPacket [p=Packet [aString=0, createdOn=2016-04-14T13:48:53.989Z], processedOn=2016-04-14T13:48:55.786Z] 
ProcessedPacket [p=Packet [aString=1, createdOn=2016-04-14T13:48:54.109Z], processedOn=2016-04-14T13:48:57.877Z] 
ProcessedPacket [p=Packet [aString=8, createdOn=2016-04-14T13:48:54.418Z], processedOn=2016-04-14T13:49:14.108Z] 
ProcessedPacket [p=Packet [aString=7, createdOn=2016-04-14T13:48:54.600Z], processedOn=2016-04-14T13:49:11.338Z] 
ProcessedPacket [p=Packet [aString=5, createdOn=2016-04-14T13:48:54.705Z], processedOn=2016-04-14T13:49:06.711Z] 
ProcessedPacket [p=Packet [aString=9, createdOn=2016-04-14T13:48:55.227Z], processedOn=2016-04-14T13:49:16.927Z] 
ProcessedPacket [p=Packet [aString=4, createdOn=2016-04-14T13:48:56.381Z], processedOn=2016-04-14T13:49:02.161Z] 
ProcessedPacket [p=Packet [aString=3, createdOn=2016-04-14T13:48:56.566Z], processedOn=2016-04-14T13:49:00.557Z] 
関連する問題