2

私は遠隔地からHDFSへのデータオンボード、そしてHDFSからHiveテーブルへのデータ摂取に関して、Apache Spark実装に基づいています。私は場所でHDFSへのデータ/ファイルをonboardedしている私の最初の火花ジョブを使用してfs.rename(新しいパス(rawFileName)、新しいパス(processFileName))が機能していません。

は言う -

HDFS://sandbox.hortonworks.com:8020 /データ/分析/生/フォルダ

CT_Click_Basic.csvとCT_Click_Basic1.csv.gzファイルをオンボードした後、私はHDFS [共有位置のファイル名はここにフォルダ名となり、その内容はpart-xxxxxファイルに存在します]のファイルを持っているとしましょう。

[ルート@サンドボックス〜]#HDFS DFS -ls /データ/分析/生/ */ 見つかり3つの項目

-rw-rを - r--の3つのchauhan.bhupeshのHDFS 0 2017年7月27日15:02 /data/analytics/raw/CT_Click_Basic.csv/_SUCCESS

-rw-r-r-- 3 chauhan.bhupesh hdfs 8383 2017-07-27 15:02/data/analytics/raw/CT_Click_Basic .CSV /パート00000

-rw-R - r--の3つのchauhan.bhupeshのHDFS 8395 2017年7月27日午後03時02分/data/analytics/raw/CT_Click_Basic.csv/part-00001

が見つかりました。

-rw-R - r--の3つのchauhan.bhupeshのHDFS 0 2017年7月27日15時02 /data/analytics/raw/CT_Click_Basic1.csv.gz/_SUCCESS

-rw-r--のr--の3 chauhan.bhupeshは今私の別のスパークジョブを使用して16588 2017年7月27日15時02分/data/analytics/raw/CT_Click_Basic1.csv.gz/part-00000

をHDFS、私が移動したいですこれらのファイルは、/rawフォルダから/processまで、そして最後に/archiveというフォルダにあり、各ステージで実行されるタスクに基づいています。私が最初に次のコードを使用して/生フォルダの下に存在するすべてのファイルのリストを取得していますことを行うための

:、私は名前を変更しようとしているコードの次の行を使用して、その後

def listAllFilesFolderInDir(filePath:String,recursiveTraverse:Boolean,filePaths: ListBuffer[Path]) : ListBuffer[Path] = { 
val files = GlobalContext.hdfs.listStatus(new Path(filePath)) 
files.foreach { fileStatus => { 
      if(!fileStatus.isDirectory()) { 
       filePaths+=fileStatus.getPath()  
      } 
      else { 
       listAllFilesFolderInDir(fileStatus.getPath().toString(), recursiveTraverse, filePaths) 
      } 
     } 
    } 
    filePaths 
} 

をし、/移動/ rawフォルダの/ processフォルダへのファイル:

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 
    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path(rawFileName), new Path(processFileName)) 
} 

しかし、上記のコードを使用してこれらのファイルを移動/名前変更することはできません。コードをデバッグしようとしましたが、fs.rename()が私に "false"を返すことがわかりました。

ご注意:私は、私はに/データ/分析/生フォルダEX CT.csv [またはその他のファイル]手動で任意のファイルをコピーするときに、ファイル名変更/運動を達成することができるよ をしてからfs.renameを実行しています()が、Part-xxxxxファイルでは機能していません。

紛失しているものがありますか?

迅速なヘルプがあれば幸いです。

よろしく、 Bhupesh

答えて

0

最後に、私は問題があります。実際に/data/analytics/raw/folder.csv/part-xxxxxから/data/analytics/process/folder.csv/part-xxxxxにファイルの名前を変更しようとしていましたが、/ data/analytics/processはHDFSに存在していましたが、 folder.csv "は存在しませんでした。それゆえ、名前を変更している間に私を偽りにしていました。私のコードに次の行を追加して、うまく働いた

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 

    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 

    var processFolderName = outputDir + "/" + pathSplit(pathSplitSize-2) 
    var processFolderPath = new Path(processFolderName) 
    if(!(fs.exists(processFolderPath))) 
     fs.mkdirs(processFolderPath) 
    val processFileName = processFolderName + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path(rawFileName), new Path(processFileName)) 
} 
0

名前を変更することができ、新たなパス(rawFileName)が存在しない場合はfalseを返します。
fs.renameがそのファイルをチェック作る前には存在します

if (fs.exists(somePath)) { 
fs.rename... 
} 

もう一つの原因は、名前を変更しようとするファイルが誰かによって使用されている可能性があります。または、ディレクトリの名前を変更しようとすると、その中のいくつかのファイルが誰かによって使用される可能性があります。
これは根本的な原因は何か他のものあなたのコード内のファイル名前を変更しようとされていることを確認する:この名前の変更は、根本的な原因は、競合状態で本当に成功するかどう

var inputDir = "/data/analytics/raw" 
var outputDir = "/data/analytics/process" 
var filePaths = new ListBuffer[Path]() 
var pathArray = listAllFilesFolderInDir(inputDir, true, filePaths) 
val fs= <Getting hdfs FileSystem Instance Here> 
for(path<-pathArray){ 
    var pathSplit = path.toString().split("/") 
    var pathSplitSize = pathSplit.size 
    val rawFileName = inputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    val processFileName = outputDir + "/" + pathSplit(pathSplitSize-2) + "/" + pathSplit(pathSplitSize-1) 
    fs.rename(new Path("**/TESTDIR1**"), new Path("**/TESTDIR2**")) 
} 

を。

関連する問題