データをS3バケットにプッシュする小さなテストプロジェクトがあります。しかし、エラーjava.io.IOException: No file system found with scheme s3a
が出ているので、core-site.xmlファイルを読んでいないようです。 core-site.xmlファイルを正しく読み込んでS3にデータをプッシュするにはどうすればよいですか?Apache FlinkでデータをS3にプッシュ
これはコードです:
public class S3Sink {
public static void main(String[] args) throws Exception {
Map<String, String> configs = ConfigUtils.loadConfigs(“path/to/config.yaml");
final ParameterTool parameterTool = ParameterTool.fromMap(configs);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setGlobalJobParameters(parameterTool);
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer09<String>(
parameterTool.getRequired("kafka.topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
String id = UUID.randomUUID().toString();
messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1);
env.execute();
}
これは、コア-site.xmlファイル参照するためにFLINK-conf.yamlファイルの構成変更である:
fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/
これは私のコアですが-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<!-- Comma separated list of local directories used to buffer
large results prior to transmitting them to S3. -->
<property>
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
<name>fs.s3a.awsAccessKeyId</name>
<value>*****</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3a.awsSecretAccessKey</name>
<value>*****</value>
</property>
'fs.hdfs.hadoopconf'を' core-site.xml'ファイルのフォルダーに設定するとどうなりますか? '$ HADOOP_HOME'環境変数が正しく設定されていることを確認してください。 –
私はIntelliJを使用しており、環境変数HADOOP_HOMEをcore-site.xmlパスに設定しています。私はプログラムをローカルで実行しているので、fs.hdfs.hadoopconfの設定は役に立ちません。 – Sam