S3

2017-04-11 2 views
0

でBucketingSinkの統合ApacheのFLINKは、それは、Apache FLINKに付属のS3にデータを書き込むようにすることをBucketingSinkを使用することは可能ですか?S3

私はURLのいくつかの組み合わせを試してみましたが、私はS3

例えばとどこに行くように見えることはできませんS3://バケット/パス/に/フォルダ

S3 EMR 5.4.0に展開ではなく、ときに私は、HDFSに書き込むことができます。

ドキュメントは、潜在的な統合としてS3を言及していないが、私はそれがネイティブにサポートされていると仮定しています。

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/filesystem_sink.html

私はS3Aを使用している場合、私は次のエラーを取得する:// URL形式

java.lang.NoSuchMethodError: org.apache.http.params.HttpConnectionParams.setSoKeepalive(Lorg/apache/http/params/HttpParams;Z)V 
at com.amazonaws.http.HttpClientFactory.createHttpClient(HttpClientFactory.java:96) 
at com.amazonaws.http.AmazonHttpClient.<init>(AmazonHttpClient.java:187) 
at com.amazonaws.AmazonWebServiceClient.<init>(AmazonWebServiceClient.java:136) 
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:394) 
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:374) 
at com.amazonaws.services.s3.AmazonS3Client.<init>(AmazonS3Client.java:356) 
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:235) 
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2717) 
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93) 
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2751) 
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2733) 
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:377) 
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295) 
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:417) 
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:351) 
at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178) 
at org.apache.flink.streaming.api.functions.util.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160) 
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:106) 
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:225) 
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:666) 
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:654) 
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:257) 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655) 
at java.lang.Thread.run(Thread.java:745) 

答えて

1

EMRは、ASFのHadoopからS3Aクライアントと10%の互換性がないことを行います。 Amaonのs3:// URLに固執する:サポートするものだ