2017-05-28 14 views
0

カスタムのNiFiプロセッサを作成して、ESRi ASCII grid filesを読み取って、CSVのような表現を、ファイルごとにいくつかのメタデータとWKT形式のジオリファレンスされたユーザデータで返すことができます。カスタムのnifiプロセッサ - フローファイルの書き込み

残念ながら、解析結果は更新されたフローファイルとして書き戻されません。

https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107これをNiFiで実現しようとしています。

残念ながら、元のファイルのみが返されます。変換された出力は永続化されません。

手動のようないくつかのCSV文字列をシリアル化するためにそれを適応しようとしている:

val lineSep = System.getProperty("line.separator") 
    val csvResult = result.map(p => p.productIterator.map{ 
    case Some(value) => value 
    case None => "" 
    case rest => rest 
    }.mkString(";")).mkString(lineSep) 

    var output = session.write(flowFile, new OutputStreamCallback() { 
    @throws[IOException] 
    def process(outputStream: OutputStream): Unit = { 
     IOUtils.write(csvResult, outputStream, "UTF-8") 
    } 
    }) 

はまだflowfliesが書かれていません。上記の問題が続くか、outputStreamのStream not closed例外が発生します。

これは欠けている小さなビットでなければなりませんが、欠けているビットを見つけることができません。

答えて

3

session.write()のようなフローファイルを変更する各セッションメソッドは新しいバージョンのファイルを返し、この新しいバージョンを転送する必要があります。

converterIngester()関数でファイルを変更した場合、この新しいバージョンを呼び出し元関数に戻して関係に転送する必要があります。

+0

私はダミーコードのアウトラインとしてcsvに手動でシリアル化する必要がありますか?または私のためにAvroへのケースクラスのシリアル化をwifiが処理しますか? –

+2

あなたは自分でそうするべきです。 nifiの場合、ファイルの内容は単なるバイナリストリームです。おそらく、データベースのシリアライゼーションがどのように行われているかをnifiのhttps://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/mainで確認できます/java/org/apache/nifi/processors/standard/util/JdbcCommon.java – daggett

関連する問題