2017-06-22 7 views
0

私は100スレッドしか持っていないので、12スレッドしか処理できません。これらのスレッドの完了後、他の12個のスレッドは処理されなければならないが、それは最初の12個のスレッドしか処理していないが、その後は終了する。
ここ は私のロジックです:Akkaスレッド調整

class AkkaProcessing extends Actor { 
    def receive = { 
case message: List[Any] => 
var meterName = message(0) // It Contains only 12 threads , it process them and terminates. Am unable to get remaining threads 

val sqlContext = message(1).asInstanceOf[SQLContext] 
val FlagDF = message(2).asInstanceOf[DataFrame] 
     { 

       All the business logic here 
      } 

     context.system.shutdown() 
    } 
    } 
} 
object Processing { 
    def main(args: Array[String]) = { 
    val rawBuff = new ArrayBuffer[Any]() 
    val actorSystem = ActorSystem("ActorSystem") // Creating ActorSystem 
    val actor = actorSystem.actorOf(Props[AkkaProcessing].withRouter(RoundRobinPool(200)), "my-Actor") 
    implicit val executionContext = actorSystem.dispatchers.lookup("akka.actor.my-dispatcher") 

    for (i <- 0 until meter_list.length) { 

    var meterName = meter_list(i)  // All 100 Meters here 

    rawBuff.append(meterName, sqlContext, FlagDF) 
    actor ! rawBuff.toList 
    } 
    } 
    } 

非常に私はあなたが2つの役者タイプを作成するのが最善かもしれないと思う

答えて

0

を高く評価し、任意の入力:12個のスレッドのタスクを取る(並列で実行)、消費者やコーディネーターを(それらを消費者に渡す)。コーディネーターは、消費者が終了してから次のバッチを実行するまで待つ。あなただけの同様に先物を使用することができ、それに失敗Can Scala actors process multiple messages simultaneously?

は、コード例については、この答えを参照してください。