私はSpark2を使用しています。私はRabbitmqから検索テキストのストリームを取得し、再度Elasticsearchを検索しようとしています。Rabbitmq spark stramを使用してESを照会する方法
params.put("hosts", "IP");
params.put("queueName", "query");
params.put("exchangeName", "Exchangequery");
params.put("vHost", "/");
params.put("userName", "test");
params.put("password", "test");
Function<byte[], String> messageHandler = new Function<byte[], String>() {
public String call(byte[] message) {
return new String(message);
}
};
JavaReceiverInputDStream<String> messages = RabbitMQUtils.createJavaStream(jssc, String.class, params, messageHandler);
messages.foreachRDD();
上記のコードは、rabbitmqからstramを受け取ります。しかし、私はどのようにESとストリームのバッチのクエリに接続するか分からない。 1つは、messages.foreachRDD();
を使用し、各入力項目のelasticsearchをクエリすると、パフォーマンスに影響します。
私は常に1つのフィールドのみを使用してelasticsearchをクエリします。例えば
私STRAM messages
は、私は、ES fruit
にインデックスを持っていると私は?q=apple or orange
のように照会したい
apple
orange
のような入力を持っています。私はelasticsearchでshould
を使用してクエリをフレームする必要があることを知っています。私はRabbitMQのストリームから受け取った値を使用してESに対してクエリを実行することができますどのように私の質問は
は、応答をありがとうございました。私の問題は、SparkストリーミングRDD – Backtrack
にこの作品のようなものを統合したいのですか? 'リストフルーツ= rdd.take(100); (フルーツ:+フルーツ); (文字列メッセージ:フルーツ){ //と同じです。上記 } ' –
deathyr