2011-12-06 5 views
0

並列コレクションを使用する行列乗算アルゴリズムを書いて、乗算を高速化しました。Akka - 負荷分散とプロセッサのフル使用

それはそのように行く:今

(0 until M1_ROWS).grouped(PARTITION_ROWS).toList.par.map(i => 
    singleThreadedMultiplicationFAST(i.toArray.map(m1(_)), m2) 
).reduce(_++_) 

私はので、私がやったことで、アッカで同じことをしたいと思っ:それが判明

val multiplyer = actorOf[Pool] 
multiplyer start 
val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map(i => 
    multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 
futures.map(_.get match { case res :Array[Array[Double]] => res }).reduce(_++_) 

class Multiplyer extends akka.actor.Actor{ 
    protected def receive = { 
    case MultiplyMatrix(m1, m2) => self reply singleThreadedMultiplicationFAST (m1,m2) 
    } 
} 
class Pool extends Actor with DefaultActorPool 
    with FixedCapacityStrategy with RoundRobinSelector { 

    def receive = _route 
    def partialFill = false 
    def selectionCount = 1 
    def instance = actorOf[Multiplyer] 
    def limit = 32 // I tried 256 with no effect either 
} 

、このアルゴリズムの俳優ベースのバージョン私のi7 sandybridgeでは の200%のみを使用していますが、並列コレクションのバージョンはプロセッサの600%を使用する で4〜5倍高速です。 は、私はそれがディスパッチャかもしれないと思ったし、これを試してみました:

self.dispatcher = Dispatchers.newThreadBasedDispatcher(self, mailboxCapacity = 100) 

と、この(私は俳優の間でこれを共有):

val messageDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("d1") 
    .withNewBoundedThrea dPoolWithLinkedBlockingQueueWithUnboundedCapacity(100) 
    .setCorePoolSize(16) 
    .setMaxPoolSize(128) 
    .setKeepAliveTimeInMillis(60000).build 

しかし、私はすべての変更を観察しませんでした。まだ200%プロセッサーの使用のみであり、 アルゴリズムはパラレルコレクション バージョンより4〜5倍遅いです。

私は私が助けてください愚かな何かをやっていると確信しています!!! :)

答えて

2

この式:

val futures = (0 until M1_ROWS).grouped(PARTITION_ROWS).map(i => 
    multiplyer ? MultiplyMatrix(i.toArray.map(m1(_)), m2) 
) 

は怠惰なコレクションを作成しますので、あなたの_.getは、プログラム全体のシリアルになります。 これを解決するには、toListなどを追加してその式を厳密にすることです。

関連する問題