2017-09-13 5 views
1

私は、SparkSQLによるElastic検索の統合にHDFSを取り組んでいます。 HDFSからcsvデータを読み込み、弾性検索インデックスを作成することができました。 Elastic検索インデックスIDを作成するには、csvデータの一意の列の1つを使用しています。今私の要件は、弾性検索インデックスIDは、2つのCSVの列の組み合わせでなければなりません。私はこれをどのように達成するでしょうか?私は、インデックスを作成するためにelasticsearch-sparkライブラリを使用しています。以下はサンプルコードです。複合キーに弾性検索を作成する複合キーIndex

SparkSession sparkSession = SparkSession.builder().config(config).getOrCreate(); 
SQLContext ctx = sparkSession.sqlContext(); 
HashMap<String, String> options = new HashMap<String, String>(); 
options.put("header", "true"); 
options.put("path", "hdfs://localhost:9000/test"); 
Dataset df = ctx.read().format("com.databricks.spark.csv").options(options).load(); 
JavaEsSparkSQL.saveToEs(df, "spark/test", ImmutableMap.of("es.mapping.id", "Id")); 

答えて

0

変更ID値は、弾性検索にデータセットを保存します。働い

df.registerTempTable("tmp"); 
Dataset ds= spark.sql("select concat(Id,<another composite key column>) as Id ,<rest of the columns> from tmp"); 
JavaEsSparkSQL.saveToEs(df, "spark/test", ImmutableMap.of("es.mapping.id", "Id")); 
+1

感謝を。 – Sach

+0

お寄せください。マークされた質問に答えました。 –