データセットをどのように並行して処理するかは問題ではありません。明確な設計が原因です。シリアル化は常に出力ストリーム、ソケットなどのシーケンシャルな性質が原因ですあなたのデータセット処理を脇に置きます。したがって、シリアライズしてシリアライズしたデータセットをファイル、接続、またはRAWメモリに格納する場合、並行レースや望ましくない変更からデータを保護するバリアを定義する必要があります。
確かに、各作業スレッドがhttpサーバーなどのデータ自体をシリアル化するケースがありますが、ここでは並列処理されて最終的にシリアル化されている単一のデータセットについて説明しています。
したがって、上の説明によれば、これは正しいコードであるはずです。標準のJavaシリアル化+ GZIP圧縮を使用します。このコードおよびベンチマークでは、シリアライゼーションや圧縮を現在のソリューションと簡単に置き換えることができます。
package com.example.demo;
import java.io.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import static java.lang.String.format;
public final class ParallelObjectsSerialization {
private static final int ONE_MILLION = 1_000_000;
private static final String SERIALIZE_FILE = "/tmp/out.bin";
public static void main(String[] args) throws IOException, ClassNotFoundException {
// List<Player> players = parallelGenerate1MPlayers();
List<Player> players = seqGenerate1MPlayers();
serialize(players);
players.clear();
players = deserialize();
}
private static List<Player> deserialize() throws IOException, ClassNotFoundException {
long started = System.currentTimeMillis();
List<Player> players = new ArrayList<>();
try (ObjectInputStream in = new ObjectInputStream(new GZIPInputStream(new FileInputStream(SERIALIZE_FILE)))) {
for (int i = 0; i < ONE_MILLION; i++) {
players.add((Player) in.readObject());
}
}
long time = System.currentTimeMillis() - started;
System.out.println(format("deserialization of %d objects took %d ms", players.size(), time));
return players;
}
private static final class Player implements Serializable {
private final String name;
private final int level;
private Player(String name, int level) {
this.name = name;
this.level = level;
}
}
private static List<Player> seqGenerate1MPlayers() {
long started = System.currentTimeMillis();
List<Player> players = new ArrayList<>(ONE_MILLION);
for (int i = 0; i < ONE_MILLION; i++) {
players.add(new Player(randomName(i), i));
}
long time = System.currentTimeMillis() - started;
System.out.println(format("sequential generating of %d objects took %d ms", players.size(), time));
return players;
}
private static List<Player> parallelGenerate1MPlayers() {
long started = System.currentTimeMillis();
Player[] players = new Player[ONE_MILLION];
Arrays.parallelSetAll(players, (i) -> new Player(randomName(i), i));
long time = System.currentTimeMillis() - started;
System.out.println(format("parallel generating of %d objects took %d ms", players.length, time));
return Arrays.asList(players);
}
private static void serialize(List<Player> players) throws IOException {
long started = System.currentTimeMillis();
try (ObjectOutputStream out = new ObjectOutputStream(new GZIPOutputStream(new FileOutputStream(SERIALIZE_FILE)))) {
for (Player player : players) {
out.writeObject(player);
}
}
long time = System.currentTimeMillis() - started;
System.out.println(format("serialization of %d objects took %d ms", players.size(), time));
}
private static String randomName(int seed) {
StringBuilder builder = new StringBuilder();
double chance = 30.0;
for (char c = 'a'; c <= 'z'; c++) {
if (Math.random() * 100.0 <= chance) {
builder.append(c);
if (builder.length() == 7) {
break;
}
}
}
if (builder.length() == 0) {
builder.append("unknown").append(seed);
}
return builder.toString();
}
}
ありがとうアレクサンダー。私はすでにこのソリューションをテストしました。シリアライズは順次実行する必要があるため、プロセス全体のボトルネックです。 – kem
分割コレクションを複数の小さなコレクションに分けて、それらを並行してシリアル化し、それらをzipまたはtarファイルにグループ化しますか? – kem
この場合、 '' 'tar'''はボトルネックになります。私のポスト最終段階で言及したように、常にシーケンシャルな操作になります。一方で、シリアライゼーションがCPUバウンドで、オブジェクトのシリアライゼーション中にCPU使用率に問題が発生した場合、 '' tar'''は少数のシリアル化されたチャンクで動作しますが、ディスクI/Oの飽和に注意してください。 –