2017-06-14 21 views
-2

約200万レコードを持つamazon S3にファイルがあります。今はスレッドを使ってこれらのレコードを処理したいので、処理を素早く行うことができます。私は、これがスパークまたはマップリダクションを使用して実行できることを知っています。しかし、スパークやMRを使用することはできません。行数に基づいてs3オブジェクトを分割する方法

現在、私はJavaでそれを行う方法上の任意の提案は大きな助けになるだろう

for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { 
    S3Object s3object = s3Client 
         .getObject(new GetObjectRequest(s3Conn.getBucket(), objectSummary.getKey())); 
    BufferedReader reader = new BufferedReader(new InputStreamReader(s3object.getObjectContent())); 

    List<Events> ingEvents = new LinkedList<>(); 
    while ((fileLine = reader.readLine()) != null) { 

       //Processing the line 

       } 
} 

を次のようでしたしています。前もって感謝します。乾杯!

+0

Javaコアでファイルを1行ずつ読み込みますか? –

+1

何のための提案ですか?ファイルを処理したい以外のものは、実際に私たちに言いませんでした。 –

+0

@JaySmith申し訳ありませんが、私はJavaを使いたいと言いました。はい、私はJavaを使用して、行単位でS3スレッドを使用してファイルを読み込みたいと思います。 –

答えて

0

私は

例えば、10000行で小さいファイルごとに大きなファイルを分割するためのLinuxで「分割」コマンドを使用します。その後、Javaプログラムは、個々のファイルを処理することができます

$ split --lines=10000 --numeric-suffixes <original file> <prefix for split files> 

+0

残念ながら私はそれを行うことができませんでした。ファイルはS3にあります。私の要件はUIからinputPathを取得してJava Webサービスで処理することです。現在、スレッドなしでプレーンJavaを使用しています。しかし、私は、おそらく "randomAccessFile"を使って、 –

+0

行でファイル行を処理するスレッドを使用したいですか?例はこちらをご覧ください:http://tutorials.jenkov.com/java-io/randomaccessfile.html次に、各スレッドは、ターゲットファイルの異なる行のセットを処理できます。 –

0

java.util.Scannerを使用すると、行単位または正規表現単位でファイルを読み取ることができます。それを行う方法を示してショートデモ:

String xmlFile = null; 
     Scanner sc = new Scanner(new File(xmlFile)); 

     String nextLine; 
     while ((nextLine = sc.nextLine()) != null) { 

     } 

まず、あなたはそれをパラメータとしてFile xmlFileを与えるScannerオブジェクトを作成します。次に、ファイルを1行ずつ読み込み、whileループで行を処理します。すべての行が読み取られると、sc.nextLine()はヌルを返します。

+0

私は私のケースをはっきりと伝えていないと思います。入力は通常のファイルではありません。それはS3で利用可能なファイルです。 –

+0

s3のファイルを通常の 'java.util.File'に変換する必要があります –

+0

ああ...だからS3ファイルでスレッドを直接使用することはできませんか? –

0

ファイルのマルチスレッド処理に簡単な方法は、例えば、Javaの8ラムダを使用することです:

public class ThreadTest { 
    static final int THREAD_POOL_SIZE = 3; 

    static final String []myData = { 
      "Line 1","Line 2","Line 3","Line 4","Line 5","Line 6","Line 7","Line 8","Line 9","Line 10","Line 11","Line 12" 
    }; 
    static final List<String> myList = Arrays.asList(myData); 

    public static void main(String[] args) { 
     ExecutorService service = Executors.newFixedThreadPool(THREAD_POOL_SIZE); 
     myList.stream().parallel().forEach(item->{ 
      System.out.println("Processing " + item + " in thread " + Thread.currentThread().getName()); 
     }); 
    } 
} 

あなたがこれを実行する場合は、同時に複数のスレッド間で処理されている行が表示されます:

Processing Line 8 in thread main 
Processing Line 4 in thread ForkJoinPool.commonPool-worker-1 
Processing Line 9 in thread main 
Processing Line 11 in thread ForkJoinPool.commonPool-worker-2 
Processing Line 2 in thread ForkJoinPool.commonPool-worker-3 
Processing Line 12 in thread ForkJoinPool.commonPool-worker-2 
Processing Line 7 in thread main 
Processing Line 6 in thread ForkJoinPool.commonPool-worker-1 
Processing Line 1 in thread main 
Processing Line 10 in thread ForkJoinPool.commonPool-worker-2 
Processing Line 3 in thread ForkJoinPool.commonPool-worker-3 
Processing Line 5 in thread ForkJoinPool.commonPool-worker-1 
関連する問題