2016-07-20 10 views
0

私は "制御"メッセージを送信したいAkka Actorを持っています。 このActorの主な任務は、ループ内のポーリングプロセスであるKafkaキューを待ち受けることです。Akka Actorで安全にスレッドを作成できますか?

私は次のように単純に俳優をロックアップすることがわかってきましたし、それが「停止」(またはその他の)メッセージが表示されません。私はによって開始されたスレッドにループをラップすることができ

class Worker() extends Actor { 
    private var done = false 

    def receive = { 
    case "stop" => 
     done = true 
     kafkaConsumer.close() 
    // other messages here 
    } 

    // Start digesting messages! 
    while (!done) { 
    kafkaConsumer.poll(100).iterator.map { cr: ConsumerRecord[Array[Byte], String] => 
     // process the record 
    ), null) 
    } 
    } 
} 

をアクタからスレッドを開始するのは大丈夫ですか安全ですか?より良い方法がありますか?

+0

絶対にありません!あなたはおそらく "消費者の俳優"のようなものを作りたいと思うでしょう。 [Reactive Kafka](https://github.com/akka/reactive-kafka)を見てください。 –

答えて

2

基本的には、このアクタはブロックされていますが、ルールの大まかな点はアクタ内で決してブロックしないことです。それでもやりたい場合は、このアクタがネイティブのスレッドプールとは別のスレッドプールで実行されていることを確認して、アクターシステムのパフォーマンスに影響を与えないようにしてください。それを行う別の方法は、新しいメッセージをポーリングするためにメッセージを自分自身に送信することです。

1)カフカからのメッセージをポーリングするための受信

新しいメッセージを引っ張って を注文する関連俳優

3)自分自身にメッセージを送信する メッセージを超える2)ハンド

賢い

4)ハンドそれ以上...

コード:

case object PollMessage 

class Worker() extends Actor { 
    private var done = false 

    def receive = { 
    case PollMessage ⇒ { 
     poll() 
     self ! PollMessage 
    } 
    case "stop" => 
     done = true 
     kafkaConsumer.close() 
    // other messages here 
    } 

    // Start digesting messages! 

    def poll() = { 
    kafkaConsumer.poll(100).iterator.map { cr: ConsumerRecord[Array[Byte], String] => 
     // process the record 
    ), null) 
    } 
    } 

} 

俳優をつねにブロックしても停止メッセージが表示されることはありますか?

0

@ルイスF.アンサーを追加する。あなたの俳優の設定に応じて、彼らが受信したすべてのメッセージを、それらが忙しいか、またはそれらをメールボックスと呼ばれるキューに入れ、メッセージが後で処理される(通常はFIFO形式で)場合にドロップします。しかし、この特定のケースでは、俳優にPollMessageを洪水させているので、あなたのメッセージがドロップされないという保証はありません。

関連する問題