カスタムのNiFi
プロセッサを作成して、ESRi ASCII grid files
を読み取って、CSV
のような表現を、ファイルごとにいくつかのメタデータとWKT形式のジオリファレンスされたユーザデータで返すことができます。カスタムのnifiプロセッサ - フローファイルの書き込み
残念ながら、解析結果は更新されたフローファイルとして書き戻されません。
https://github.com/geoHeil/geomesa-nifi/blob/rasterAsciiGridToWKT/geomesa-nifi-processors/src/main/scala/org/geomesa/nifi/geo/AsciiGrid2WKT.scala#L71-L107これをNiFiで実現しようとしています。
残念ながら、元のファイルのみが返されます。変換された出力は永続化されません。
手動のようないくつかのCSV文字列をシリアル化するためにそれを適応しようとしている:
val lineSep = System.getProperty("line.separator")
val csvResult = result.map(p => p.productIterator.map{
case Some(value) => value
case None => ""
case rest => rest
}.mkString(";")).mkString(lineSep)
var output = session.write(flowFile, new OutputStreamCallback() {
@throws[IOException]
def process(outputStream: OutputStream): Unit = {
IOUtils.write(csvResult, outputStream, "UTF-8")
}
})
はまだflowfliesが書かれていません。上記の問題が続くか、outputStreamのStream not closed例外が発生します。
これは欠けている小さなビットでなければなりませんが、欠けているビットを見つけることができません。
私はダミーコードのアウトラインとしてcsvに手動でシリアル化する必要がありますか?または私のためにAvroへのケースクラスのシリアル化をwifiが処理しますか? –
あなたは自分でそうするべきです。 nifiの場合、ファイルの内容は単なるバイナリストリームです。おそらく、データベースのシリアライゼーションがどのように行われているかをnifiのhttps://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/mainで確認できます/java/org/apache/nifi/processors/standard/util/JdbcCommon.java – daggett