私は遠隔地からHDFSへのデータオンボード、そしてHDFSからHiveテーブルへのデータ摂取に関して、Apache Spark実装に基づいています。私は場所でHDFSへのデータ/ファイルをonboardedしている私の最初の火花ジョブを使用してfs.rename(新しいパス(rawFileName)、新しいパス(processFileName))が機能していません。
は言う -
HDFS://sandbox.hortonworks.com:8020 /データ/分析/生/フォルダ
を
CT_Click_Basic.csvとCT_Click_Basic1.csv.gzファイルをオンボードした後、私はHDFS [共有位置のファイル名はここにフォルダ名となり、その内容はpart-xxxxxファイルに存在します]のファイルを持っているとしましょう。
[ルート@サンドボックス〜]#HDFS DFS -ls /データ/分析/生/ */ 見つかり3つの項目
-rw-rを - r--の3つのchauhan.bhupeshのHDFS 0 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS
-rw-r-r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02/data/analytics/raw/CT_Click_Basic .CSV /パート00000
-rw-R - r--の3つのchauhan.bhupeshのHDFS 8395 2017年7月27日午後03時02分/data/analytics/raw/CT_Click_Basic.csv/part-00001
が見つかりました。
-rw-R - r--の3つのchauhan.bhupeshのHDFS 0 2017年7月27日15時02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS
-rw-r--のr--の3 chauhan.bhupeshは今私の別のスパークジョブを使用して16588 2017年7月27日15時02分/data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000
をHDFS、私が移動したいですこれらのファイルは、/rawフォルダから/processまで、そして最後に/archiveというフォルダにあり、各ステージで実行されるタスクに基づいています。私が最初に次のコードを使用して/生フォルダの下に存在するすべてのファイルのリストを取得していますことを行うための
:、私は名前を変更しようとしているコードの次の行を使用して、その後
def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = {
val files = GlobalContext.hdfs.listStatus(new Path(filePath))
files.foreach { fileStatus => {
if(!fileStatus.isDirectory()) {
filePaths+=fileStatus.getPath()
}
else {
listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths)
}
}
}
filePaths
}
をし、/移動/ rawフォルダの/ processフォルダへのファイル:
var inputDir = "/data/analytics/raw"
var outputDir = "/data/analytics/process"
var filePaths = new ListBuffer[Path]()
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths)
val fs= <Getting hdfs FileSystem Instance Here>
for(path<-pathArray){
var pathSplit = path.toString().split("/")
var pathSplitSize = pathSplit.size
val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1)
fs.rename(new Path(rawFileName), new Path(processFileName))
}
しかし、上記のコードを使用してこれらのファイルを移動/名前変更することはできません。コードをデバッグしようとしましたが、fs.rename()が私に "false"を返すことがわかりました。
ご注意:私は、私はに/データ/分析/生フォルダEX CT.csv [またはその他のファイル]手動で任意のファイルをコピーするときに、ファイル名変更/運動を達成することができるよ をしてからfs.renameを実行しています()が、Part-xxxxxファイルでは機能していません。
紛失しているものがありますか?
迅速なヘルプがあれば幸いです。
よろしく、 Bhupesh