2016-09-22 5 views
0

lucene indexing(v6.1)を高速化するために、Slick 3.1(Scala)のデータを分割して分割したい異なるデータセットをスレッドに渡して、インデックス処理を高速化します。私はMySQLからデータをフェッチするためにScalaに次のコードを書いています。Slick 3.1(Scala)のデータを4つの部分に分割するには

class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext) extends NoteEntityTable {  
    import databaseService._ 
    import databaseService.driver.api._ 
    import com.github.t3hnar.bcrypt._  
    def getNotes(): Future[Seq[NoteEntity]] = db.run(notes.result)  
} 
case class NoteEntity(id: Option[Long] = None, title: String, teaser: String, description: String) 

コードNotesService

class NotesService(val databaseService: DatabaseService)(implicit executionContext: ExecutionContext) extends NoteEntityTable { 

    import databaseService._ 
    import databaseService.driver.api._ 
    import com.github.t3hnar.bcrypt._ 

    def getNotes(): Future[Seq[NoteEntity]] = db.run(notes.result) 

} 

のための私が使用したNotesServiceからデータをフェッチするには、次の行にここで

def setI = { 
    val NUM_THREADS = Runtime.getRuntime().availableProcessors() 
    val IndexStoreDir = Paths.get("/var/www/html/Index") 
    val analyzer = new StandardAnalyzer() 
    val writerConfig = new IndexWriterConfig(analyzer) 
    writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND) 
    writerConfig.setRAMBufferSizeMB(500) 
     .setMaxBufferedDocs(2) 
     .setMergeScheduler(new ConcurrentMergeScheduler()) 
    val directory = FSDirectory.open(IndexStoreDir) 
    var writer = new IndexWriter(directory, writerConfig) 

    val threads = Array.ofDim[IndexTh](NUM_THREADS) 
    val notes = notesService.getNotes() 

    for (i <- 0 until NUM_THREADS){ 

     threads(i) = new IndexTh(notesService, writer) 
     //here on this line I want to pass different sets of data to thread. 
    } 
    for (i <- 0 until NUM_THREADS) { 
     threads(i).start() 
     println("Thread " + i + " Started!") 
    } 
    } 

を:

私は分割することができますどのように
threads(i) = new IndexTh(notesService, writer) 

notesからのデータthreaに渡すサービスd? ノート内のデータを複数のチャンクに分割するにはどうすればよいですか? データをこのようにしたいとします。

notesService.getNotes()が20000行のデータを取得するとします。今度は、これらの行を4000行の5つの部分に分割して、4000行ごとのデータを異なるスレッドに渡すことができます。

+0

を?改ページ? – Roman

+0

複数のスレッドに異なるデータセットを渡したい(メインデータセットを分割する) – Sujit

+1

私はまだ質問が何であるか完全に理解していません。多分あなたが必要とする出力の例が役に立つでしょう。 – Roman

答えて

0

は、最後に私は長い時間を研究して答えを見つけた:

スレッドを使用する:

def setI = { 
    val NUM_THREADS = Runtime.getRuntime().availableProcessors() 
    val curNotes = notesService.getNotes() 

    val totalRows = Await.result(curNotes, Duration.Inf).length 
    var totalPages = totalRows/NUM_THREADS 
    if(totalPages != totalPages.toInt){ 
     totalPages = totalPages + 1 
    } 
    var tmp = Await.result(curNotes, Duration.Inf).grouped(totalPages).toList 
    val rows = tmp(tmp.length-2) ++ tmp.last 
    val threads = Array.ofDim[Index](NUM_THREADS) 

    val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex") 
    val analyzer = new StandardAnalyzer() 
    val writerConfig = new IndexWriterConfig(analyzer) 
    writerConfig.setOpenMode(OpenMode.CREATE_OR_APPEND) 
    writerConfig.setRAMBufferSizeMB(500) 
     .setMaxBufferedDocs(10) 
     .setMergeScheduler(new ConcurrentMergeScheduler()) 
    val directory = FSDirectory.open(IndexStoreDir) 
    val writer = new IndexWriter(directory, writerConfig) 
    var count = 0 

    for(i <- 0 until tmp.length - 2){ 
     count = i 
     threads(i) = new Index(tmp(i), writer, i) 
    } 
    count = count + 1 
    threads(count) = new Index(rows, writer, count) 

    for (i <- 0 until NUM_THREADS) { 
     println("Thread :" + threads(i).getName + " => " + (i + 1) + " Started!") 
     threads(i).start() 
    } 
    } 

Scalaの今後の使用:あなたはチャンクとはどういう意味ですか

def setFutureIndex = { 
    val IndexStoreDir = Paths.get("/var/www/html/LuceneIndex") 
    val analyzer = new StandardAnalyzer() 
    val writerConfig = new IndexWriterConfig(analyzer) 
    writerConfig.setOpenMode(OpenMode.CREATE) 
    writerConfig.setRAMBufferSizeMB(500) 
    val directory = FSDirectory.open(IndexStoreDir) 
    val writer = new IndexWriter(directory, writerConfig) 
    val notes = notesService.getNotes() //Gets all notes from slick. Data is coming in getNotes() 
    var doc = new Document() 

    def indexingFuture = { 
     val list = Seq (
     notes.map(_.foreach { 
      case (note) => 
      writeToDoc(note, writer) 
     }) 
    ) 
     Future.sequence(list) 
    } 

    Await.result(indexingFuture, Duration.Inf) 

    /*indexingFuture.onComplete { 
     case Success(value) => println(value) 
     case Failure(e) => e.printStackTrace() 
    }*/ 
    } 

    def writeToDoc(note: NoteEntity, writer: IndexWriter) = Future { 
    println("*****Indexing: " + note.id.get) 
    var doc = new Document() 
    var field = new TextField("title", " {##" + note.id.get + "##} " + note.title, Field.Store.YES) 
    doc.add(field) 

    field = new TextField("teaser", note.teaser, Field.Store.YES) 
    doc.add(field) 

    field = new TextField("description", note.description, Field.Store.YES) 
    doc.add(field) 

    writer.addDocument(doc) 

    writer.commit() 
    println("*****Completed: " + note.id.get) 
    var status = "*****Completed: " + note.id.get 
    } 
関連する問題