2012-03-13 4 views
2

私はJavaを学び、runnableを使って既存のアプリで多少のマルチスレッドを行うことができました。私は現在、スレッド間で変数を共有するために混乱を見ていましたが、スレッドが実際にどのようにスレッドを生成しているのかわかりません。このJavaアプリケーションはスレッドを拡張せずに複数のスレッドを実行していますか?

彼は私のプログラムで実行可能クラスを送信するために使用するExecutorを使用していますが、この例ではサブミット(または実行可能)はありません。私はOracleのチュートリアルで知っていることを学んだだけで、スレッドを拡張したり実行可能なものを実装する方法は2つしかありません(私はここでは表示されませんが、executorをdisruptorに提出しています。私は何かを逃しているのですか、あるいはこの人が別のやり方でやっているのですか?私の最終的な目標は、(完全に動作する)このコードを理解することですので、既存の(実行可能コードを使用して)コードに適用することができます。

App.java

import com.lmax.disruptor.*; 
import com.lmax.disruptor.dsl.*; 

import java.util.concurrent.Executor; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 

public class App { 

    private final static int RING_SIZE = 1024 * 8; 

    private static long handleCount = 0; 

    private final static long ITERATIONS = 1000L * 1000L * 300L; 
    private final static int NUM_EVENT_PROCESSORS = 8; 

    private final static EventHandler<ValueEvent> handler = 
     new EventHandler<ValueEvent>() { 
     public void onEvent(final ValueEvent event, 
           final long sequence, 
           final boolean endOfBatch) throws Exception { 
     handleCount++; 
    } 
    }; 

    public static void main(String[] args) { 
    System.out.println("Starting disruptor app."); 

    ExecutorService executor = Executors.newFixedThreadPool(NUM_EVENT_PROCESSORS); 

    Disruptor<ValueEvent> disruptor = 
     new Disruptor<ValueEvent>(ValueEvent.EVENT_FACTORY, executor, 
      new SingleThreadedClaimStrategy(RING_SIZE), 
      new SleepingWaitStrategy()); 
    disruptor.handleEventsWith(handler); 
    RingBuffer<ValueEvent> ringBuffer = disruptor.start(); 

    long start = System.currentTimeMillis(); 

     long sequence; 
     ValueEvent event; 
    for (long x=0; x<ITERATIONS; x++) { 
     sequence = ringBuffer.next(); 
     event = ringBuffer.get(sequence); 
     event.setValue(x); 
     ringBuffer.publish(sequence); 
     } 
    final long expectedSequence = ringBuffer.getCursor(); 

    while (handleCount < expectedSequence) { } 

    long opsPerSecond = (ITERATIONS * 1000L)/(System.currentTimeMillis() - start); 
    System.out.printf("op/s: %d, handled: %d", opsPerSecond, handleCount); 
    } 
} 

更新:

は、ここで問題のコードですかく乱スレッドの産卵を処理しているならば、どのように私はそれに私の既存の実行可能なクラスを提出することができますか?またはコードをもう一度書き直す必要がありますか?申し訳ありませんが、混乱している人が既存のコードで作業する場合や、私が完全に自分のものを変更する必要がある場合は混乱します。

+0

実行者は明らかに実行するスレッドを生成します。それはDisruptorに渡されます。あなたはそのクラスに関する情報を提供しませんが、executorにアクセスできるので、executorにジョブを渡すことができます。 – user1252434

+2

1秒間に100,000以上のタスクを処理するだけであれば、ExecutorServiceを単独で使用する方が簡単です。毎秒何百万ものタスクを処理できるなら、Disruptorは良い解決策です。このような高水準のパフォーマンスをサポートする必要がある場合は、コードを少し変更して処理モデルに合わせても問題ありません。私はあなたが必要とするものが、ExecutorServiceだけを使用することで可能な最も簡単な解決策であると考えています。 –

+0

@PeterLawrey私のプログラムは単純な数学プログラムですが、共有ハッシュマップを参照しています。問題は、毎秒数百万回のルックアップがあることです(また、それらのルックアップに基づいていくつかの書き込みが行われますが、私はしばらくredisを使用していましたが、60,000に達しました。私は混乱が法案に合うと思ったが、あなたの権利はExecutorServiceを試してからそこから行くと思う。 –

答えて

7

あなたが疑うように、実際のスレッド処理(作業項目の送信による)は、Disruptorの内部で行われます。だから、あなたは(あなたの運に、それがオープンソースである)its source codeを見る必要があり、これを見つけるために:

public RingBuffer<T> start() 
{ 
    EventProcessor[] gatingProcessors = eventProcessorRepository.getLastEventProcessorsInChain(); 
    ringBuffer.setGatingSequences(Util.getSequencesFor(gatingProcessors)); 

    checkOnlyStartedOnce(); 
    for (EventProcessorInfo<T> eventProcessorInfo : eventProcessorRepository) 
    { 
     executor.execute(eventProcessorInfo.getEventProcessor()); 
    } 

    return ringBuffer; 
} 
+0

Java初心者が既存の(実行可能ファイルを使用して)コードと混乱を起こす方法はありますか?それとも、混乱者固有の実行のためにすべてを書き直す必要があるのでしょうか? –

+0

@learningJava、Disruptorの経験がないので、あなたの質問に答えることはできません。 [そのイントロ](http://code.google.com/p/disruptor/)からは、スレッド間のメッセージングに特化した高性能スレッドプラットフォームのようです。だから、一般的には現時点ではあなたにとって過度のものかもしれません。まず、標準のExecutorフレームワークを学び、使用することをお勧めします。 –

+0

おかげでPeter、私はあなたの権利を考えています。私は障害物がとても違っていたことに気づいていませんでした。私はスレッド間で変数をすばやく共有できるように、コードにプラグインしたライブラリだと思っていました。あまりにも多くのサンプルコードを見つけることができなかったので、ビデオの高レベルの会話に基づいています。 –

関連する問題