2016-04-22 19 views
4

私はjava nioインターフェイスを介してファイルをストリームに直接読み込んでいます。これにより、非同期HTTP要求が起動され、将来これらの要求が処理されます。 10,000レコードごとに、この結果をサーバーにアップロードしてレコードをクリアするので、メモリ消費量が削減されます。
バイト配列から始まり、常にメモリにとどまります。 httpクライアント(commons CloseableHttpAsyncClient)はリクエストを非同期で起動します。したがって、これらは最初から一斉に起動されます。
同時に処理される行数を制限できるようにラムダストリームを制限する方法はありますか?したがって、私の記憶を制御する。メモリ消費量javaファイルストリーム

new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file))) 
    .lines() 
    .map(line -> CsvLine.create(line)) 
    .filter(line -> !line.isHeader()) 
    .forEach(line -> getResult(line, new FutureCallback<HttpResponse>() { 
     @Override 
     public void completed(HttpResponse response) { 
      try { 
       result.addLine(response); 
      } catch (IOException e) { 
       LOGGER.error("IOException, cannot write to server", e); 
       todo.set(-1); // finish in error 
      } finally { 
       todo.decrementAndGet(); 
      } 
     } 

     @Override 
     public void failed(Exception ex) { 
      handleError(); 
     } 

     @Override 
     public void cancelled() { 
      handleError(); 
     } 
    } 
)); 
+0

ちょうど10000のチャンクでコードを実行できませんか?ストリームをループし、一度10000を押すとすべての呼び出しを実行します。 –

+0

'for'ループを使います。 30行のうちの2行だけが実際の関数型プログラミングである場合、ストリームソリューションとして記述する理由はありません。 – Holger

+0

私は1000万行を処理しており、forループを使用して将来的に大きくなりたいメモリ内ですべてのメモリを消費する – jelle

答えて

1

セマフォを使用してストリームを絞り込んで、一度に特定の最大非同期要求のみが未処理になるように試みることがあります。

Semaphore semaphore = new Semaphore(MAX_CONCURRENT_REQUESTS, true); // false if FIFO is not important 
new BufferedReader(new InputStreamReader(new ByteArrayInputStream(file))) 
.lines() 
     .map(line -> CsvLine.create(line)) 
     .filter(line -> !line.isHeader()) 
     .forEach(line -> { 
      try { 
       if (!semaphore.tryAcquire(ASYNC_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS)) { 
        handleTimeout(); 
       } else { 
        getResult(line, new FutureCallback<HttpResponse>() { 
         @Override 
         public void completed(HttpResponse response) { 
          try { 
           result.addLine(response); 
          } catch (IOException e) { 
           LOGGER.error("IOException, cannot write to server", e); 
           todo.set(-1); // finish in error 
          } finally { 
           todo.decrementAndGet(); 
           semaphore.release(); 
          } 
         } 

         @Override 
         public void failed(Exception ex) { 
          handleError(); 
          semaphore.release(); 
         } 

         @Override 
         public void cancelled() { 
          handleError(); 
          semaphore.release(); 
         } 
        } 
        ); 
       } 
      } catch (InterruptedException e) { 
       // handle appropriately 
      } 

     }); 
+0

私はこれで少しこれを解決しました、私はそれに応じて私のコードをリファクタリングします。ありがとう – jelle

関連する問題