2017-02-02 16 views
5

spark構造化ストリーミングドキュメントから: "このチェックポイントの場所は、HDFS互換ファイルシステム内のパスでなければならず、起動時にDataStreamWriterのオプションとして設定できますクエリ。Apache Spark(Structured Streaming):S3チェックポイントのサポート

そして案の定、S3のパスにチェックポイントを設定するとスローされます。

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
     at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305) 
     at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
     at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
     at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
     at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
     at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
     at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
     at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
     at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

ここで質問のカップル:

  1. サポートをストリーミングチェックポイントディレクトリ(通常の火花としてサポートされていませんS3れるのはなぜこの)?ファイルシステムを "HDFSに準拠させる"には何がありますか?
  2. 私はHDFSを使用します(クラスタは常に上がったり下がったりする可能性があるので)。すべてのデータを保持する場所としてs3を使用します。このような設定で構造化ストリーミングデータのチェックポイントデータを格納するための推奨事項は?
+0

ここで純粋に推測しますが、s3nまたはs3a(好ましくはs3a)プロトコルを試してみましたか? – ImDarrenG

+0

確かに価値があり、試してみてください。 – Apurva

答えて

2

FS HDFSを「対応していますか?それはファイルシステムであり、振る舞いはHadoop FS specificationで指定されています。特にS3のオブジェクトストアとFSとの間の差が被覆されて、キーポイントがあると「追記又はOせず、最終的に一貫したオブジェクトストア(1)原子リネームが準拠していない」

  1. にです一貫性がありません:新しいBLOBを作成した後、リストコマンドはそれを表示しないことがよくあります。削除と同じです。ブロブが上書きされたり削除された場合
  2. 、それは)(離れ
  3. 名前の変更を行くためにしばらく時間がかかることができますコピーによって実現され、その後、場所にすべてを保存して、名前の変更によって

スパークストリーミングチェックポイントを削除しますそれをチェックポイントディレクトリにコピーします。これにより、S3のデータのコピーを実行する時間に比例するチェックポイントの時間が得られます。これは〜6-10 MB/sです。

ストリーミングコードの現在のビットはs3には適していませんが、ある時点ではそれを修正できるかもしれませんが、私は古いパッチをコミットするまで新しいパッチを出すことはありません。それが無視されるだけであれば、私はこのことに取り組んでいるわけではありません。今の

、HDFSへ

  • チェックポイントの1、その後は、EBSのビットへ
  • チェックポイントが割り当てられた結果をコピーし、S3へのクラスタ
  • チェックポイントに添付ますが、持っていますチェックポイントの間隔が長いため、ストリーミングアプリがダウンすることはありません。

EMRを使用している場合は、一貫性のあるダイナモDBバックアップS3に対してプレミアムを支払うことができます。しかし、コピー時間は同じです。したがって、チェックポイントは遅くなります。

+0

チェックポイントとS3との間に40秒の間隔があり、一時ディレクトリが書き込まれていて見つからないなどのチェックポイントの問題があります。 –

+0

チェックポイントが見つからない場合はおそらくs3の一貫性の高いサーフェスです。リスティングはオブジェクトストアの変更に遅れがちです。通常、あなたは気付かないが、時にはそれが表れる。ダイナモをメタデータストアに使用するとうまくいくはずです...少なくともそうでなければ、間違って実装しています –

4

これは既知の問題です:https://issues.apache.org/jira/browse/SPARK-19407

は、次のリリースで修正される予定です。回避策として--conf spark.hadoop.fs.defaultFS=s3を使用して、デフォルトのファイルシステムをs3に設定できます。

+0

これはまだ解決されていないと思わないでください。まだS3で構造化されたストリーミングをチェックポイントできません(spark 2.1.1)。 チェックポイント回復はして失敗します。 7/06/29午前0時29分00秒INFO StateStoreCoordinatorRef:登録StateStoreCoordinatorエンドポイント org.apache.spark.sql.AnalysisException:このクエリは、チェックポイントの位置からの回復をサポートしていません。 – Apurva

+0

これは別の問題です。あなたは回復をサポートしていない "メモリ"または "コンソール"を使用していますか? – zsxwing

2

この問題はhttps://issues.apache.org/jira/browse/SPARK-19407に修正されています。

しかし、S3で最終的な一貫性がないため、構造化ストリーミングのチェックポイント設定がS3でうまく機能しません。チェックポイント処理のためにS3を使用することは良い考えではありませんhttps://issues.apache.org/jira/browse/SPARK-19013

マイケルアームバースト氏は、これはSparkでは修正されないと言いました。解決策は、S3guardが実装されるまで待つことです。 S3Guardはいつか離れています。

関連する問題