2017-03-29 18 views
2

複数のキーと値のペアを持つJSONドキュメントを取得しているNifiフローに取り組んでいます。私はpythonExecuteScriptプロセッサを使用しています。Nifi ExecuteScriptプロセッサを使用して複数のフローファイルを生成

私の目標は、JSON keysにさまざまなURLベースを作成することです。キーは数値であり、彼らは次のようになります。

keys = [10200, 10201, 10202, ...] 

私が欲しいのURLは3種類のものであり、彼らこれらのようになります。

http://google.com/10200 
http://bing.com/10200 
http://yahoo.com/10200 

私は私のkeys[]をループにしようとしていますし、それに含まれる数値キーごとに3つのURLを作成します。私は次のコードを試しています:

リストから数字キーを読み取る--> 3つのURLを作成-->フローファイルを吐き出します。

......と、リスト内の次の数字キーを読み、ループ保つ.....

私は、次のコードを持っていますが、私はそれを与えるときJSONはそれが右の何もしないflowfile今。誰かが私が間違っていることを教えてもらえますか?

import json 
import java.io 
from org.apache.commons.io import IOUtils 
from java.nio.charset import StandardCharsets 
from org.apache.nifi.processor.io import StreamCallback 

class ModJSON(StreamCallback): 

    def __init__(self): 
     self.parentFlowFile = None 
     pass 
    def process(self, inputStream, outputStream): 
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
    obj = json.loads(text) 
    flowfiles_list = [] 

    outputStream.write(bytearray(json.dumps(obj.keys(), indent=4).encode('utf-8'))) 


    for numerical_key in obj.keys(): 
     # create 1 flowfile for each numerical_key. Each flow file should have 3 url attributes 
     flowFile = session.create(self.parentFlowFile) 
     if (flowFile != None): 
     flowFile = session.write(flowFile, "Does not matter") 
     flowFile = session.putAttribute(flowFile, "google", "http://google.com/"+ numerical_key) 

     flowFile = session.putAttribute(flowFile, "google", "http://bing.com/"+ numerical_key) 

     flowFile = session.putAttribute(flowFile, "google", "http://yahoo.com/"+ numerical_key) 
     flowfiles_list.append(flowFile) 

    for flow in flowfiles_list: 
     session.transfer(flow, REL_SUCCESS) 

答えて

5

良い質問ですが、これはフローファイルAPIへのコールバックアプローチのニュアンスです。 StreamCallbackのサブクラスを作成しましたが、入力フローファイルを取得していないか、クラスのインスタンスを介してコンテンツを上書きするために使用していません。

あなたModJSONクラスの定義の後にこれを試してみてください:

originalFlowFile = session.get() 
if(originalFlowFile != None): 
    originalFlowFile = session.write(flowFile, ModJSON()) 
    session.remove(originalFlowFile) 

これは、入力フローファイルを取得(または1つが表示されるまで待つ)します、そして、あなたのフローファイルの内容を上書きするためにあなたのStreamCallbackを呼び出します。私の例では、あなたの入力フローファイルを破棄します。それがあなたのユースケースの正しい振る舞いであれば、代わりにStreamCallbackの代わりにInputStreamCallbackを拡張し、何にもoutputStreamを使用していない場合はoutputStream.write() 。これを行うには、StreamCallbackをInputStreamCallbackに置き換え、process()メソッドから「outputStream」パラメータを削除します。

例では、上記のスニペットを追加すると、入力コンテンツをjson.dumps()コマンドで上書きし、新しいファイルを作成して転送し、すべて同じ関係(成功)にします。同じフォーマットでないと問題が発生する可能性があります(これがsession.remove()を追加した理由です)。元のフローファイルが残りの部分と異なる関係になる必要がある場合は、ExecuteScriptではなくInvokeScriptedProcessorと考えてください。処理後(URL属性を追加した後)に入力フローファイルを気にしない場合は、上記の私の提案に従ってください。それらはすべて同じ関係(成功)を出て行くことができるならば、(私のsession.removeを置き換える)

session.transfer(originalFlowFile, REL_SUCCESS) 

と(Jythonの中にこれらの使用例より多くの例については、私のExecuteScriptの料理本の記事(3のpart 2)を確認し、他の言語):)

関連する問題