2017-07-19 8 views
-1
public class DataMiner { 

private static BigData app = new BigData(); 
private static DomainOfConstants doc = new DomainOfConstants(); 
private static Logger log = Logger.getLogger(DataMiner.class); 
private static DBManager conn = new DBManager(); 
private static java.sql.Connection con = null; 
private static AmazonS3 s3Client; 
private static Iterator<String> itr; 
private static List<String> entries = new ArrayList<String>(); 
private static S3Object s3Object; 
private static ObjectMetadata meta; 
public static InputStream dataStream; 
public static byte[] buffer = new byte[1024]; 
public static File file = new File(app.getCurrentPacsId()+".txt"); 



private static void obtainConnection(){ 
    conn.connection(); 
    entries = conn.grabDataSet();  
    conn.closeDb(); 
    downloadBucket(); 
} 

/* 
* 
* The Java heap size limits for Windows are: 
* maximum possible heap size on 32-bit Java: 1.8 GB 
* recommended heap size limit on 32-bit Java: 1.5 GB (or 1.8 GB with /3GB option) 
* 
* */ 
/*-------------Download and un-zip backup file-------------*/ 
private static void downloadBucket(){ 

    try { 
     app.setAwsCredentials(doc.getAccessKey(), doc.getSecretKey()); 
     s3Client = AmazonS3ClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(app.getAwsCredentials())).withRegion(Regions.US_EAST_1).build(); 
     System.out.println("Connected to S3"); 
     itr = entries.iterator(); 
     while(itr.hasNext()){ 
      app.setBucketKey(itr.next()); 
      String key = app.getBucketKey(); 
      app.setCurrentPacsId(key); 
      s3Object = s3Client.getObject(new GetObjectRequest(doc.getDesiredBucket(), app.getBucketKey())); 
      try { 
       ZipInputStream zis = new ZipInputStream(s3Object.getObjectContent()); 
       ZipEntry entry = zis.getNextEntry(); 
       extractObjects(buffer, s3Client, zis, entry);     
      } catch (AmazonServiceException e) { 
       log.error(e); 
      } catch (SdkClientException e) { 
       log.error(e); 
      } catch (IOException e) { 
       log.error(e); 
      } 
     } 
     System.out.println("Processing complete"); 


    } catch (IllegalArgumentException e) { 
     e.printStackTrace(); 
    } 
} 

public static void extractObjects(byte[] buffer, AmazonS3 s3Client, ZipInputStream zis, ZipEntry entry) throws IOException { 
    PipedOutputStream outputStream = null; 
    PipedInputStream is = null; 
    try { 
     while (entry != null) 
     { 
      String fileName = entry.getName(); 
      if (fileName == "lib") { 
       fileName = entry.getName(); 
      } 
      boolean containsBackup = fileName.contains(doc.getDesiredFile()); 

      if (containsBackup == true) { 
       System.out.println("A back up file was found"); 
       long start = System.currentTimeMillis(); 
       formatSchemaName(); 
       System.out.println("Extracting :" + app.getCurrentPacsId()); 
       log.info("Extracting " + app.getCurrentPacsId() + ", 
       compressed: " + entry.getCompressedSize() + " bytes, 
       extracted: " + 
       entry.getSize() + " bytes"); 
     //ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); 


       outputStream = new PipedOutputStream(); 
       is = new PipedInputStream(outputStream); 

       int len; 
       while ((len = zis.read(buffer)) >= 0) 
       { 
        outputStream.write(buffer, 0, len); 
       } 
    //InputStream is = new ByteArrayInputStream(outputStream.toByteArray()); 
       meta = new ObjectMetadata(); 
       meta.setContentLength(file.length()); 
       fileName = app.getCurrentPacsId(); 
       runDataConversion(is,s3Client,fileName); 
       recordTime(start); 
       is.close(); 
       outputStream.close(); 
       System.out.println("Unzip complete");    
      } 
      else{ 
       System.out.println("No back up found"); 
      } 
      entry = zis.getNextEntry(); 
     } 
     zis.closeEntry(); 
     zis.close(); 
    } catch (AmazonServiceException e) { 
     log.error(e); 
    } catch (SdkClientException e) { 
     log.error(e); 
    } 
} 


/*------------Formating the replacment file name---------*/ 
private static void formatSchemaName(){ 
    String s3Key = app.getCurrentPacsId(); 
    String id = s3Key.replace(".zip", ".txt"); 
    id = id.substring(id.indexOf("_")); 
    id = id.replaceFirst("_", ""); 
    app.setCurrentPacsId(id); 
} 

/*---------------Process the data file----------------------*/ 
private static void runDataConversion(PipedInputStream is, AmazonS3 s3Client, String fileName) { 
    DataProcessor convert = new DataProcessor(s3Client); 
    convert.downloadBucket(is,fileName); 
} 

/*-------Records execution time of program in min/sec------*/ 
private static void recordTime(long start) throws IOException { 
    long end = System.currentTimeMillis(); 
    long minutes = TimeUnit.MILLISECONDS.toMinutes(end - start); 
    long seconds = TimeUnit.MILLISECONDS.toSeconds(end - start); 
    System.out.println("Execution speed "+ minutes + ":" + (seconds % 60) +" min/sec\n"); 
} 

これはテキストファイルの処理を行うクラスです.3.5GBまでのファイルを処理する場合、コードは全体的に非常に遅いです。走っている間はそうするのに3時間かかります。私は、バイトストリームでパイプストリームを使用しようとしました。 Javaヒープサイズは、64ビットJDKでは-xms2800mに設定されています。読み書きでこれをより速くするにはどうすればよいですか?

public class DataProcessor { 

private static AmazonS3 s3Client; 
private static ObjectMetadata meta; 
private static DomainOfConstants doc = new DomainOfConstants(); 
private static BigData app = new BigData(); 
public static File file = new File(app.getCurrentPacsId()+".txt"); 
private static Logger log = Logger.getLogger(DataProcessor.class); 

//Construct connection 
public DataProcessor (AmazonS3 s3Client){ 
    this.s3Client = s3Client; 
} 

// 
public void downloadBucket(PipedInputStream is, String fileName) { 
    try { 
     File dataStream = dataConversion(is); 
     s3Client.putObject(doc.getDestinationBucket(),FilenameUtils.getFullPath(doc.getDestinationKey()) + "Modified_"+ fileName, dataStream); 
    } catch (AmazonServiceException e) { 
     e.printStackTrace(); 
     log.error(e); 
    } catch (SdkClientException e) { 
     e.printStackTrace(); 
     log.error(e); 

    }    
} 

//Setup reading and writing streams 
public static File dataConversion(PipedInputStream stream) { 
    BufferedReader reader = null; 
    BufferedOutputStream streamOut = null; 
    String line; 

    try { 
     reader = new BufferedReader(new InputStreamReader(stream,doc.getFileFormat())); 
     streamOut = new BufferedOutputStream(new FileOutputStream(file)); 
     meta = new ObjectMetadata(); 
     while((line = reader.readLine()) != null) 
     { 
      processLine(reader, streamOut, line); 
     } 
    } 
    catch (IOException e) { 
     e.printStackTrace(); 
    } finally { 
     try { 
      streamOut.close(); 
      reader.close(); 

     } catch (IOException e) { 
      e.printStackTrace(); 
      log.error(e); 
     } 
    } 
    return file; 
} 


/*---------------------------------------Data processing------------------------------------------------*/ 

    /*-----------Process and print lines---------*/ 
private static void processLine(BufferedReader reader, BufferedOutputStream streamOut, String line) { 
    try { 
     String newLine = System.getProperty("line.separator"); 

     while (reader.ready()) { 
      if (line.contains(doc.getInsert())) { 
       handleData(streamOut, line); 
      } else if (line.contains(doc.getUse())) { 
       handleSchemaName(streamOut, line); 
      } else { 
       streamOut.write(line.toLowerCase().getBytes(Charset.forName(doc.getFileFormat()).toString())); 
       streamOut.write(newLine.getBytes()); 
      } 
      line = reader.readLine(); 
     } 
    } catch (UnsupportedEncodingException e) { 
     e.printStackTrace(); 
     log.error(e); 

    } catch (IOException e) { 
     e.printStackTrace(); 
     log.error(e); 

    } 
} 

    /*-----------Replace-Schema-Name-----------*/ 
private static void handleSchemaName(BufferedOutputStream streamOut, String line) throws IOException { 
    line = line.replace(line, "USE " + "`" + doc.getSchemaName() + app.getCurrentPacsId() + "`;"); 
    streamOut.write(line.getBytes(Charset.forName(doc.getFileFormat()))); 
} 


    /*--------Avoid-Formating-Data-Portion-of-file--------*/ 
private static void handleData(BufferedOutputStream streamOut, String line) throws IOException { 
    StringTokenizer tk = new StringTokenizer(line); 
    while (tk.hasMoreTokens()) { 
     String data = tk.nextToken(); 
     if (data.equals(doc.getValue())) { 
      streamOut.write(data.toLowerCase().getBytes(Charset.forName(doc.getFileFormat()).toString())); 
      data = tk.nextToken(); 
      while (tk.hasMoreTokens()) { 
       streamOut.write(data.getBytes(Charset.forName(doc.getFileFormat()))); 
       data = tk.nextToken(); 
      } 
     } 
     streamOut.write(line.toLowerCase().getBytes(Charset.forName(doc.getFileFormat().toString()))); 
     streamOut.write(" ".getBytes(Charset.forName(doc.getFileFormat()))); 
    } 
} 
+0

賢明なSystem.out.printlnステートメントを試して、コードの低速領域を特定してください。条件が満たされた場合にループの1つに 'break;'を導入して、データを処理し続ける必要がなくなります。それが絞られていない場合は、Javaプロファイラを使用してみてください。 – bakoyaro

+1

あなたはそれをプロファイルしようとしましたか?ボトルネックがどこにあるかを知る必要があります。 – marstran

+0

はい私はそれをプロファイルするためにVisualVMを使用しています。私はまた、コードが実際のデータ変換のポイントまで非常に高速で、入力ストリームを読み込んで処理していることを印刷ステートメントで発見しました。 3.5GBのファイルを読み込み、テキストを処理して新しいファイルに書き込むことを期待しているプログラムの実行時間はどのようなものでしょうか? –

答えて

2
  1. ルール1は、大きなバッファを使用することが常にあります。 1024はひどく小さいです。 32-64Kを試してください。
  2. パイプへの書き込みを行う前にパイプ読み取りスレッドを開始する必要があります。実際、私はあなたが「終わりの死んだ」エラーを読まないことに驚いています。このコードは実際にはまったく機能しますか?
  3. パイプされたストリームを実際に取り除きます。 1つのスレッドを使用し、すべての処理を行います。
  4. ready()テストを削除してください。それは何のための余分なシステムコールです。ストリームの終わりまで読んでください。
  5. BufferedOutputStreamの代わりにBufferedWriterを使用し、すべての文字列をバイトに変換しないでください(システムプロパティの代わりにBufferedWriter.newLine()を使用してください)。
+0

私がしたのは、バッファを推奨32kまで増やしただけで、プログラムは速度が5倍に増えました!小さなファイルをテストするためにはそれほど小さく設定していないのは忘れていました。私はbufferedwritterを使用して調べる必要があります、私はストリームに以前書いていた。これは、AWS PUTオブジェクトが入力ストリームをパラメータとして必要としたためです。 –

+0

バッファを64kに増やすと、4GBの大容量ファイルの処理に問題が発生しますか? xms3000mにヒープ問題がありますか? –

+0

? 64Kは64Kです。ファイルのサイズとは関係ありません。しかし、64Kで32Kと2倍速くなるとは期待しないでください。利点は漸近的である。 5xのほとんどは1Kから8Kに移行したものです。 – EJP

関連する問題