私のFlinkコードでは、HDFSフォルダにあるファイルをストリーミングしていますが、 "(そのようなファイルやディレクトリはありません)"というエラーが表示されますが、私はバッチメソッドで同じものを使用していて、すべてのものがスムーズに動作したので、アドレスは正しいです。 問題の原因を知っている人はいますか?あなたは、Javaの正規FileInputStream
とHDFS内のファイルにアクセスしようApache FlinkのHDFSアドレスからファイルをストリーミングする
public class MyObjectGenerator implements SourceFunction<MyObject> {
private String dataFilePath;
private float servingSpeedFactor;
private Integer rowNo ;
private transient BufferedReader reader;
private transient InputStream inputStream;
public MyObjectGenerator(String dataFilePath) {
this(dataFilePath, 1.0f);
}
public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
this.dataFilePath = dataFilePath;
this.servingSpeedFactor = servingSpeedFactor;
rowNo = 0 ;
}
@Override
public void run(SourceContext<MyObject> sourceContext) throws Exception {
long servingStartTime = Calendar.getInstance().getTimeInMillis();
inputStream = new DataInputStream(new FileInputStream(dataFilePath));
reader = new BufferedReader(new InputStreamReader(inputStream));
String line;
long dataStartTime;
rowNo++;
if (reader.ready() && (line = reader.readLine()) != null) {
MyObject myObject = MyObject.fromString(line);
if (febrlObject!= null)
sourceContext.collect(myObject);
} else {
return;
}
while (reader.ready() && (line = reader.readLine()) != null) {
MyObject myObject = MyObject.fromString(line);
sourceContext.collect(febrlObject);
}
this.reader.close();
this.reader = null;
this.inputStream.close();
this.inputStream = null;
}
@Override
public void cancel() {
try {
if (this.reader != null) {
this.reader.close();
}
if(this.inputStream != null) {
this.inputStream.close();
}
} catch (IOException ioe) {
//
} finally {
this.reader = null;
this.inputStream = null;
}
}
}