私はPysparkを使用してElasticsearchからデータを読み取る必要があります。次のようにIデバイスにpysparkにおける流れをしようとしてい -RDDの各要素にsparkcontext関数を使用
i)はRDD1
II)foreachの発生インRDD1
CONF = {RDD1の要素に基づいて、動的な値}
RDD2 =を作成しますsc.newAPIHadoopRDD( "org.apache.hadoop.io.NullWritable" \ "org.elasticsearch.hadoop.mr.EsInputFormat"、 "org.elasticsearch.hadoop.mr.LinkedMapWritable"、CONF = CONF)
私は、 "foreach"が作業者間で作業を分散し、sc.newAPIHadoopRDDを呼び出そうとします。その結果、scがワーカーで利用できないというエラーが発生します。
これを達成する別の方法はありますか?
注 - 「newAPIHadoopRDD」を使用する必要があります。残りの処理は、それに依存します。
はい。 RDDのレコードごとに何を出力するために 'map()'を使い、結果として得られるRDDに 'newAPIHadoopRDD()'メソッドを使います。 –
あなたの提案Avihooに感謝します。残念ながら、新しいAPIハードウェアAPIはscでのみ動作します。つまり、sc.newAPIHadoopRDD()を使用する必要があり、ドライバで実行されます。また、クラスター上で動作させたいと思っていました。新しいAPIHadoopRDD()をワーカーにシリアライズし、処理が並行するようにする方法があることを期待していました。 – Yogesh