2016-10-24 21 views
0

私はSpark2を使用しています。私はRabbitmqから検索テキストのストリームを取得し、再度Elasticsearchを検索しようとしています。Rabbitmq spark stramを使用してESを照会する方法

params.put("hosts", "IP"); 
params.put("queueName", "query"); 
params.put("exchangeName", "Exchangequery"); 
params.put("vHost", "/"); 
params.put("userName", "test"); 
params.put("password", "test"); 

Function<byte[], String> messageHandler = new Function<byte[], String>() { 

    public String call(byte[] message) { 
     return new String(message); 
    } 
}; 

JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler); 

messages.foreachRDD(); 

上記のコードは、rabbitmqからstramを受け取ります。しかし、私はどのようにESとストリームのバッチのクエリに接続するか分からない。 1つは、messages.foreachRDD();を使用し、各入力項目のelasticsearchをクエリすると、パフォーマンスに影響します。

私は常に1つのフィールドのみを使用してelasticsearchをクエリします。例えば

私STRAM messagesは、私は、ES fruitにインデックスを持っていると私は?q=apple or orangeのように照会したい

apple 
orange 

のような入力を持っています。私はelasticsearchでshouldを使用してクエリをフレームする必要があることを知っています。私はRabbitMQのストリームから受け取った値を使用してESに対してクエリを実行することができますどのように私の質問は

答えて

1

コードがelasticsearchサーバ(基本的にはそれが必要句の多くの単一のクエリを構築する)

に一つだけの呼び出しを行うです
public static void main(String[] args) throws UnknownHostException { 

    Client client = TransportClient.builder().build() 
      .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300)) 
      .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); 

    List<String> messages = new ArrayList<>(); 
    messages.add("apple"); 
    messages.add("orange"); 

    String index = "fruit"; 
    String fieldName = "fruit_type"; 

    BoolQueryBuilder query = QueryBuilders.boolQuery(); 

    for (String message : messages) { 
     query.should(QueryBuilders.matchQuery(fieldName, message)); 
     // alternative if you are not analyzing fields 
     // query.should(QueryBuilders.termQuery(fieldName, message)); 
    } 


    int size = 60; //you may want to change this since it defaults to 10 
    SearchResponse response = client.prepareSearch(index).setQuery(query).setSize(size).execute().actionGet(); 

    long totalHits = response.getHits().getTotalHits(); 
    System.out.println("Found " + totalHits + " documents"); 
    for (SearchHit hit : response.getHits().getHits()) { 
     System.out.println(hit.getSource()); 
    } 
} 

クエリを生成:

{ 
    "bool" : { 
    "should" : [ { 
     "match" : { 
     "fruit_type" : { 
      "query" : "apple", 
      "type" : "boolean" 
     } 
     } 
    }, { 
     "match" : { 
     "fruit_type" : { 
      "query" : "orange", 
      "type" : "boolean" 
     } 
     } 
    } ] 
    } 
} 
+0

は、応答をありがとうございました。私の問題は、SparkストリーミングRDD – Backtrack

+0

にこの作品のようなものを統合したいのですか? 'リストフルーツ= rdd.take(100); (フルーツ:+フルーツ); (文字列メッセージ:フルーツ){ //と同じです。上記 } ' – deathyr

関連する問題