2017-12-23 40 views
0

大きなデータセットを並列処理するためのJavaSE8アプリがあります。私は1つの圧縮ファイルにシリアル化したい1Mのオブジェクトを生成しています。ファイルはWebアプリケーションからダウンロード/アップロードされます。 並列プロセスは最適化されています。しかし、シリアル化/圧縮は順番に行われ、それは私のアプリケーションのボトルネックです。Java並列シリアライズと圧縮

私はさまざまなソリューションをテストしました:Kryo、ChronicleMap ...私は現在、KryoとBz2圧縮を使用しています。それは働いている。しかし、パフォーマンスは十分ではありません。

パラレルシリアル化と圧縮を行うための解決策が見つかりません。これに関する情報はすべて歓迎です

答えて

1

データセットをどのように並行して処理するかは問題ではありません。明確な設計が原因です。シリアル化は常に出力ストリーム、ソケットなどのシーケンシャルな性質が原因ですあなたのデータセット処理を脇に置きます。したがって、シリアライズしてシリアライズしたデータセットをファイル、接続、またはRAWメモリに格納する場合、並行レースや望ましくない変更からデータを保護するバリアを定義する必要があります。

確かに、各作業スレッドがhttpサーバーなどのデータ自体をシリアル化するケースがありますが、ここでは並列処理されて最終的にシリアル化されている単一のデータセットについて説明しています。

したがって、上の説明によれば、これは正しいコードであるはずです。標準のJavaシリアル化+ GZIP圧縮を使用します。このコードおよびベンチマークでは、シリアライゼーションや圧縮を現在のソリューションと簡単に置き換えることができます。

package com.example.demo; 

import java.io.*; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.List; 
import java.util.zip.GZIPInputStream; 
import java.util.zip.GZIPOutputStream; 

import static java.lang.String.format; 

public final class ParallelObjectsSerialization { 

    private static final int ONE_MILLION = 1_000_000; 
    private static final String SERIALIZE_FILE = "/tmp/out.bin"; 

    public static void main(String[] args) throws IOException, ClassNotFoundException { 
//  List<Player> players = parallelGenerate1MPlayers(); 
     List<Player> players = seqGenerate1MPlayers(); 
     serialize(players); 
     players.clear(); 
     players = deserialize(); 
    } 

    private static List<Player> deserialize() throws IOException, ClassNotFoundException { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(); 
     try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(SERIALIZE_FILE)))) { 
      for (int i = 0; i < ONE_MILLION; i++) { 
       players.add((Player) in.readObject()); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("deserialization of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static final class Player implements Serializable { 
     private final String name; 
     private final int level; 

     private Player(String name, int level) { 
      this.name = name; 
      this.level = level; 
     } 
    } 

    private static List<Player> seqGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     List<Player> players = new ArrayList<>(ONE_MILLION); 
     for (int i = 0; i < ONE_MILLION; i++) { 
      players.add(new Player(randomName(i), i)); 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("sequential generating of %d objects took %d ms", players.size(), time)); 
     return players; 
    } 

    private static List<Player> parallelGenerate1MPlayers() { 
     long started = System.currentTimeMillis(); 
     Player[] players = new Player[ONE_MILLION]; 
     Arrays.parallelSetAll(players, (i) -> new Player(randomName(i), i)); 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("parallel generating of %d objects took %d ms", players.length, time)); 
     return Arrays.asList(players); 
    } 

    private static void serialize(List<Player> players) throws IOException { 
     long started = System.currentTimeMillis(); 
     try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(SERIALIZE_FILE)))) { 
      for (Player player : players) { 
       out.writeObject(player); 
      } 
     } 
     long time = System.currentTimeMillis() - started; 
     System.out.println(format("serialization of %d objects took %d ms", players.size(), time)); 
    } 

    private static String randomName(int seed) { 
     StringBuilder builder = new StringBuilder(); 
     double chance = 30.0; 
     for (char c = 'a'; c <= 'z'; c++) { 
      if (Math.random() * 100.0 <= chance) { 
       builder.append(c); 
       if (builder.length() == 7) { 
        break; 
       } 
      } 
     } 
     if (builder.length() == 0) { 
      builder.append("unknown").append(seed); 
     } 
     return builder.toString(); 
    } 
} 
+0

ありがとうアレクサンダー。私はすでにこのソリューションをテストしました。シリアライズは順次実行する必要があるため、プロセス全体のボトルネックです。 – kem

+0

分割コレクションを複数の小さなコレクションに分けて、それらを並行してシリアル化し、それらをzipまたはtarファイルにグループ化しますか? – kem

+1

この場合、 '' 'tar'''はボトルネックになります。私のポスト最終段階で言及したように、常にシーケンシャルな操作になります。一方で、シリアライゼーションがCPUバウンドで、オブジェクトのシリアライゼーション中にCPU使用率に問題が発生した場合、 '' tar'''は少数のシリアル化されたチャンクで動作しますが、ディスクI/Oの飽和に注意してください。 –

関連する問題