2017-01-09 4 views
1

データをS3バケットにプッシュする小さなテストプロジェクトがあります。しかし、エラーjava.io.IOException: No file system found with scheme s3aが出ているので、core-site.xmlファイルを読んでいないようです。 core-site.xmlファイルを正しく読み込んでS3にデータをプッシュするにはどうすればよいですか?Apache FlinkでデータをS3にプッシュ

これはコードです:

public class S3Sink { 
public static void main(String[] args) throws Exception { 
    Map<String, String> configs = ConfigUtils.loadConfigs(“path/to/config.yaml"); 

    final ParameterTool parameterTool = ParameterTool.fromMap(configs); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.getConfig().disableSysoutLogging(); 
    env.getConfig().setGlobalJobParameters(parameterTool); 

    DataStream<String> messageStream = env 
      .addSource(new FlinkKafkaConsumer09<String>(
        parameterTool.getRequired("kafka.topic"), 
        new SimpleStringSchema(), 
        parameterTool.getProperties())); 

    String id = UUID.randomUUID().toString(); 
    messageStream.writeAsText("s3a://flink-test/" + id + ".txt").setParallelism(1); 

    env.execute(); 
} 

これは、コア-site.xmlファイル参照するためにFLINK-conf.yamlファイルの構成変更である:

fs.hdfs.hadoopconf: /path/to/core-site/etc/hadoop/ 

これは私のコアですが-site.xml:

<configuration> 
<property> 
    <name>fs.defaultFS</name> 
    <value>hdfs://localhost:9000</value> 
</property> 
<property> 
    <name>fs.s3.impl</name> 
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value> 
</property> 

<!-- Comma separated list of local directories used to buffer 
    large results prior to transmitting them to S3. --> 
<property> 
    <name>fs.s3a.buffer.dir</name> 
    <value>/tmp</value> 
</property> 

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants --> 
<property> 
    <name>fs.s3a.awsAccessKeyId</name> 
    <value>*****</value> 
</property> 
<!-- set your AWS access key --> 
<property> 
    <name>fs.s3a.awsSecretAccessKey</name> 
    <value>*****</value> 
</property> 

+0

'fs.hdfs.hadoopconf'を' core-site.xml'ファイルのフォルダーに設定するとどうなりますか? '$ HADOOP_HOME'環境変数が正しく設定されていることを確認してください。 –

+0

私はIntelliJを使用しており、環境変数HADOOP_HOMEをcore-site.xmlパスに設定しています。私はプログラムをローカルで実行しているので、fs.hdfs.hadoopconfの設定は役に立ちません。 – Sam

答えて

1

core-site.xmlファイルが読み込まれなかったのは、Hadoopのファイル構造のためです。私はHADOOP_HOME=path/to/dir/etc/hadoopだった。しかし、Hadoopはcore-site.xmlを見つけるためにファイル構造の一部としてetc/hadoopを探します。 HADOOP_HOME環境変数でパスを正しく読み取るには、HADOOP_HOME=path/to/dirと表示されます。

他の問題は、データがS3にプッシュされていない理由です。私はストリーム処理を使用していたからです。バッチ処理はS3にデータをプッシュするために機能しますが、S3がデータをキー/値ストアとして格納し、新しいデータを置換するだけでは置換できないため、ストリーム処理が行われません。ストリーム処理の場合、FlinkはS3に許可されない同じファイルにデータを追加し続けるので、S3にデータがプッシュされることはありません。したがって、このコードはバッチをS3にプッシュするために働きます

ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet dataSet = ee.readTextFile("/Users/name/Desktop/flinkoutputtest.txt"); 
    dataSet.writeAsText("s3://flink-test/flink-output/testdoc.txt").setParallelism(1); 
    ee.execute(); 
関連する問題