2

私はスパークストラクチャ化ストリーミングを使用してKafkaキューからjsonデータを読み込んでいますが、私はElasticsearchにjsonデータを書き込む必要があります。SparkストラクチャードストリーミングForeachWriterがsparkContextを取得できない

しかし、ForeachWriter内でspsonContextを取得してjsonをRDDに変換することはできません。それはNPEを投げる。

jsonをRDDに変換するために、WriterでSparkContextを取得するにはどうすればよいですか?

+0

新しいSparkSessionを作成しているあなたが質問編集し、関連するコードを提供し、エラーますしてください入手した – Yaron

+0

なぜカフカは弾性に直接接続できませんか? –

+0

@ cricket_007スパークなし?私はMLで受信データを処理するためにSparkストリーミングを使用する必要があります –

答えて

0

できません。実行者で実行されるForeachWriterのメソッド。自分でElasticsearchシンクを作成するか、Elasticsearchの生APIを呼び出してデータを書き込むことができます。

+0

「elasticsearch-hadoopを使用すると、すべてのRDDをコンテンツをドキュメントに翻訳できる限り、Elasticsearchに保存できます」 https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html –

+0

シンクを作成しない限り、RDDを取得することはできません。 – zsxwing

+0

OPにKafka DStreamのRDDが必要です。 –

0

私はPS ForeachWriter

val writer = new ForeachWriter[CustomerData] { 

    override def open(partitionId: Long, version: Long) = true 
    override def process(value: CustomerData) = { 
     val spark = SparkSession 
     .builder() 
     .getOrCreate() //this works 
     ... 
    } 
    override def close(errorOrNull: Throwable) = {} 
} 

内SparkContextのインスタンスを取得することによって、問題を解決する:これはおそらく

+2

本当に効果があるかどうかチェックしましたか?エグゼキュータでSparkSessionを作成するため、多くのコードパスが壊れることをイメージできます。 – zsxwing

+0

@zsxwingはい、私はそれが新しいスパークセッションを作成しなかったと仮定しますが、現在のSparkSessionを返します - 'getOrCreate()' –

+0

エグゼキュータに使用可能なSparkSessionがないので、新しいものを作成します。また、いくつかの前提を壊すので、いくつかのランダムなエラーが発生することがあります。たとえば、「SparkEnv.get」はエグゼキュータで間違った値を返すことがあります。 – zsxwing

関連する問題