Spark Structured StreamingでS3バケットからデータを読み取ろうとしています。以下のコードは、既存のデータを取得するために機能します。しかし、新しいデータがバケットに追加されると、Sparkはこれを選択しません。Spark Structured Streamingで新しいデータがS3から取得されない
val lines = spark.readStream.schema(schemaImp).format("com.databricks.spark.avro").load("s3n://bucket/*")
val query = lines.writeStream.outputMode("append").format("memory").queryName("memtable").start()
query.processAllAvailable()
spark.sql("select * from memtable").show()
新しいデータをフェッチする方法を教えてください。または、これはまだサポートされていない機能ですか?
ローカルファイルシステムを試してみましたが、これは同じ動作です。私はメモリ内のテーブルに出力をストリームすると推測していますが、出力は新しいデータを取得しません。私は入力上の複数の集約操作を防ぐ構造化ストリーミングの制限を回避しようとしています。私がアップデートを受け取るために出力ストリーミングを得ることができるなら、私は任意の数の集約操作を行うことができます。これが理にかなってほしい。 – Kaptrain
@Kaptrainログはありますか?これはバグのようです。 – zsxwing
出力ログを[このリンク](https://www.dropbox.com/s/nauda93y6nzz154/S3Table.log?dl=0)にアップロードしました。小さなテーブルが表示されている部分がS3の第1ファイルから取り出されます。その下の行は、S3に新しいファイルを追加した後に生成されます。したがって、Sparkは新しいファイルを検出しますが、出力テーブルへの更新はストリーミングしません。 – Kaptrain