2012-06-18 29 views
23

いくつかのWebサーバーインスタンスを並行して実行することを検討してください。各サーバーは、すべてのサーバーから最後のN要求を保持する1つの共有「状態保持者」への参照を保持します。例えばJavaのスレッドセーフ循環バッファ

N=3):任意の時点で

Server a: "Request id = ABCD"  Status keeper=["ABCD"] 
Server b: "Request id = XYZZ"  Status keeper=["ABCD", "XYZZ"] 
Server c: "Request id = 1234"  Status keeper=["ABCD", "XYZZ", "1234"] 
Server b: "Request id = FOO"   Status keeper=["XYZZ", "1234", "FOO"] 
Server a: "Request id = BAR"   Status keeper=["1234", "FOO", "BAR"] 

、「ステータスキーパーは、」SLAレポートのこれらの最後のNの要求を読み込み、監視アプリケーションから呼び出すことがあります。

このプロデューサ - コンシューマシナリオをJavaで実装する最良の方法は、WebサーバーにSLAレポートよりも高い優先順位を与えますか?

CircularFifoBufferは、要求を保持するのに適切なデータ構造のようですが、効率的な並行処理を実装する最適な方法は何か分かりません。

+0

「より高い優先度」を定義します。レポートがバッファーの読み込みを開始したらどうなりますか?もし誰かがそれを書こうと思えば、それは壊れて始めるべきですか?それが飢えにつながるのだろうか? –

+0

これは決して餓死すべきではなく、決して止めるべきではありませんが、もう少し長く待つことができます。 –

+0

リングバッファに必要なプロデューサ数と消費者数は、データを提供する際にいくつかのコードを削除します。 – bestsss

答えて

16
Buffer fifo = BufferUtils.synchronizedBuffer(new CircularFifoBuffer()); 
+0

+1 '' fifo'は揮発性ではありませんか? –

+2

初期化コードが競合しない限り、実際には問題になりません。 – MahdeTo

+0

BufferUtilsはどこにありますか?私はApacheのこれをgradleファイルで使ってみました: "org.apache.commons:commons-collections4:4.1 '"をコンパイルしましたが、そこにはありません... –

2

私はArrayDequeを見ていますが、より多くの並行した実装では、Javaの最も洗練された/複雑なリングバッファの1つであるDisruptorライブラリを見てください。

代わりに、プロデューサがコンシューマを待つ必要がないため、コンカレントなバインドされていないキューを使用することもできます。 Java Chronicle

あなたの必要性が複雑さを正当化しない限り、ArrayDequeは必要なものだけかもしれません。

+0

一つの重要な問題: 'ArrayDeque'はサイズ制限されていません。それは、真の円形配列を使用しますが、必要に応じてより多くの要素に対応するようにサイズが変更されます。オペレータは、しばらくして新しい要素を挿入する前に要素を手動で 'pop()'しなければならず、すべてスレッドセーフを明示的に維持しながら... – thkala

+1

サイズが制限されている必要がある場合、ArrayBlockingQueueを使用できます。 –

+1

'ArrayBlockingQueue'は、要素が削除されるまでブロックすることによってサイズを制限します。私が知る限り、オペレータはキューに暗黙的に最も古い要素を削除/上書きし、最新の 'N'要素のみを保持したいとします。 – thkala

7

ここでは、ロックフリーリングバッファの実装について説明します。これは固定サイズのバッファを実装しています - FIFO機能はありません。代わりに各サーバーの要求のCollectionを保存することをお勧めします。そうすれば、データ構造をフィルタリングするのではなく、フィルタリングを行うことができます。

/** 
* Container 
* --------- 
* 
* A lock-free container that offers a close-to O(1) add/remove performance. 
* 
*/ 
public class Container<T> implements Iterable<T> { 

    // The capacity of the container. 
    final int capacity; 
    // The list. 
    AtomicReference<Node<T>> head = new AtomicReference<Node<T>>(); 
    // TESTING { 
    AtomicLong totalAdded = new AtomicLong(0); 
    AtomicLong totalFreed = new AtomicLong(0); 
    AtomicLong totalSkipped = new AtomicLong(0); 

    private void resetStats() { 
    totalAdded.set(0); 
    totalFreed.set(0); 
    totalSkipped.set(0); 
    } 
    // TESTING } 

    // Constructor 
    public Container(int capacity) { 
    this.capacity = capacity; 
    // Construct the list. 
    Node<T> h = new Node<T>(); 
    Node<T> it = h; 
    // One created, now add (capacity - 1) more 
    for (int i = 0; i < capacity - 1; i++) { 
     // Add it. 
     it.next = new Node<T>(); 
     // Step on to it. 
     it = it.next; 
    } 
    // Make it a ring. 
    it.next = h; 
    // Install it. 
    head.set(h); 
    } 

    // Empty ... NOT thread safe. 
    public void clear() { 
    Node<T> it = head.get(); 
    for (int i = 0; i < capacity; i++) { 
     // Trash the element 
     it.element = null; 
     // Mark it free. 
     it.free.set(true); 
     it = it.next; 
    } 
    // Clear stats. 
    resetStats(); 
    } 

    // Add a new one. 
    public Node<T> add(T element) { 
    // Get a free node and attach the element. 
    totalAdded.incrementAndGet(); 
    return getFree().attach(element); 
    } 

    // Find the next free element and mark it not free. 
    private Node<T> getFree() { 
    Node<T> freeNode = head.get(); 
    int skipped = 0; 
    // Stop when we hit the end of the list 
    // ... or we successfully transit a node from free to not-free. 
    while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) { 
     skipped += 1; 
     freeNode = freeNode.next; 
    } 
    // Keep count of skipped. 
    totalSkipped.addAndGet(skipped); 
    if (skipped < capacity) { 
     // Put the head as next. 
     // Doesn't matter if it fails. That would just mean someone else was doing the same. 
     head.set(freeNode.next); 
    } else { 
     // We hit the end! No more free nodes. 
     throw new IllegalStateException("Capacity exhausted."); 
    } 
    return freeNode; 
    } 

    // Mark it free. 
    public void remove(Node<T> it, T element) { 
    totalFreed.incrementAndGet(); 
    // Remove the element first. 
    it.detach(element); 
    // Mark it as free. 
    if (!it.free.compareAndSet(false, true)) { 
     throw new IllegalStateException("Freeing a freed node."); 
    } 
    } 

    // The Node class. It is static so needs the <T> repeated. 
    public static class Node<T> { 

    // The element in the node. 
    private T element; 
    // Are we free? 
    private AtomicBoolean free = new AtomicBoolean(true); 
    // The next reference in whatever list I am in. 
    private Node<T> next; 

    // Construct a node of the list 
    private Node() { 
     // Start empty. 
     element = null; 
    } 

    // Attach the element. 
    public Node<T> attach(T element) { 
     // Sanity check. 
     if (this.element == null) { 
     this.element = element; 
     } else { 
     throw new IllegalArgumentException("There is already an element attached."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    // Detach the element. 
    public Node<T> detach(T element) { 
     // Sanity check. 
     if (this.element == element) { 
     this.element = null; 
     } else { 
     throw new IllegalArgumentException("Removal of wrong element."); 
     } 
     // Useful for chaining. 
     return this; 
    } 

    public T get() { 
     return element; 
    } 

    @Override 
    public String toString() { 
     return element != null ? element.toString() : "null"; 
    } 
    } 

    // Provides an iterator across all items in the container. 
    public Iterator<T> iterator() { 
    return new UsedNodesIterator<T>(this); 
    } 

    // Iterates across used nodes. 
    private static class UsedNodesIterator<T> implements Iterator<T> { 
    // Where next to look for the next used node. 

    Node<T> it; 
    int limit = 0; 
    T next = null; 

    public UsedNodesIterator(Container<T> c) { 
     // Snapshot the head node at this time. 
     it = c.head.get(); 
     limit = c.capacity; 
    } 

    public boolean hasNext() { 
     // Made into a `while` loop to fix issue reported by @Nim in code review 
     while (next == null && limit > 0) { 
     // Scan to the next non-free node. 
     while (limit > 0 && it.free.get() == true) { 
      it = it.next; 
      // Step down 1. 
      limit -= 1; 
     } 
     if (limit != 0) { 
      next = it.element; 
     } 
     } 
     return next != null; 
    } 

    public T next() { 
     T n = null; 
     if (hasNext()) { 
     // Give it to them. 
     n = next; 
     next = null; 
     // Step forward. 
     it = it.next; 
     limit -= 1; 
     } else { 
     // Not there!! 
     throw new NoSuchElementException(); 
     } 
     return n; 
    } 

    public void remove() { 
     throw new UnsupportedOperationException("Not supported."); 
    } 
    } 

    @Override 
    public String toString() { 
    StringBuilder s = new StringBuilder(); 
    Separator comma = new Separator(","); 
    // Keep counts too. 
    int usedCount = 0; 
    int freeCount = 0; 
    // I will iterate the list myself as I want to count free nodes too. 
    Node<T> it = head.get(); 
    int count = 0; 
    s.append("["); 
    // Scan to the end. 
    while (count < capacity) { 
     // Is it in-use? 
     if (it.free.get() == false) { 
     // Grab its element. 
     T e = it.element; 
     // Is it null? 
     if (e != null) { 
      // Good element. 
      s.append(comma.sep()).append(e.toString()); 
      // Count them. 
      usedCount += 1; 
     } else { 
      // Probably became free while I was traversing. 
      // Because the element is detached before the entry is marked free. 
      freeCount += 1; 
     } 
     } else { 
     // Free one. 
     freeCount += 1; 
     } 
     // Next 
     it = it.next; 
     count += 1; 
    } 
    // Decorate with counts "]used+free". 
    s.append("]").append(usedCount).append("+").append(freeCount); 
    if (usedCount + freeCount != capacity) { 
     // Perhaps something was added/freed while we were iterating. 
     s.append("?"); 
    } 
    return s.toString(); 
    } 
} 

これはO1 putとgetに近いことに注意してください。 Separatorは、初めて ""立ち上がり、その後はそのパラメータが表示されます。

編集:テストメソッドを追加しました。

// ***** Following only needed for testing. ***** 
private static boolean Debug = false; 
private final static String logName = "Container.log"; 
private final static NamedFileOutput log = new NamedFileOutput("C:\\Junk\\"); 

private static synchronized void log(boolean toStdoutToo, String s) { 
    if (Debug) { 
    if (toStdoutToo) { 
     System.out.println(s); 
    } 
    log(s); 
    } 
} 

private static synchronized void log(String s) { 
    if (Debug) { 
    try { 
     log.writeLn(logName, s); 
    } catch (IOException ex) { 
     ex.printStackTrace(); 
    } 
    } 
} 
static volatile boolean testing = true; 

// Tester object to exercise the container. 
static class Tester<T> implements Runnable { 
    // My name. 

    T me; 
    // The container I am testing. 
    Container<T> c; 

    public Tester(Container<T> container, T name) { 
    c = container; 
    me = name; 
    } 

    private void pause() { 
    try { 
     Thread.sleep(0); 
    } catch (InterruptedException ex) { 
     testing = false; 
    } 
    } 

    public void run() { 
    // Spin on add/remove until stopped. 
    while (testing) { 
     // Add it. 
     Node<T> n = c.add(me); 
     log("Added " + me + ": " + c.toString()); 
     pause(); 
     // Remove it. 
     c.remove(n, me); 
     log("Removed " + me + ": " + c.toString()); 
     pause(); 
    } 
    } 
} 
static final String[] strings = { 
    "One", "Two", "Three", "Four", "Five", 
    "Six", "Seven", "Eight", "Nine", "Ten" 
}; 
static final int TEST_THREADS = Math.min(10, strings.length); 

public static void main(String[] args) throws InterruptedException { 
    Debug = true; 
    log.delete(logName); 
    Container<String> c = new Container<String>(10); 

    // Simple add/remove 
    log(true, "Simple test"); 
    Node<String> it = c.add(strings[0]); 
    log("Added " + c.toString()); 
    c.remove(it, strings[0]); 
    log("Removed " + c.toString()); 

    // Capacity test. 
    log(true, "Capacity test"); 
    ArrayList<Node<String>> nodes = new ArrayList<Node<String>>(strings.length); 
    // Fill it. 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    // Add one more. 
    try { 
    c.add("Wafer thin mint!"); 
    } catch (IllegalStateException ise) { 
    log("Full!"); 
    } 
    c.clear(); 
    log("Empty: " + c.toString()); 

    // Iterate test. 
    log(true, "Iterator test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    } 
    StringBuilder all = new StringBuilder(); 
    Separator sep = new Separator(","); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("All: "+all); 
    for (int i = 0; i < strings.length; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    } 
    sep.reset(); 
    all.setLength(0); 
    for (String s : c) { 
    all.append(sep.sep()).append(s); 
    } 
    log("None: " + all.toString()); 

    // Multiple add/remove 
    log(true, "Multi test"); 
    for (int i = 0; i < strings.length; i++) { 
    nodes.add(i, c.add(strings[i])); 
    log("Added " + strings[i] + " " + c.toString()); 
    } 
    log("Filled " + c.toString()); 
    for (int i = 0; i < strings.length - 1; i++) { 
    c.remove(nodes.get(i), strings[i]); 
    log("Removed " + strings[i] + " " + c.toString()); 
    } 
    c.remove(nodes.get(strings.length - 1), strings[strings.length - 1]); 
    log("Empty " + c.toString()); 

    // Multi-threaded add/remove 
    log(true, "Threads test"); 
    c.clear(); 
    for (int i = 0; i < TEST_THREADS; i++) { 
    Thread t = new Thread(new Tester<String>(c, strings[i])); 
    t.setName("Tester " + strings[i]); 
    log("Starting " + t.getName()); 
    t.start(); 
    } 
    // Wait for 10 seconds. 
    long stop = System.currentTimeMillis() + 10 * 1000; 
    while (System.currentTimeMillis() < stop) { 
    Thread.sleep(100); 
    } 
    // Stop the testers. 
    testing = false; 
    // Wait some more. 
    Thread.sleep(1 * 100); 
    // Get stats. 
    double added = c.totalAdded.doubleValue(); 
    double skipped = c.totalSkipped.doubleValue(); 
    //double freed = c.freed.doubleValue(); 
    log(true, "Stats: added=" + c.totalAdded + ",freed=" + c.totalFreed + ",skipped=" + c.totalSkipped + ",O(" + ((added + skipped)/added) + ")"); 
} 
+0

このアルゴリズムの正当性についての正式な検証はありますか?あなたがノードを再利用することを避けない限り、ロックフリーのデータ構造は正しく取得することは悪くも難しいです... – thkala

+0

@thkala - どのように '正式な'あなたが必要ですか?主なアルゴリズムは、空きノードを選択して使用するためにマークする 'getFree'メソッドにあります。それは非常に簡単で、その正しさは自明でなければなりません。私はテストメソッドを追加しました。多分彼らは助けるでしょう。 – OldCurmudgeon

+0

パブリッシュされピアレビューされたアルゴリズムが持つ「正式な」種類。私はロックフリーのデータ構造で広範囲に作業してきました。あまりにも多くの角の場合があります... – thkala

1

java.util.concurrentを見てみましょう。

消費するもの、または生成する(任意に)スペースがあるまでキューがブロックされるブロッキング:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html

同時連結キューは、非ブロッキングとにプロデューサとコンシューマを可能にする滑らかなアルゴリズムを使用しています同時にアクティブに:

http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html

+0

CLQはアンバインドされています。 – bestsss

1

HazelcastのQueueは、あなたが求めるほとんどすべてを提供していますが、supporていませんt循環性。しかし、あなたの記述から、あなたが実際にそれを必要とするかどうかはわかりません。

0

私の場合は、指定したとおりにCircularFIFOBufferを使用し、書き込み(追加)するときにバッファを中心に同期させます。監視アプリケーションがバッファーを読み取ろうとするときに、バッファー上で同期化し、それをコピーまたはクローン化して報告に使用します。

この提案は、バッファを新しいオブジェクトにコピー/クローンするためにレイテンシが最小であることを前提にしています。要素数が多く、コピー時間が遅い場合、これは良い考えではありません。

擬似コードの例:

public void writeRequest(String requestID) { 
    synchronized(buffer) { 
     buffer.add(requestID); 
    } 
} 

public Collection<String> getRequests() { 
    synchronized(buffer) { 
     return buffer.clone(); 
    } 
} 
2

はたぶん、あなたはDisruptor - Concurrent Programming Frameworkを見てみたいです。

  • 、ここjava.util.concurrent.ArrayBlockingQueueに設計し、またパフォーマンスcomparementを代替案についての論文を探す:pdf
  • はスティック、ライブラリが多すぎるとBlogsAndArticles

から最初の3件の記事を読むことを考えてみましょう〜java.util.concurrent.ArrayBlockingQueue

関連する問題