0
私は100スレッドしか持っていないので、12スレッドしか処理できません。これらのスレッドの完了後、他の12個のスレッドは処理されなければならないが、それは最初の12個のスレッドしか処理していないが、その後は終了する。
ここ は私のロジックです:Akkaスレッド調整
class AkkaProcessing extends Actor {
def receive = {
case message: List[Any] =>
var meterName = message(0) // It Contains only 12 threads , it process them and terminates. Am unable to get remaining threads
val sqlContext = message(1).asInstanceOf[SQLContext]
val FlagDF = message(2).asInstanceOf[DataFrame]
{
All the business logic here
}
context.system.shutdown()
}
}
}
object Processing {
def main(args: Array[String]) = {
val rawBuff = new ArrayBuffer[Any]()
val actorSystem = ActorSystem("ActorSystem") // Creating ActorSystem
val actor = actorSystem.actorOf(Props[AkkaProcessing].withRouter(RoundRobinPool(200)), "my-Actor")
implicit val executionContext = actorSystem.dispatchers.lookup("akka.actor.my-dispatcher")
for (i <- 0 until meter_list.length) {
var meterName = meter_list(i) // All 100 Meters here
rawBuff.append(meterName, sqlContext, FlagDF)
actor ! rawBuff.toList
}
}
}
非常に私はあなたが2つの役者タイプを作成するのが最善かもしれないと思う