2017-10-06 1 views
0

を動作していませんスーパーバイザはコンシューマを再起動できます。アッカカフカストリームsupervison戦略は、私がアッカストリームカフカのアプリケーションを実行していると私は、そのようなブローカーがダウンして、ストリームの消費者は、停止タイムアウト後に死亡した場合、そのストリームの消費者の監督の戦略を組み込みたい

UserEventStream

import akka.actor.{Actor, PoisonPill, Props} 
import akka.kafka.{ConsumerSettings, Subscriptions} 
import akka.kafka.scaladsl.Consumer 
import akka.stream.scaladsl.Sink 
import akka.util.Timeout 
import org.apache.kafka.clients.consumer.ConsumerConfig 
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer} 

import scala.concurrent.duration._ 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.util.{Failure, Success} 
import akka.pattern.ask 
import akka.stream.ActorMaterializer 

class UserEventStream extends Actor { 

    val settings = Settings(context.system).KafkaConsumers 
    implicit val timeout: Timeout = Timeout(10 seconds) 
    implicit val materializer = ActorMaterializer() 

    override def preStart(): Unit = { 
    super.preStart() 
    println("Starting UserEventStream....s") 
    } 
    override def receive = { 
    case "start" => 
     val consumerConfig = settings.KafkaConsumerInfo 
     println(s"ConsumerConfig with $consumerConfig") 
     startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1)) 
    } 

    def startStreamConsumer(config: Map[String, String]) = { 
    println(s"startStreamConsumer with config $config") 

    val consumerSource = createConsumerSource(config) 
    val consumerSink = createConsumerSink() 
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor") 

    println("START: The UserEventStream processing") 
    val future = 
     consumerSource 
     .mapAsync(parallelism = 50) { message => 
      val m = s"${message.record.value()}" 
      messageProcessor ? m 
     } 
     .runWith(consumerSink) 
    future.onComplete { 
     case Failure(ex) => 
     println("FAILURE : The UserEventStream processing, stopping the actor.") 
     self ! PoisonPill 
     case Success(ex) => 
    } 
    } 

    def createConsumerSource(config: Map[String, String]) = { 
    val kafkaMBAddress = config("bootstrap-servers") 
    val groupID = config("groupId") 
    val topicSubscription = config("subscription-topic").split(',').toList 
    println(s"Subscriptiontopics $topicSubscription") 

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer) 
     .withBootstrapServers(kafkaMBAddress) 
     .withGroupId(groupID) 
     .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") 
     .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") 

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*)) 
    } 

    def createConsumerSink() = { 
    Sink.foreach(println) 
    } 
} 

StreamProcessorSupervisor(これはUserEventStreamクラスのスーパーバイザークラスです):

import akka.actor.{Actor, Props} 
import akka.pattern.{Backoff, BackoffSupervisor} 
import akka.stream.ActorMaterializer 
import stream.StreamProcessorSupervisor.StartClient 
import scala.concurrent.duration._ 

object StreamProcessorSupervisor { 
    final case object StartSimulator 
    final case class StartClient(id: String) 
    def props(implicit materializer: ActorMaterializer) = 
    Props(classOf[StreamProcessorSupervisor], materializer) 
} 

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    self ! StartClient(self.path.name) 
    } 

    def receive: Receive = { 
    case StartClient(id) => 
     println(s"startCLient with id $id") 
     val childProps = Props(classOf[UserEventStream]) 
     val supervisor = BackoffSupervisor.props(
     Backoff.onFailure(
      childProps, 
      childName = "usereventstream", 
      minBackoff = 1.second, 
      maxBackoff = 1.minutes, 
      randomFactor = 0.2 
     ) 
    ) 
     context.actorOf(supervisor, name = s"$id-backoff-supervisor") 
     val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream") 
     userEventStrean ! "start" 
    } 
} 

App(メインアプリケーションクラス)ここで

は私の完全なコードです:

は:

kafka { 

    consumer { 

    num-consumers = "1" 
    c1 { 
     bootstrap-servers = "localhost:9092" 
     bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1} 
     groupId = "localakkagroup1" 
     subscription-topic = "test" 
     subscription-topic = ${?SUBSCRIPTION_TOPIC1} 
     message-type = "UserEventMessage" 
     poll-interval = 50ms 
     poll-timeout = 50ms 
     stop-timeout = 30s 
     close-timeout = 20s 
     commit-timeout = 15s 
     wakeup-timeout = 10s 
     max-wakeups = 10 
     use-dispatcher = "akka.kafka.default-dispatcher" 
     kafka-clients { 
     enable.auto.commit = true 
     } 
    } 
    } 
} 

アプリケーションを実行した後、私は意図的カフカブローカーを殺害した後、30秒後に、俳優がポイズンピルを送信することによって、自分自身を停止していることがわかりました。しかし、奇妙なことに、それはBackoffSupervisor戦略で述べたように再開しません。

ここで問題が発生する可能性がありますか?

答えて

0

コード内UserEventStreamの2件のインスタンスがあります。一つはBackoffSupervisorが内部で使用すると、それに渡すPropsを作成し、他はStreamProcessorSupervisorの子であるval userEventStreanある子役ですが。あなたはそのメッセージを前者に送るべきときに、後者に"start"メッセージを送ります。

BackoffSupervisorが子アクターを作成するため、val userEventStreanは必要ありません。 BackoffSupervisorに送信されたメッセージは、そう、子供に"start"メッセージを送るにそれを送信するために、子供に転送されBackoffSupervisor

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor { 
    override def preStart(): Unit = { 
    self ! StartClient(self.path.name) 
    } 

    def receive: Receive = { 
    case StartClient(id) => 
     println(s"startCLient with id $id") 
     val childProps = Props[UserEventStream] 
     val supervisorProps = BackoffSupervisor.props(...) 
     val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor") 
     supervisor ! "start" 
    } 
} 

他の問題は、俳優がPoisonPillを受信したとき、それは同じではないということですその俳優が例外を投げるようなもの。したがってBackoff.onFailureは、UserEventStreamPoisonPillを送信したときにトリガーされません。 PoisonPillので代わりにBackoff.onStopを使用し、俳優を停止します。

val supervisorProps = BackoffSupervisor.props(
    Backoff.onStop(// <--- use onStop 
    childProps, 
    ... 
) 
) 
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor") 
supervisor ! "start" 
+0

も、上記の子供が停止された後、監督者が子供に「開始」メッセージを送信していない変更行った後。子供は再び作成されますが。監督者はどのようにメッセージを子供に送ることができますか – Deepakkumar

関連する問題