1
基本的には、マルチスレッドを使用している大量のcsvファイルをほぼ100万レコードで処理する必要があります。ExecutorServiceは正常に動作しませんが、デバッグモードで正常に動作します
私は私は私が20のレコードの小さなcsvファイルでテスト
public class IngestionThread implements Runnable {
InputStream is;
long startPosition;
long length;
public IngestionThread(InputStream targetStream, long position, long length) {
this.is = targetStream;
this.startPosition = position;
this.length = length;
}
@Override
public void run() {
// TODO Auto-generated method stub
int currentPosition = 0;
try {
is.reset();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(is));
if (startPosition != 0) {
String line;
try {
while (((line = fileInputStreamBufferedReader.readLine())) != null) {
if (currentPosition + 1 == startPosition)
break;
currentPosition++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
int execLength = 0;
String line;
while ((line = fileInputStreamBufferedReader.readLine()) != null && execLength < length) {
System.out.println(line);
execLength++;
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
を実装している別のRunnableクラスを呼び出すために使用
public class IngestionCallerThread {
public static void main(String[] args) {
try {
int count = 0;
InputStream ios = IngestionCallerThread.class.getClassLoader().getResourceAsStream("aa10.csv");
byte[] buff = new byte[8000];
int bytesRead = 0;
ByteArrayOutputStream bao = new ByteArrayOutputStream();
while ((bytesRead = ios.read(buff)) != -1) {
bao.write(buff, 0, bytesRead);
}
byte[] data = bao.toByteArray();
ByteArrayInputStream bin = new ByteArrayInputStream(data);
BufferedReader fileInputStreamBufferedReader = new BufferedReader(new InputStreamReader(bin));
while ((fileInputStreamBufferedReader.readLine()) != null) {
count++;
}
bin.reset();
int numberOfThreads = 12;
int rowsForEachThread = count/numberOfThreads;
int remRows = count % numberOfThreads;
int startPosition = 0;
System.out.println(count);
ExecutorService es = Executors.newCachedThreadPool();
for (int i = 0; i < numberOfThreads && startPosition < count; i++) {
if (remRows > 0 && i + 1 >= numberOfThreads)
rowsForEachThread = remRows;
IngestionThread ingThread = new IngestionThread(bin, startPosition, rowsForEachThread);
es.execute(ingThread);
startPosition = (startPosition + rowsForEachThread);
}
es.shutdown();
if (es.isTerminated()) {
System.out.println("Completed");
}
// t2.start();
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
IngestionCallerThread
クラスを作成しました。問題は、ほとんどすべてのレコードが印刷されているクラスをデバッグするときです。しかし、私はクラスを実行するときに15レコードが読み込まれることがあります、時には12レコードが読み込まれます。私は何が問題なのか分かりません。どんな助けでも大歓迎です。前もって感謝します。
問題が修正され、魅力的な働きをしました..ありがとうございます.. :) –