2017-11-08 5 views
-1

特定のソース(たとえばKafka)から消費していて、定期的に収集したメッセージを(たとえばS3に)ダンプしています。 - consumedump私は2つのスレッドを持ちたいJavaがクラス内で複数のスレッドを開始する

public class ConsumeAndDump { 
    private List<String> messages; 

    public ConsumeAndDump(){ 
     messages = new ArrayList<>(); 
     // initialize required resources 
    } 

    public void consume(){ 
     // this runs continuously and keeps consuming from the source. 
     while(true){ 
     final String message = ...// consume from Kafka 
     messages.add(message); 
     } 
    } 

    public void dump(){ 
     while(true){ 
     final String allMessages = String.join("\n", messages); 
     messages.clear(); // shown here simply, but i am synchronising this to avoid race conditions 
     // dump to destination (file, or S3, or whatever) 
     TimeUnit.SECONDS.sleep(60); // sleep for a minute 
     } 
    } 

    public void run() { 
     // This is where I don't know how to proceed. 
     // How do I start consume() and dump() as separate threads? 
     // Is it even possible in Java? 

     // start consume() as thread 
     // start dump() as thread 
     // wait for those to finish 
    } 
} 

次のように私のクラス定義があります。 consumeは継続的に実行する必要がありますが、dumpは定期的にウェイクアップしてメッセージをダンプし、バッファをクリアしてから再びスリープ状態に戻ります。

スレッドとしてconsume()dump()を開始する際に問題が発生しています。正直なところ、私はそれをどうやって行うのか分かりません。メンバーメソッドをスレッドとして実行できますか?または、の別のRunnableクラスを使用して、ダンプを消費する必要がありますか?もしそうなら、それらの間でどのようにmessagesを共有するのですか?

+0

古典的なプロデューサーのコンシューマーのシナリオがあるようですが、[ソフトウェアエンジニアリングに関する質問からの私の答え](https://softwareengineering.stackexchange.com/a/337332/250821)が役立つかもしれません。 –

答えて

3

まず、ArrayListを実際に使用することはできません。 ArrayListはスレッドセーフではありません。例えば、BlockingQueueをチェックしてください。あなたは背圧のようなものに対処しなければなりません。無制限キューを使用しないでください。

スレッドを開始するのはかなり簡単ですが、ラムダを使用することができます。

public void run() { new Thread(this::consume).start(); new Thread(this::produce).start(); }

は動作するはずですが、あなたはこれらのプロセスを終了する必要があるとき以上の無制御に少し与えます。

+0

Cool。このようなものを探していた。しかし、どのようにスレッドが終了するのを待つのですか?このように、 'run()'メソッドは、これら2つのスレッドを生成した後に終了するでしょう。 'ラン'を待たせる方法はありますか? 'new Thread()'の戻り値の型がないことがわかります。そのようなものとして、私はそれらのスレッドにオブジェクトをキャッチすることはできません。 – Nik

+3

両方の消費/生産がwhileループにあるため、スレッドは終了しません。 また、 'Thread t = new Thread(this :: consumume);を実行することもできます。 t.start(); ' consume/produceが終了することを期待するならば、' ExecutorService'と 'Executors'を見て、スレッドで実行するようにスケジュールされた個々のタスクをサブミットできるようにしてください。 –

関連する問題