2016-11-15 16 views
4

私はkafka(バージョン10)からsparkでメッセージを読み込み、それを印刷しようとしています。スレッドでのエラー例外を取得ストリーミングソースのクエリは、writeStream.start()で実行する必要があります。

 import spark.implicits._ 

     val spark = SparkSession 
       .builder 
       .appName("StructuredNetworkWordCount") 
       .config("spark.master", "local") 
       .getOrCreate() 

      val ds1 = spark.readStream.format("kafka") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", "topicA") .load() 
      ds1.collect.foreach(println) 
     ds1.writeStream 
      .format("console") 
      .start() 
      ds1.printSchema() 

「メイン」org.apache.spark.sql.AnalysisException:ストリーミングソースとのクエリは)(writeStream.startで実行する必要があります;;

答えて

5

あなたはクエリプランを分岐している:あなたがしようとしている同じDS1からに:

  • ds1.collect.foreach(...)
  • ds1.writeStream.format(...){...}

しかし、あなたが唯一残して、第二分岐に.start()を呼び出しています終了せずに他のダングリング、それはあなたが戻ってくる例外をスローします。

+0

だから何が修正されていますか? – user1870400

+0

'.start()'両方のブランチ?それはダウンボートですか? – ssice

+0

私はここでコメントを2番目にします。ここで適切な解決策を得ることができますか?たぶんコードサンプルですか?ありがとう! – DataGeek

1

私は、次のコードを使用して同じissue.i固定問題に直面していました。修正された問題に役立つかもしれません。

val df = session 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", brokers) 
    .option("subscribe", "streamTest2") 
    .load(); 

    val query = df.writeStream 
    .outputMode("append") 
    .format("console") 
    .start() 
query.awaitTermination() 
0

エラーメッセージについて読ん

org.apache.spark.sql.AnalysisException:ストリーミングソースとクエリ )writeStream.start(で実行されなければならない;;

私はこれを見つけました。それはそれをうまく説明し、別のものを提供します。私は自分でそれを試して、それが私のために働くなら結果を投稿します。

関連する問題