2017-06-26 4 views
0

私はJavaの初心者です。私はelasticsearch V5.XとSparkの間のコネクタのいくつかの使用例を見るために探しています。Apache Sparkとelasticsearch V5.X

package Spark; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.log4j.Level; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.junit.Test; 

import com.google.common.collect.ImmutableList; 
import com.google.common.collect.ImmutableMap; 

import scala.collection.immutable.Map; 
import twitter4j.Status; 

import org.apache.spark.api.java.JavaSparkContext;        
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.SparkConf; 

import org.elasticsearch.spark.rdd.api.java.JavaEsSpark; 

public class EsSpark { 



    public EsSpark(){ 

     SparkConf conf = new SparkConf().setAppName("MyApp1").setMaster("localhost"); 
     conf.set("es.index.auto.create", "true"); 

     JavaSparkContext jsc = new JavaSparkContext(conf); 
     Map<String, ?> numbers = (Map<String, ?>) ImmutableMap.of("one", 1, "two", 2);     
     Map<String, ?> airports = (Map<String, ?>) ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran"); 

     JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports)); 


     JavaEsSpark.saveToEs(javaRDD, "spark/docs");  
     } 

} 

ありがとう:ここでは現時点では

は私のコードです。

答えて

1

ローカルのElasticsearchインスタンスを使用している場合を除き、特に重要な設定はes.nodesです。

あなたはマスターノードを好む、

conf.set("es.nodes", "eshost:9200");

あなたも、複数のインスタンスを指定することができます使用してそれを行うことができますが、すべてのノードが必要とされるわけではありません。

official documentationを参照してください。

ディスカッションフォーラムの人々(elastic)は、例として使用できるコードを公開することがよくあります。

EsSparkまたはEsSparkStreamingオブジェクトとしていくつかのドキュメントを用意してください。毎回1つの文書を送信せず、複数の文書を優先します。

EsSparkまたはEsSparkStreamingあなたが提供するノードに接続し、彼らがクラスター・トポロジー(ノード数、ノードの種類)を確認すると、彼らはデータノードへと(ホップを避ける)正しいシャードに直接データを送信します。 (section of the documentationで指定された設定を使用して)データノードに直接データをプッシュしないようにすることはできますが、ボトルネックが発生します。

+0

@lucabelluciniおかげで、私はこの設定es.nodesを追加することができますし、私はどのように私ができる休憩中Elasticsearchを使用していますか: – hugo

+0

が更新リプライ – lucabelluccini

+0

を参照してください「EsSparkとEsSparkStreamingオブジェクトがそれぞれのクラスタを発見していることを確認し、再接続」どうもありがとう – hugo