私は、IntelliJ ideaから次のコードを実行して、kafkaからコンソールにメッセージを出力しようとしています。しかし、次のエラーがスローされます。スパークストラクチャードストリーミング -
スレッド "main"の例外org.apache.spark.sql.AnalysisException:ストリーミング元のクエリは、writeStream.start();で実行する必要があります。 kafka
StacktraceはDataset.checkpointから開始されています。 .checkPoint()を削除すると、アクセス権に関連するその他のエラーが発生する 17/08/02 12:10:52エラーStreamMetadata:ストリームメタデータの書き込みエラーStreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)コマンド文字列内のエントリ:null chmod 0644 C:\ Users \ rp \のAppData \ローカル\ Tempに\一時2f570b97-ad16-4f00-8356-d43ccb7660dbの\メタデータ
def main(args : Array[String]) = {
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate()
val canonicalSchema = new StructType()
.add("cid",StringType)
.add("uid",StringType)
.add("sourceSystem",
new StructType().add("id",StringType)
.add("name",StringType))
.add("name", new StructType()
.add("firstname",StringType)
.add("lastname",StringType))
val messages = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","c_canonical")
.option("startingOffset","earliest")
.load()
.checkpoint()
.select(from_json(col("value").cast("string"),canonicalSchema))
.writeStream.outputMode("append").format("console").start.awaitTermination
}
誰も私が間違ってやっているところに私は理解して助けてくださいすることができますか?
IntelliJを管理者として実行してみてください。 –
返信いただきありがとうございますが動作しませんでした。 –