2016-07-02 12 views
5

Elasticsaerchのドキュメントでは、完全なインデックスをSparkに読み込む方法についてのみ説明しています。PysparkとDataframesを使用してElasticsearchインデックスをクエリする方法

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") 
df.printSchema() 

どのようにElasticsearchインデックスからデータを返すとpysparkを使用してデータフレームとしてスパークするためにそれらをロードするためのクエリを実行することができますか?

答えて

4

以下は私のやり方です。

一般環境設定およびコマンド:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6 
export PYSPARK_DRIVER_PYTHON=ipython2 

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar 

はコード:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("ESTest") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

q ="""{ 
    "query": { 
    "filtered": { 
     "filter": { 
     "exists": { 
      "field": "label" 
     } 
     }, 
     "query": { 
     "match_all": {} 
     } 
    } 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "titanic/passenger", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

また、データフレームの列を定義することができます。詳細はHereを参照してください。

希望すると助かります!

+0

これは私が今行ってきたことですが、フィルタリングされたDataFrameを直接フェッチする方法があることを願っていました。 –

+1

ES-Hadoop Sparkコネクタの最新APIでは可能かどうかはわかりません。 –

+1

このAPIを使用してelasticsearchにデータフレームを書き込む方法はありますか? –

0

pysparkを使用してAmazonのEMRクラスターでコードを実行しています。その後、方法は私が作っそれ以下の手順に従ってた作品:

1)ローカルホストelasticsearchサーバーを作成する(クラスタ作成中、このブートストラップアクションを入れて):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2)私は埋めるためにこれらのコマンドを実行しますいくつかのデータとelasticsearchデータベース:

curl -XPUT "http://localhost:9200/movies/movie/1" -d' { 
    "title": "The Godfather", 
    "director": "Francis Ford Coppola", 
    "year": 1972 
    }' 

ご希望の場合も同様に、他のカールのコマンドを実行できます。

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}} 

3)私は、次のパラメータを使用してpysparkをinited:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

私は以前

4 elasticsearchのPythonクライアントをダウンロードしていた)私は、次のコードを実行します。

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

q ="""{ 
    "query": { 
    "match_all": {} 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "movies/movie", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

その後、私は最終的に得ました成功したコマンドの結果。

0

ジオフィルタリングされたデータをPySpark DataFrameに取り込むために、これと同様の問題が発生しました。私は、Sparkバージョン2.1.1とESバージョン5.2でelasticsearch-spark-20_2.11-5.2.2.jarを使用しています。私はデータフレーム

spark_df = spark.read.format("es").option("es.query", q).load("index_name") 
にデータをロードするには、次のコマンドを使用

q ="""{ 
    "query": { 
     "bool" : { 
      "must" : { 
       "match_all" : {} 
      }, 
      "filter" : { 
       "geo_distance" : { 
        "distance" : "100km", 
        "location" : { 
         "lat" : 35.825, 
         "lon" : -87.99 
        } 
       } 
      } 
     } 
    } 
}""" 

DATAFRAME

を作成中にオプションとしての私のクエリを指定することにより、データフレームにデータをロードすることができた私の地理クエリ

関連する問題