スレッド間でデータを渡すために設計されたNIO機能のJava NIO Pipeがあることが判明しました。 ArrayBlockingQueueなどのキューを通過する従来のメッセージよりも、このメカニズムを使用する利点はありますか?Java NIO PipeとBlockingQueue
答えて
通常、別のスレッドのデータを処理する最も簡単な方法は、ExecutorServiceを使用することです。これは、キューとスレッドプール(1つのスレッドを持つことができる)の両方をまとめます。
NIOチャネルをサポートするライブラリがある場合、パイプを使用できます。 ByteBuffersのデータをスレッド間で渡したい場合にも便利です。
それ以外の場合は、通常、単純/より高速のArrayBlockingQueueを使用します。
スレッド間でデータをやり取りする方が速い場合は、Exchangerを参照することをお勧めしますが、ArrayBlockingQueueとは一般的な目的ではありません。
ありがとう、私は、エクスチェンジャーを使用するとGCのオーバーヘッドを最小限に抑えるという事実を決して考えませんでした。しかし、交換機の欠点は、それが同期していることです。通常、データを取得するまで待つことなく、別のスレッドにデータをポンピングするだけです。 – Maxaon3000
パイプは固定サイズです。生産者が生産を止めなければならない場合、問題は同じです。消費者が終了する前にプロデューサーがバッファーを満たすことが決してない場合、どちらの場合も停止する必要はありません –
パイプはSelector.wakeupを実装するために使用されます。カーネルを通過しないでください。 – bestsss
私はそれは非常に可能性が舞台裏でコルーチンを用いて実施することもできるよう、パイプが良く、待ち時間を持つことになりますと仮定します。したがって、プロデューサは、スレッドスケジューラが決定したときではなく、データが利用可能であるときに消費者に即座に結果を返す。
一般的に、パイプは消費者生産者の問題を表しており、このように実装されている可能性が高く、両方のスレッドが協調して外部で先取りされることはありません。
NIO Pipeは、スレッドセーフな方法でセレクタループ内のチャンネルにデータを送信できるように設計されていると思います。つまり、スレッドはパイプに書き込むことができ、データは他のスレッドで処理されます。パイプの極端な部分がセレクタループの内側にあります。パイプに書き込むときは、反対側のチャンネルを読みやすくします。
シンプルなキュー・ポーリングに比べてセレクタ・ループを使用してスレッド間でデータを渡す際のパフォーマンスの特性については疑問です。また、パイプを介してデータを渡すことは、オブジェクトではなくバイトで他のスレッドに渡さなければならないという不便さを抱えているようです。つまり、スレッド間データ交換のためのワイヤプロトコルを開発する必要があります。 – Maxaon3000
ConcurrentLinkedQueueを意味します。それはすばらしい質問です。私は自分のチップをConcurrentLinkedQueueに賭けました。 :)しかし、パイプの1つの利点は、他のすべての人がやっているようなメッセージを送信することです。言い換えれば、キューからオブジェクトをフェッチするのではなく、チャネルから読み取ることです。 – chrisapotek
パイプ(check here)に多くの問題があった後、NIOパイプで非ブロッキング並行キューを優先することに決めました。だから私はJavaのConcurrentLinkedQueueのベンチマークを行った。以下を参照してください:
public static void main(String[] args) throws Exception {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
// first test nothing:
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
for (int i = 0; i < 1000000; i++) {
bench.mark();
// s = queue.poll();
bench.measure();
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// first test empty queue:
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
for (int i = 0; i < 1000000; i++) {
bench.mark();
s = queue.poll();
bench.measure();
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// now test polling one element on a queue with size one
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
String x = "pela";
for (int i = 0; i < 1000000; i++) {
queue.offer(x);
bench.mark();
s = queue.poll();
bench.measure();
if (s != x) throw new Exception("bad!");
}
System.out.println(bench.results());
Thread.sleep(100);
}
System.out.println();
// now test polling one element on a queue with size two
for (int j = 0; j < 20; j++) {
Benchmarker bench = new Benchmarker();
String s = "asd";
String x = "pela";
for (int i = 0; i < 1000000; i++) {
queue.offer(x);
queue.offer(x);
bench.mark();
s = queue.poll();
bench.measure();
if (s != x) throw new Exception("bad!");
queue.poll();
}
System.out.println(bench.results());
Thread.sleep(100);
}
}
結果:
totalLogs=1000000, minTime=0, maxTime=85000, avgTime=58.61 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=5281000, avgTime=63.35 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=725000, avgTime=59.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=25000, avgTime=58.13 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=378000, avgTime=58.45 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=15000, avgTime=57.71 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=170000, avgTime=58.11 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1495000, avgTime=59.87 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=232000, avgTime=63.0 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=184000, avgTime=57.89 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=2600000, avgTime=65.22 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=850000, avgTime=60.5 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=150000, avgTime=63.83 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=43000, avgTime=59.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=276000, avgTime=60.02 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=457000, avgTime=61.69 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=204000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=154000, avgTime=63.67 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=355000, avgTime=60.75 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=338000, avgTime=60.44 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=345000, avgTime=110.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=396000, avgTime=100.32 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=298000, avgTime=98.93 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1891000, avgTime=101.9 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=254000, avgTime=103.06 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1894000, avgTime=100.97 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=230000, avgTime=99.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=348000, avgTime=99.63 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=922000, avgTime=99.53 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=168000, avgTime=99.12 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=686000, avgTime=107.41 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=320000, avgTime=95.58 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=248000, avgTime=94.94 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=217000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=159000, avgTime=93.62 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=155000, avgTime=95.28 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=106000, avgTime=98.57 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=370000, avgTime=95.01 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=1836000, avgTime=96.21 (times in nanos)
totalLogs=1000000, minTime=0, maxTime=212000, avgTime=98.62 (times in nanos)
結論:
MAXTIMEは怖いことができますが、私は我々が50のnanos値であると結論しても安全だと思うポーリング同時の範囲はキュー。
- 1. Java NIOとSSL
- 2. Java BlockingQueue while()whileループ
- 3. triggering SheduledExecutor with blockingQueue Java
- 4. Java NIOとWindowsのディスクアクセス
- 5. Java静的同期とBlockingQueueの実装
- 6. Java NIOのバッファーマーキング
- 7. NIOパーシステントアレイ:Java
- 8. Java nioパスと通常のJavaファイルAPI
- 9. Java NIO SocketChannel.read()with multithread
- 10. Java nio。空のパス
- 11. Java NIOストアのパスリスト
- 12. IOとJavaのNIOのパスとパス
- 13. Java NIOノンブロッキングモードとnode.js非同期操作
- 14. Java。 Consumer - BlockingQueueを持つプロデューサ検索ツール
- 15. Objective-Cに相当するJavaのBlockingQueue?
- 16. C++ JavaのBlockingQueueに相当する
- 17. java nioクライアント側多重化
- 18. Java Solaris NIO OP_CONNECTの問題
- 19. Java 7 NIO.2 Files.getLastModifiedTimeタイムゾーン
- 20. CharBufferのJava NIO発行
- 21. のJava NIO文字セットエンコードデコード
- 22. Java NIO UDPマルチキャスト - 廃棄パケット
- 23. 不完全なファイルコピーJava NIO
- 24. Java NIOを使用してファイルを作成するNIO
- 25. Javaの複数のプロデューサとコンシューマの問題(BlockingQueueなし)
- 26. BlockingMapはBlockingQueueとしてJavaにありますか?
- 27. BlockingQueue resource consuming
- 28. Java - サーブレット3.0の非同期とサーブレット3.1のNIOとの比較
- 29. 多機能ソケットjava、ソケット対NIOソケット
- 30. Java NIO SocketChannel.write()でのスレッドの問題
パイプはカーネルを通っていますが、セレクタが起床しているのであまり役に立ちません。これはLinuxのパイプ経由で実装されています... – bestsss
@bestsssは詳しく書いていますか?セレクタにパイプを登録して通知を受け取ることができますが、何が問題なのですか? – raffian
@raffian、簡単に言えば - 実際にはIPC用のPipesを使用することはできません。プロセス内では、より効率的な情報の受け渡し方法があります。 – bestsss