2013-06-04 14 views
5

私はelasticsearchバルクAPIを使用してJavaを使用して、バッチサイズを設定する方法が不思議です。elasticsearch javaバルクバッチサイズ

現在、私のようにそれを使用しています:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); 
while(hasMore) { 
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); 
    hasMore = checkHasMore(); 
} 
BulkResponse bResp = bulkRequest.execute().actionGet(); 
//To check failures 
log.info("Has failures? {}", bResp.hasFailures()); 

私はバルク/バッチ・サイズを設定することができますどのように任意のアイデアを?

+1

をしてください役に立てば幸い..... –

答えて

21

主に、ドキュメントのサイズ、クライアント上の利用可能なリソース、およびクライアントのタイプ(トランスポートクライアントまたはノードクライアント)によって異なります。

ノードクライアントは、クラスタ上の断片を認識しており、索引付けされるはずの断片を保持するノードに直接文書を送信します。一方、トランスポートクライアントは通常のクライアントであり、要求をラウンドロビン方式でノードのリストに送信します。バルクリクエストは1つのノードに送信され、インデックス作成時にはゲートウェイになります。

Java APIを使用しているので、BulkProcessorをご覧になることをお勧めします。BulkProcessorを使用すると、インデックス作成が簡単になり、柔軟にインデックスを作成できます。最後の一括実行以降、アクションの最大数、最大サイズ、および最大時間間隔のいずれかを定義できます。必要に応じて自動的に一括処理が実行されます。最大同時バルク要求数を設定することもできます。

あなたはこのようなBulkProcessorを作成した後:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { 
    @Override 
    public void beforeBulk(long executionId, BulkRequest request) { 
     logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
     logger.info("Executed bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
     logger.warn("Error executing bulk", failure); 
    } 
    }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build(); 

あなたはそれにあなたの要求を追加する必要があります。

bulkProcessor.add(indexRequest); 

と持つかもしれない最終的な要求をフラッシュする最後でそれを閉じますまだ実行されていません:

bulkProcessor.close(); 

最後にあなたの質問に答えるには:thまた、についての素晴らしい点は、5MBのサイズ、1000のアクション、1つの同時リクエスト、フラッシュ間隔なし(設定に便利かもしれない)といった賢明なデフォルトがあることです。

0

バッチ・リクエスト・ビルダーがバッチ・サイズ制限に達したときにそれらをカウントし、それらを索引付けして古いバルク・ビルドをフラッシュする必要があります。ここ コード

Settings settings = ImmutableSettings.settingsBuilder() 
    .put("cluster.name", "MyClusterName").build(); 

TransportClient client = new TransportClient(settings); 
String hostname = "myhost ip"; 
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port)); 

BulkRequestBuilder bulkBuilder = client.prepareBulk(); 
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path")))); 
long bulkBuilderLength = 0; 
String readLine = ""; 
String index = "my_index_name"; 
String type = "my_type_name"; 
String id = ""; 

while((readLine = br.readLine()) != null){ 
    id = somefunction(readLine); 
    String json = new ObjectMapper().writeValueAsString(readLine); 
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json)); 
    bulkBuilderLength++; 
    if(bulkBuilderLength % 1000== 0){ 
     logger.info("##### " + bulkBuilderLength + " data indexed."); 
     BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
     if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
     } 
     bulkBuilder = client.prepareBulk(); 
    } 
} 

br.close(); 

if(bulkBuilder.numberOfActions() > 0){ 
    logger.info("##### " + bulkBuilderLength + " data indexed."); 
    BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
    if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
    } 
    bulkBuilder = client.prepareBulk(); 
} 

の一例である、これが正しいと答えをマークし、あなたに感謝

関連する問題