1

私は、IntelliJ ideaから次のコードを実行して、kafkaからコンソールにメッセージを出力しようとしています。しかし、次のエラーがスローされます。スパークストラクチャードストリーミング -

スレッド "main"の例外org.apache.spark.sql.AnalysisException:ストリーミング元のクエリは、writeStream.start();で実行する必要があります。 kafka

StacktraceはDataset.checkpointから開始されています。 .checkPoint()を削除すると、アクセス権に関連するその他のエラーが発生する 17/08/02 12:10:52エラーStreamMetadata:ストリームメタデータの書き込みエラーStreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)コマンド文字列内のエントリ:null chmod 0644 C:\ Users \ rp \のAppData \ローカル\ Tempに\一時2f570b97-ad16-4f00-8356-d43ccb7660dbの\メタデータ

def main(args : Array[String]) = { 
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate() 
    val canonicalSchema = new StructType() 
          .add("cid",StringType) 
          .add("uid",StringType) 
          .add("sourceSystem", 
           new StructType().add("id",StringType) 
               .add("name",StringType)) 
          .add("name", new StructType() 
             .add("firstname",StringType) 
             .add("lastname",StringType)) 


    val messages = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribe","c_canonical") 
        .option("startingOffset","earliest") 
        .load() 
        .checkpoint() 
.select(from_json(col("value").cast("string"),canonicalSchema)) 
.writeStream.outputMode("append").format("console").start.awaitTermination 

} 

誰も私が間違ってやっているところに私は理解して助けてくださいすることができますか?

+0

IntelliJを管理者として実行してみてください。 –

+0

返信いただきありがとうございますが動作しませんでした。 –

答えて

1
  1. ストラクチャードストリーミングはDataset.checkpoint()をサポートしていません。 https://issues.apache.org/jira/browse/SPARK-20927

  2. IOExceptionはおそらくWindowsにcygwinをインストールしていないためです。

+0

構造化ストリーミングは 'checkpoint()'をサポートしませんが、 'option(" checkpointLocation "、"/path/to/store ")'をサポートします。それは何と言いますか? –

+0

答えを更新しました。残念ながら、彼らは同じ言葉を使っていますが、まったく違うものです。 – zsxwing

+0

'checkpointLocation'の実際の意味とどこが違うのかはどこにもありますか? –