2016-07-10 10 views
0

私はPysparkを使用してElasticsearchからデータを読み取る必要があります。次のようにIデバイスにpysparkにおける流れをしようとしてい -RDDの各要素にsparkcontext関数を使用

i)はRDD1
II)foreachの発生インRDD1
CONF = {RDD1の要素に基づいて、動的な値}
RDD2 =を作成しますsc.newAPIHadoopRDD( "org.apache.hadoop.io.NullWritable" \ "org.elasticsearch.hadoop.mr.EsInputFormat"、 "org.elasticsearch.hadoop.mr.LinkedMapWritable"、CONF = CONF)

私は、 "foreach"が作業者間で作業を分散し、sc.newAPIHadoopRDDを呼び出そうとします。その結果、scがワーカーで利用できないというエラーが発生します。

これを達成する別の方法はありますか?
注 - 「newAPIHadoopRDD」を使用する必要があります。残りの処理は、それに依存します。

+0

はい。 RDDのレコードごとに何を出力するために 'map()'を使い、結果として得られるRDDに 'newAPIHadoopRDD()'メソッドを使います。 –

+0

あなたの提案Avihooに感謝します。残念ながら、新しいAPIハードウェアAPIはscでのみ動作します。つまり、sc.newAPIHadoopRDD()を使用する必要があり、ドライバで実行されます。また、クラスター上で動作させたいと思っていました。新しいAPIHadoopRDD()をワーカーにシリアライズし、処理が並行するようにする方法があることを期待していました。 – Yogesh

答えて

0

RDDをネストすることはできません。 rdd1の結果をループしたい場合は、最初にドライバにcollectする必要があります。

val rdd1Result = rdd1.collect() 
rdd1Result.foreach { v => 
    val conf = ... 
    sc.newAPIHadoopRDD... 
} 
+0

Dikeiありがとうございます。あなたの入力を感謝します。 i) "rdd1.collect"はリストを返しますが、 ".foreach"はRDDで動作します。ii)スケーラビリティのためにこれをワーカーノード上で実行したいと思います。しかし、 "rdd1Result"全体がドライバノード上でのみ処理されると考えられます。それをワーカーノード上で並列に実行するための提案はありますか? – Yogesh

+0

スカラリストにも 'foreach'メソッドがあります。ワーカーノードでSparkContextにアクセスすることは不可能です。 – Dikei

+0

はいディケイ。ワーカーノードではSparkContextにアクセスできないことに気付きました。あなたの入力をありがとう。 – Yogesh

0

foreach内でRDDを送信することはできません。そのようにしないでください。 あなたが参加していない限り、この場合sparkは2つのrddを扱うことができます。あなたがそれについて考えるなら、これはあなたが必要とするすべてのものです。

あなたはtheta-joinをやっているようです。 データに応じて、近似値で正確な結合を使用してループを回避することができます。

+0

ありがとうMarmouset。我々は我々のアプローチを変える必要があることを認識した。その結果、私たちはnewAPIHadoopRDD()と同様のことをする関数を作成しました。私たちの基本的な要件は、私たちが代わりに行ったelasticsearchを照会することでした。そのためにHadoop APIを使用しました。この新しい関数を.mapを使用してRDDに渡し、ワーカーノードで実行されるようにしました。これは私たちが望んだことを達成するのに役立ちました。すなわち、i)クエリelasticsearch ii)スケーラブルな方法でそれを行います。 – Yogesh

関連する問題