反応的なプログラミングが役立ちます。 RxJavaでの私の短い経験の中で、私は、Futureなどのようなコア言語の機能よりも直感的で使いやすいと感じました。ここにいくつかの役立つ出発点があります。https://www.youtube.com/watch?v=_t06LRX0DV0
添付の例にもこれを実行する方法が示されています。以下の例では、処理が必要なパケットがあります。それらは簡単な情報を通し、ひとつのリストに統合されます。このメッセージに添付出力は、彼らが
import static java.time.Instant.now;
import static rx.schedulers.Schedulers.io;
import java.time.Instant;
import java.util.List;
import java.util.Random;
import rx.Observable;
import rx.Subscriber;
public class RxApp {
public static void main(String... args) throws InterruptedException {
List<ProcessedPacket> processedPackets = Observable.range(0, 10) //
.flatMap(i -> {
return getPacket(i).subscribeOn(io());
}) //
.map(Packet::transform) //
.toSortedList() //
.toBlocking() //
.single();
System.out.println("===== RESULTS =====");
processedPackets.stream().forEach(System.out::println);
}
static Observable<Packet> getPacket(Integer i) {
return Observable.create((Subscriber<? super Packet> s) -> {
// simulate latency
try {
Thread.sleep(new Random().nextInt(5000));
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("packet requested for " + i);
s.onNext(new Packet(i.toString(), now()));
s.onCompleted();
});
}
}
class Packet {
String aString;
Instant createdOn;
public Packet(String aString, Instant time) {
this.aString = aString;
this.createdOn = time;
}
public ProcessedPacket transform() {
System.out.println(" Packet being transformed " + aString);
try {
Thread.sleep(new Random().nextInt(5000));
} catch (Exception e) {
e.printStackTrace();
}
ProcessedPacket newPacket = new ProcessedPacket(this, now());
return newPacket;
}
@Override
public String toString() {
return "Packet [aString=" + aString + ", createdOn=" + createdOn + "]";
}
}
class ProcessedPacket implements Comparable<ProcessedPacket> {
Packet p;
Instant processedOn;
public ProcessedPacket(Packet p, Instant now) {
this.p = p;
this.processedOn = now;
}
@Override
public int compareTo(ProcessedPacket o) {
return p.createdOn.compareTo(o.p.createdOn);
}
@Override
public String toString() {
return "ProcessedPacket [p=" + p + ", processedOn=" + processedOn + "]";
}
}
解体
Observable.range(0, 10) //
.flatMap(i -> {
return getPacket(i).subscribeOn(io());
}) // source the input as observables on multiple threads
.map(Packet::transform) // processing the input data
.toSortedList() // sorting to sequence the processed inputs;
.toBlocking() //
.single();
を受信した順に出力されたパケットを受信した時間ではなく、最終的に異なる点に変換されることを示しています特定の実行では、パケットは2,6,0,1,8,7,5,9,4,3の順で受信され、2,6,0,1,3,4,5,7,8 、9つの異なるスレッドで
packet requested for 2
Packet being transformed 2
packet requested for 6
Packet being transformed 6
packet requested for 0
packet requested for 1
Packet being transformed 0
packet requested for 8
packet requested for 7
packet requested for 5
packet requested for 9
Packet being transformed 1
packet requested for 4
packet requested for 3
Packet being transformed 3
Packet being transformed 4
Packet being transformed 5
Packet being transformed 7
Packet being transformed 8
Packet being transformed 9
===== RESULTS =====
ProcessedPacket [p=Packet [aString=2, createdOn=2016-04-14T13:48:52.060Z], processedOn=2016-04-14T13:48:53.247Z]
ProcessedPacket [p=Packet [aString=6, createdOn=2016-04-14T13:48:52.130Z], processedOn=2016-04-14T13:48:54.208Z]
ProcessedPacket [p=Packet [aString=0, createdOn=2016-04-14T13:48:53.989Z], processedOn=2016-04-14T13:48:55.786Z]
ProcessedPacket [p=Packet [aString=1, createdOn=2016-04-14T13:48:54.109Z], processedOn=2016-04-14T13:48:57.877Z]
ProcessedPacket [p=Packet [aString=8, createdOn=2016-04-14T13:48:54.418Z], processedOn=2016-04-14T13:49:14.108Z]
ProcessedPacket [p=Packet [aString=7, createdOn=2016-04-14T13:48:54.600Z], processedOn=2016-04-14T13:49:11.338Z]
ProcessedPacket [p=Packet [aString=5, createdOn=2016-04-14T13:48:54.705Z], processedOn=2016-04-14T13:49:06.711Z]
ProcessedPacket [p=Packet [aString=9, createdOn=2016-04-14T13:48:55.227Z], processedOn=2016-04-14T13:49:16.927Z]
ProcessedPacket [p=Packet [aString=4, createdOn=2016-04-14T13:48:56.381Z], processedOn=2016-04-14T13:49:02.161Z]
ProcessedPacket [p=Packet [aString=3, createdOn=2016-04-14T13:48:56.566Z], processedOn=2016-04-14T13:49:00.557Z]
どのように:スレッドAは、着信キューからアイテムを取り出し、ワーカースレッドを作成し、開始し、キューにプッシュします。スレッドBは、そのキューからスレッドを1つずつ取り出し、それぞれのスレッドが完了するのを待ってから、結果を送信キューにプッシュします。それはあなたの制約を満たすものですか? – RealSkeptic
ここでの解決法のベストはすべて、java.util.concurrentの少なくともいくつかの要素を使用しています。理由があります。オーバーヘッドに基づいて 'java.util.concurrent'を拒否する前に、あなた自身またはこれらのソリューションの1つを使って、パフォーマンスを許容するかどうかを調べることをお勧めします。早すぎる最適化を避け、簡単な方法で最初に十分速く動作するかどうかを確認してください。 –
@sparc_spreadをエコーすると、並行ライブラリが思うように悪くないことがあります。実際のオーバーヘッドはスレッドの作成です。スレッドプールが固定されているため、オーバーヘッドは作業アイテムの数に対して一定の時間で増加します(5つの作業アイテムまたは5kの作業アイテムがある場合、同じオーバーヘッドが発生します)。あなたが本当に本当に本当に、コンカレントライブラリなしで答えが必要な場合は、これについてコメントしてください。 – kag0