2017-04-14 35 views
1

基本的には、マルチスレッドを使用している大量のcsvファイルをほぼ100万レコードで処理する必要があります。ExecutorServiceは正常に動作しませんが、デバッグモードで正常に動作します

私は私は私が20のレコードの小さなcsvファイルでテスト

public class IngestionThread implements Runnable { 

InputStream is; 
long startPosition; 
long length; 

public IngestionThread(InputStream targetStream, long position, long length) { 
    this.is = targetStream; 
    this.startPosition = position; 
    this.length = length; 
} 

@Override 
public void run() { 
    // TODO Auto-generated method stub 
    int currentPosition = 0; 
    try { 
     is.reset(); 
    } catch (IOException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 
    BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(is)); 
    if (startPosition != 0) { 
     String line; 
     try { 
      while (((line = fileInputStreamBufferedReader.readLine())) != null) { 
       if (currentPosition + 1 == startPosition) 
        break; 
       currentPosition++; 
      } 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
    try { 
     int execLength = 0; 
     String line; 
     while ((line = fileInputStreamBufferedReader.readLine()) != null && execLength < length) { 
      System.out.println(line); 
      execLength++; 
     } 
    } catch (IOException e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 
} 

} 

を実装している別のRunnableクラスを呼び出すために使用

public class IngestionCallerThread { 

public static void main(String[] args) { 

    try { 
     int count = 0; 
     InputStream ios = IngestionCallerThread.class.getClassLoader().getResourceAsStream("aa10.csv"); 

     byte[] buff = new byte[8000]; 

     int bytesRead = 0; 
     ByteArrayOutputStream bao = new ByteArrayOutputStream(); 

     while ((bytesRead = ios.read(buff)) != -1) { 
      bao.write(buff, 0, bytesRead); 
     } 

     byte[] data = bao.toByteArray(); 

     ByteArrayInputStream bin = new ByteArrayInputStream(data); 
     BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(bin)); 

     while ((fileInputStreamBufferedReader.readLine()) != null) { 
      count++; 
     } 
     bin.reset(); 

     int numberOfThreads = 12; 
     int rowsForEachThread = count/numberOfThreads; 
     int remRows = count % numberOfThreads; 
     int startPosition = 0; 
     System.out.println(count); 
     ExecutorService es = Executors.newCachedThreadPool(); 
     for (int i = 0; i < numberOfThreads && startPosition < count; i++) { 
      if (remRows > 0 && i + 1 >= numberOfThreads) 
       rowsForEachThread = remRows; 

      IngestionThread ingThread = new IngestionThread(bin, startPosition, rowsForEachThread); 
      es.execute(ingThread); 
      startPosition = (startPosition + rowsForEachThread); 
     } 
     es.shutdown(); 
     if (es.isTerminated()) { 
      System.out.println("Completed"); 
     } 
     // t2.start(); 
    } catch (IOException e1) { 
     // TODO Auto-generated catch block 
     e1.printStackTrace(); 
    } 

    catch (Exception e) { 
     // TODO Auto-generated catch block 
     e.printStackTrace(); 
    } 

} 

} 

IngestionCallerThread

クラスを作成しました。問題は、ほとんどすべてのレコードが印刷されているクラスをデバッグするときです。しかし、私はクラスを実行するときに15レコードが読み込まれることがあります、時には12レコードが読み込まれます。私は何が問題なのか分かりません。どんな助けでも大歓迎です。前もって感謝します。

答えて

2

ByteArrayInputStreamを共有する異なるBufferedReaderオブジェクトから読み取ったスレッド数が問題の原因です。同期はありません。これは、異なるスレッドが、他のスレッドが読み込むはずのストリームのセクションを読み込むことを意味します。

各スレッドにはそれぞれByteArrayInputStreamが必要です。

+0

問題が修正され、魅力的な働きをしました..ありがとうございます.. :) –

関連する問題