0
クライアントからSocket Serverにテキストファイルを送信しようとしています。テキストファイルは、連続した操作で単一のスレッドを使用して読み取られます。私はBlockingQueueを使ってレコードを読み取り操作から取り出し、マルチスレッド・メカニズムを使用してレコードを処理しようとしています。このコードは、ソケット通信がない場合(つまり、ファイルを直接読み取っている場合)には問題ありません。サーバソケット読み込みファイル:BlockingQueueのレコードを処理できません
クライアントコード:
try{
Socket socket = new Socket(hostName, PORTNUMBER);
File file = new File(filePath);
byte[] bytes = new byte[16 * 1024];
InputStream in = new FileInputStream(file);
OutputStream out = socket.getOutputStream();
int count;
while ((count = in.read(bytes)) > 0) {
out.write(bytes, 0, count);
}
out.close();
in.close();
socket.close();
}catch(UnknownHostException e){
e.printStackTrace();
}catch(IOException e){
e.printStackTrace();
}
サーバコード:
public static final Map<Integer, Account> _resultMap= new ConcurrentHashMap<Integer, Account>();
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
while(true){
if(mode.equals("deamon")){
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8888);
} catch (IOException ex) {
System.out.println("Unable to start up server on port 8888");
}
Socket socket = null;
try {
socket = serverSocket.accept();
} catch (IOException ex) {
System.out.println("Unable to accept client connection");
}
final int threadCount = 8;
final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(200);
ExecutorService service = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < (threadCount - 1); i++) {
service.submit(new CalcTask(queue));
}
service.submit(new ReadFileTask(queue, socket)).get();
System.out.println(queue.size()); // Can see the count here
service.shutdownNow();
service.awaitTermination(365, TimeUnit.DAYS);
for (Map.Entry<Integer, String> e : Map.entrySet()) {
System.out.println(e.getKey());
}
socket.close();
serverSocket.close();
}
}
CalcTask:
Class CalcTask implements Runnable {
private final BlockingQueue<String> queue;
public CalcTask(BlockingQueue<String> queue) {
this.queue = queue;
}
@Override
public void run() {
String line;
while (true) {
try {
//Queue is empty - so this block is not executing.
line = queue.take();
procesThis(line);
} catch (InterruptedException ex) {
ex.printStackTrace();
break;
}
}
while ((line = queue.poll()) != null) {
procesThis(line);
}
}
}
ReadFileTask:
class ReadFileTask implements Runnable {
private final BlockingQueue<String> queue;
private Socket socket;
public ReadFileTask(BlockingQueue<String> queue, Socket socket) {
this.queue = queue;
this.socket = socket;
}
@Override
public void run() {
BufferedReader br = null;
try {
br = new BufferedReader(new InputStreamReader(socket.getInputStream()));
for (String line; (line = br.readLine()) != null;) {
queue.put(line);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (br != null) br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
このアプローチで何が間違っているのかを確認して教えてください。
ありがとうございました。私があなたが提案したように、初期化を先頭に移動しました。そして、私はループも削除しました(そのため、クライアント接続は1つしか終了しません)。しかし、私は、CalcTaskのスレッドが実行された瞬間、共有されたBlockingQueueが空であることを発見しました。どのように私はこれをデバッグすることができるかについての任意のポインター? – StanleyV