2016-03-22 3 views
0

クライアントからSocket Serverにテキストファイルを送信しようとしています。テキストファイルは、連続した操作で単一のスレッドを使用して読み取られます。私はBlockingQueueを使ってレコードを読み取り操作から取り出し、マルチスレッド・メカニズムを使用してレコードを処理しようとしています。このコードは、ソケット通信がない場合(つまり、ファイルを直接読み取っている場合)には問題ありません。サーバソケット読み込みファイル:BlockingQueueのレコードを処理できません

クライアントコード:

 try{ 
      Socket socket = new Socket(hostName, PORTNUMBER); 
      File file = new File(filePath); 

      byte[] bytes = new byte[16 * 1024]; 
      InputStream in = new FileInputStream(file); 
      OutputStream out = socket.getOutputStream(); 

      int count; 
      while ((count = in.read(bytes)) > 0) { 
       out.write(bytes, 0, count); 
      } 

      out.close(); 
      in.close(); 
      socket.close(); 
     }catch(UnknownHostException e){ 
      e.printStackTrace(); 
     }catch(IOException e){ 
      e.printStackTrace(); 
     } 

サーバコード:

public static final Map<Integer, Account> _resultMap= new ConcurrentHashMap<Integer, Account>(); 

public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { 
    while(true){ 
      if(mode.equals("deamon")){ 
       ServerSocket serverSocket = null; 

       try { 
        serverSocket = new ServerSocket(8888); 
       } catch (IOException ex) { 
        System.out.println("Unable to start up server on port 8888"); 
       } 

       Socket socket = null; 

       try { 
        socket = serverSocket.accept(); 
       } catch (IOException ex) { 
        System.out.println("Unable to accept client connection"); 
       } 


       final int threadCount = 8; 
       final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(200); 


       ExecutorService service = Executors.newFixedThreadPool(threadCount); 
       for (int i = 0; i < (threadCount - 1); i++) { 
        service.submit(new CalcTask(queue)); 
       } 
       service.submit(new ReadFileTask(queue, socket)).get(); 
       System.out.println(queue.size()); // Can see the count here 

       service.shutdownNow(); 
       service.awaitTermination(365, TimeUnit.DAYS); 


       for (Map.Entry<Integer, String> e : Map.entrySet()) { 
        System.out.println(e.getKey()); 
       } 

       socket.close(); 
       serverSocket.close(); 
      } 
    } 

CalcTask:

Class CalcTask implements Runnable { 
private final BlockingQueue<String> queue; 

public CalcTask(BlockingQueue<String> queue) { 
    this.queue = queue; 
} 

@Override 
public void run() { 
    String line; 

    while (true) { 
     try { 
      //Queue is empty - so this block is not executing. 
      line = queue.take(); 
      procesThis(line); 
     } catch (InterruptedException ex) { 
      ex.printStackTrace(); 
      break; 
     } 
    } 
    while ((line = queue.poll()) != null) { 
     procesThis(line); 
    } 
} 
} 

ReadFileTask:

class ReadFileTask implements Runnable { 
private final BlockingQueue<String> queue; 
private Socket socket; 


public ReadFileTask(BlockingQueue<String> queue, Socket socket) { 
    this.queue = queue; 
    this.socket = socket; 
} 

@Override 
public void run() { 
    BufferedReader br = null; 
    try { 
     br = new BufferedReader(new InputStreamReader(socket.getInputStream())); 
     for (String line; (line = br.readLine()) != null;) { 
      queue.put(line); 
     } 
    } catch (IOException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      if (br != null) br.close(); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
} 

このアプローチで何が間違っているのかを確認して教えてください。

答えて

0
  • サーバーソケットを再作成しないでください。初期化をループの前に移動します。例外とクライアントの接続拒否をバインドする危険性のある方法。

  • 同様に、ior shutdownをループ内に作成しないでください。

+0

ありがとうございました。私があなたが提案したように、初期化を先頭に移動しました。そして、私はループも削除しました(そのため、クライアント接続は1つしか終了しません)。しかし、私は、CalcTaskのスレッドが実行された瞬間、共有されたBlockingQueueが空であることを発見しました。どのように私はこれをデバッグすることができるかについての任意のポインター? – StanleyV

関連する問題