2016-05-19 22 views
0

mutable.ListBufferで何か問題が起きていることがわかりましたが、修正方法や問題の適切な説明がわかりません。Scala:ListBufferを追加して並列実行すると、予期しない結果が生成されない

以下のコードを単純化して動作を再現しました。

私は、最初のリストが処理されるときに、基本的に並行して関数を実行して、リストに要素を追加しようとしています。私は "失う"要素に終わる。

返し
import java.util.Properties 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.duration.Duration 
import scala.concurrent.{Await, Future} 


import scala.concurrent.{ExecutionContext} 
import ExecutionContext.Implicits.global 


object MyTestObject { 

    var listBufferOfInts = new ListBuffer[Int]() // files that are processed 

    def runFunction(): Int = { 
    listBufferOfInts = new ListBuffer[Int]() 
    val inputListOfInts = 1 to 1000 
    val fut = Future.traverse(inputListOfInts) { i => 
     Future { 
     appendElem(i) 
     } 
    } 
    Await.ready(fut, Duration.Inf) 
    listBufferOfInts.length 
    } 

    def appendElem(elem: Int): Unit = { 
    listBufferOfInts ++= List(elem) 
    } 
} 

MyTestObject.runFunction() 
MyTestObject.runFunction() 
MyTestObject.runFunction() 

明らか
res0: Int = 937 
res1: Int = 992 
res2: Int = 997 

を私は1000はすべての時間を返されることを期待します。私のコードを修正して "アーキテクチャ"を維持するが、ListBufferを "同期"させるにはどうすればよいですか?

synchronized { 
    listBufferOfInts ++= List(elem) 
} 

listBufferOfInts ++= List(elem) 

を変更

答えて

2

I私はこれを取るとき

は私が正しく期待していたすべてのイベントを収集しますあなたがそれを単純化したと言いましたが、正確な問題は何か分かりませんが、依然として明白な競合状態です。複数のスレッドが単一の変更可能なコレクションを修正します。他の答えが指摘しているように、あるスレッドだけが同時にコレクションを変更できるように、いくつかのロックが必要です。計算が重い場合は、バッファに同期して結果を追加することはパフォーマンスに顕著な影響を与えるべきではありませんが、疑わしい場合は常に測定してください。

しかし、同期は必要ありません。代わりに、varsや可変状態なしで何か他の操作を行うことができます。それぞれFutureがあなたの部分的な結果を返してからリストにマージするようにしてください。実際にはFuture.traverseだけです。

import scala.concurrent.duration._ 
import scala.concurrent.{Await, Future} 
import scala.concurrent.ExecutionContext.Implicits.global 

def runFunction: Int = { 
    val inputListOfInts = 1 to 1000 

    val fut: Future[List[Int]] = Future.traverse(inputListOfInts.toList) { i => 
    Future { 
     // some heavy calculations on i 
     i * 4 
    } 
    } 

    val listOfInts = Await.result(fut, Duration.Inf) 
    listOfInts.size 
} 

Future.traverseはすでにあなたに結合されたすべての検索結果で不変のリストを与える、可変バッファにそれらを追加する必要はありません。 言うまでもなく、あなたはいつも1000を返すでしょう。

@ List.fill(10000)(runFunction).exists(_ != 1000) 
res18: Boolean = false 
+0

大変ありがとうございます! – Stephane

0

はそれを動作させます。パフォーマンス上の問題になる可能性はありますか?私はまだ説明に興味があり、おそらくもっと良いことをする方法です!

1

私はあなたが正しく何をしようとしているのかわかりません。おそらく問題は、実際には、あなたがrunFunction内で再初期化するvar ListBufferを共有していることです。

import java.util.Properties 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.duration.Duration 
import scala.concurrent.{ Await, Future } 

import scala.concurrent.{ ExecutionContext } 
import ExecutionContext.Implicits.global 

object BrokenTestObject extends App { 

    var listBufferOfInts = (new ListBuffer[Int]()) 

    def runFunction(): Int = { 
    val inputListOfInts = 1 to 1000 
    val fut = Future.traverse(inputListOfInts) { i => 
     Future { 
     appendElem(i) 
     } 
    } 
    Await.ready(fut, Duration.Inf) 
    listBufferOfInts.length 
    } 

    def appendElem(elem: Int): Unit = { 
    listBufferOfInts.append(elem) 
    } 

    BrokenTestObject.runFunction() 
    BrokenTestObject.runFunction() 
    BrokenTestObject.runFunction() 

    println(s"collected ${listBufferOfInts.length} elements") 
} 

あなたが本当に同期の問題がある場合は、次のようなものを使用することができます:

import java.util.Properties 

import scala.collection.mutable.ListBuffer 
import scala.concurrent.duration.Duration 
import scala.concurrent.{ Await, Future } 

import scala.concurrent.{ ExecutionContext } 
import ExecutionContext.Implicits.global 

class WrappedListBuffer(val lb: ListBuffer[Int]) { 
    def append(i: Int) { 
    this.synchronized { 
     lb.append(i) 
    } 
    } 
} 

object MyTestObject extends App { 

    var listBufferOfInts = new WrappedListBuffer(new ListBuffer[Int]()) 

    def runFunction(): Int = { 
    val inputListOfInts = 1 to 1000 
    val fut = Future.traverse(inputListOfInts) { i => 
     Future { 
     appendElem(i) 
     } 
    } 
    Await.ready(fut, Duration.Inf) 
    listBufferOfInts.lb.length 
    } 

    def appendElem(elem: Int): Unit = { 
    listBufferOfInts.append(elem) 
    } 

    MyTestObject.runFunction() 
    MyTestObject.runFunction() 
    MyTestObject.runFunction() 

    println(s"collected ${listBufferOfInts.lb.size} elements") 
} 
関連する問題