2016-05-24 2 views
1

私のFlinkコードでは、HDFSフォルダにあるファイルをストリーミングしていますが、 "(そのようなファイルやディレクトリはありません)"というエラーが表示されますが、私はバッチメソッドで同じものを使用していて、すべてのものがスムーズに動作したので、アドレスは正しいです。 問題の原因を知っている人はいますか?あなたは、Javaの正規FileInputStreamとHDFS内のファイルにアクセスしようApache FlinkのHDFSアドレスからファイルをストリーミングする

public class MyObjectGenerator implements SourceFunction<MyObject> { 

    private String dataFilePath; 
    private float servingSpeedFactor; 
    private Integer rowNo ; 
    private transient BufferedReader reader; 
    private transient InputStream inputStream; 

    public MyObjectGenerator(String dataFilePath) { 
     this(dataFilePath, 1.0f); 
    } 

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) { 
     this.dataFilePath = dataFilePath; 
     this.servingSpeedFactor = servingSpeedFactor; 
     rowNo = 0 ; 
    } 

    @Override 
    public void run(SourceContext<MyObject> sourceContext) throws Exception { 
     long servingStartTime = Calendar.getInstance().getTimeInMillis(); 
     inputStream = new DataInputStream(new FileInputStream(dataFilePath)); 
     reader = new BufferedReader(new InputStreamReader(inputStream)); 
     String line; 
     long dataStartTime; 
     rowNo++; 
     if (reader.ready() && (line = reader.readLine()) != null) { 
      MyObject myObject = MyObject.fromString(line); 
      if (febrlObject!= null) 
      sourceContext.collect(myObject); 
     } else { 
      return; 
     } 
     while (reader.ready() && (line = reader.readLine()) != null) { 
      MyObject myObject = MyObject.fromString(line); 
      sourceContext.collect(febrlObject); 
     } 
     this.reader.close(); 
     this.reader = null; 
     this.inputStream.close(); 
     this.inputStream = null; 
    } 

    @Override 
    public void cancel() { 
     try { 
      if (this.reader != null) { 
       this.reader.close(); 
      } 
      if(this.inputStream != null) { 
       this.inputStream.close(); 
      } 
     } catch (IOException ioe) { 
      // 
     } finally { 
      this.reader = null; 
      this.inputStream = null; 
     } 
    } 
} 

答えて

2

DataStream<FebrlObject> myStream = 
env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv")); 

とその関連クラス: はここに私のコードです。 FileInputStream can only access the local file system. It does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's FileInputFormat`を例として示しています。

しかし、可能であれば、これを自分で実装することは避けようとします。 FlinkのFileInputFormatを使用して、ファイル行を読み込み(DataStream<String>を返す)、行を解析する連続した(フラットな)マッパーを読み込もうとする可能性があります。

関連する問題