2013-08-09 8 views
9

いくつかの文字列を見た回数を追跡するためにScalaで可変マップを使用したいとします。シングルスレッドのコンテキストでは、これは簡単です:変更可能なマップの値をスレッドセーフに変換する

import scala.collection.mutable.{ Map => MMap } 

class Counter { 
    val counts = MMap.empty[String, Int].withDefaultValue(0) 

    def add(s: String): Unit = counts(s) += 1 
} 

残念ながら、これはスレッドセーフを、getupdateがアトミックに発生しませんので、ではありません。

import scala.concurrent.stm._ 

class Counter { 
    val counts = TMap.empty[String, Int] 

    def add(s: String): Unit = atomic { implicit txn => 
    counts(s) = counts.get(s).getOrElse(0) + 1 
    } 
} 
:私は ScalaSTMさん​​を使用することができます知っている

def replace(k: A, f: B => B): Option[B] 

Concurrent maps次のようになり、私は必要なものを、変更可能なマップのAPIにa few atomic operationsを追加ではなく、

しかし、(今のところ)それはまだ余分な依存関係です。他のオプションには、アクター(別の依存関係)、同期(潜在的に効率の悪い)、Javaのatomic referencesless idiomatic)などがあります。

一般的に私はScalaの可変マップを避けたいと思っていますが、時にはこの種のものが必要でした。最近では、私の指を横切るのではなく、ナイーブな解決策に噛まれている)。

私は(など明快対パフォーマンス、対余分な依存関係)のトレードオフの数がここにあります知っているが、Scalaの2.10で、この問題を「右」の答えのようなものはありますか?

+1

trait SingleThreadedExecutionContext { val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) } class Counter extends SingleThreadedExecutionContext { private val counts = MMap.empty[String, Int].withDefaultValue(0) def get(s: String): Future[Int] = future(counts(s))(ec) def add(s: String): Future[Unit] = future(counts(s) += 1)(ec) } 

テストは次のようになりますか? 'Counter.add'は単にそれにfire-and-forgetメッセージを送ります。読書に関しては、あなたの必要に応じて、同時に起こることも、俳優を通過することもできます。 – gourlaysama

答えて

3

最も簡単な解決策は間違いなく同期です。多すぎる競合がなければ、パフォーマンスはそれほど悪くないかもしれません。

それ以外の場合は、独自のSTM-like replaceの実装をロールアップすることができます。これは次のようなことがあります。

object ConcurrentMapOps { 
    private val rng = new util.Random 
    private val MaxReplaceRetryCount = 10 
    private val MinReplaceBackoffTime: Long = 1 
    private val MaxReplaceBackoffTime: Long = 20 
} 
implicit class ConcurrentMapOps[A, B](val m: collection.concurrent.Map[A,B]) { 
    import ConcurrentMapOps._ 
    private def replaceBackoff() { 
    Thread.sleep((MinReplaceBackoffTime + rng.nextFloat * (MaxReplaceBackoffTime - MinReplaceBackoffTime)).toLong) // A bit crude, I know 
    } 

    def replace(k: A, f: B => B): Option[B] = { 
    m.get(k) match { 
     case None => return None 
     case Some(old) => 
     var retryCount = 0 
     while (retryCount <= MaxReplaceRetryCount) { 
      val done = m.replace(k, old, f(old)) 
      if (done) { 
      return Some(old) 
      } 
      else {   
      retryCount += 1 
      replaceBackoff() 
      } 
     } 
     sys.error("Could not concurrently modify map") 
    } 
    } 
} 

ここで、衝突の問題は特定のキーに限定されています。 2つのスレッドが同じマップにアクセスするが異なるキーで作業する場合、あなたは何の衝突を持っていないだろうと置換操作は、必ず最初に成功します。衝突が検出された場合は、少し待ってから(同じキーでスレッドが永久に戦う可能性を最小限に抑えるため、ランダムな時間をとってください)、やり直してください。

私は、これは生産対応(私はちょうど今それを投げ)であることを保証することはできませんが、それはトリックを行う可能性があります。

UPDATE:もちろん(はIonut G.スタンが指摘したように)あなたが望むすべてが増分の場合は、/値をデクリメント、JavaのConcurrentHashMapはすでにロックフリーでthoses操作を提供します。あなたは、パラメータとして変換関数を取ると、より一般的なreplace方法が必要な場合は 私の上記の溶液が適用されます。

+0

私はMapコードに気づき、ThreadLocalRandomに切り替えました。https://github.com/scala/scala/blob/master/src/library/scala/collection/concurrent/TrieMap.scala#L473 –

10

これはどうですか。あなたが今実際に一般的なreplaceメソッドを必要としないと仮定すると、カウンタだけです。

マップ全体の同期よりもパフォーマンスが向上し、アトミック・インクリメントも得られます。

+0

ありがとうございます。一般的なケースですが、これは簡単であることがわかりました。 –

+0

これは適切なソリューションであり、非常に高性能なJava並行ライブラリを活用しています。 –

+1

concurrent.TheMapの代わりにConcurrentHashMapに到達する理由があるのなら、私は不思議です。私は、フォーラムがAPIの広告であるという意見はありません。 –

2

マップがちょうどヴァルとしてそこに座っている場合は、トラブルを求めています。あなたのユースケースを満たしている場合は、低コンテンションの用途向けに

class Counter { 
    private[this] myCounts = MMap.empty[String, Int].withDefaultValue(0) 
    def counts(s: String) = myCounts.synchronized { myCounts(s) } 
    def add(s: String) = myCounts.synchronized { myCounts(s) += 1 } 
    def getCounts = myCounts.synchronized { Map[String,Int]() ++ myCounts } 
} 

のようなものをお勧めします。競合率が高い場合は、そのような使用をサポートするように設計されたコンカレントマップ(例:java.util.concurrent.ConcurrentHashMap)を使用し、値をAtomicWhateverにラップしてください。

2

あなたは将来ベースのインターフェイスで作業しても大丈夫です場合:どのような変更可能なマップに書き込み、単一アッカの俳優について

class MutableMapSpec extends Specification { 

    "thread safe" in { 

    import ExecutionContext.Implicits.global 

    val c = new Counter 
    val testData = Seq.fill(16)("1") 
    await(Future.traverse(testData)(c.add)) 
    await(c.get("1")) mustEqual 16 
    } 
} 
+0

これはスレッドセーフではありません。一度に1つのライターを保証する一方で、マップが変更されている間にスレッドを読み取ることができます –

+0

私が理解するように、ecをコンテキストとして使用するすべての操作はスレッドセーフです。そのコンテキスト外のOpsはスレッドセーフではありません。この理解が正しいならば、他の人から聞いてうれしいでしょう。 –

+0

しかし、問題は、読み込みは直接行われます: 'c.counts'にアクセスすると、' ExecutionContext'をまったく使用していないことになります。 –