2017-10-24 15 views
0

特定の名前を含むHDFSディレクトリからこれらのファイルのみを取得したいとします(このような名前を持つ複数の応答ファイルを2017-090-0.1、 2017-090 -0.2,2017-090-0.3など。私はflowFileの特定の量がhdfsに置かれていることを確認したい(私は関連する名前で3つの要求を送る場合、私はHDFSで3つの応答を出したかどうかを確認する必要がある)このような場合のために、私は以下のコードを使用しますが、それは私がに興味を持っていくつかのsubejectsがあるファイルを取得DEOSN'T:?Groovy:filePatternに基づいてHDFSから特定のファイルを取得する方法

  1. することは、possbile nifi機能により、グルーヴィーなコードなしでこのタスクを作ることです
  2. 私はこのコード作業を行うために何を変更する必要がありますか?

    import org.apache.commons.io.IOUtils 
    import java.nio.charset.StandardCharsets 
    def flowFile= session.get(1);// i gues it will return list of 
    flowfiles 
    def name=""; 
    def count=0; 
    def value=0; 
    def amount=0; 
    List<FlowFile> flowFiles = new ArrayList<FlowFile>(); 
    for(def n in flowFile){ 
    name=n.getAttribute("realName") 
    count=n.getAttribute("count") 
        value=count as Number 
        value=Math.round(value) 
    } 
    session.remove(flowFile) 
    def findFileRecursive(String directoryName, String filePattern) { 
    def fileFound 
    def directory = new File(directoryName) 
    if (directory.isDirectory()){ 
    def findFilenameClosure = { if (filePattern.matcher(it.name).find()){ fileFound = it } } 
    directory.eachFileRecurse(findFilenameClosure) 
    } 
    amount++; 
    flowFiles.add(fileFound); 
    return fileFound 
    } 
    String filePattern=filePattern.contains(name) 
    String directoryName="/group/test/userDate"; 
    findFileRecursive(directoryName,filePattern); 
    
    if(amount==count){ 
    for(def m in flowFiles){ 
    session.transfer(m,REL_SUCCESS); 
    } 
    
    } 
    

答えて

4

あなたは検証可能取り組んコードとルートのいずれかsuccessまたはfailure関係になるflowfileでHDFSストレージからこれらのファイルを取得しますGetHDFSプロセッサを使用することができます。このタスクを実行するためのカスタムコードを書く必要はありません。 PutHDFSは、必要に応じてHDFSへのライトバックを実行します。

+0

getHdfsでファイルパターンを使用するにはどうすればいいですか?ディレクトリアドレスとハープ・コンフィグレーションの詳細を書き込むことができます。 –

+0

名前に「2017-01-01」という文字列が含まれているファイルを取得する必要があります。 GetHdfs –

+0

getHdfsプロセッサからflowfilesを取得したときに、このようなコードを書くことができます(flowFile.getAttribute( 'filename')。substring(0,10)== "2017-01-01"){session.transfer(flowfile、 REL_SUCCESS)} else {session.rollback(flowFile)} –

関連する問題