2016-09-21 12 views
1

現在、カスタムプロセッサの実装でcsvレコードに変換を適用する必要があるNiFiフローで作業しています。Apache NiFi - カスタムプロセッサで複数のスレッドを設定するNullPointerException

実行中のベンチマーク中にこのような現象が発生しました。各カスタムプロセッサにスレッドが1つしか割り当てられていない場合、すべて正常に動作します。カスタムプロセッサにさらにスレッドを割り当てると、はjava.lang.NullPointerExceptionのためセッションを処理できませんでした。

エラーは単一のスレッドでは再現できないため、フローファイルの処理に関するいくつかの問題、またはNiFiのいくつかの側面については気づいていません。

フローファイル属性へのアクセスが処理されます。フローファイルの内容は決して読み取られず、いくつかの属性を追加した後に出力フローファイルが返されます。以下は、プロセッサ・コードの関連部分の抜粋です:

@Override 
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { 

    // Will hold all the processed attributes 
    Map<String, String> processedAttributes = new HashMap<>(); 
    FlowFile flowfile = session.get(); 
    ... 
    // Adds the attributes to the flowfile 
    flowfile = session.putAllAttributes(flowfile, processedAttributes); 
    session.transfer(flowfile, PROCESSED); 
    } 

私はm4.4xlargeのAmazon EC2インスタンスにNiFi 0.7を実行しています。私は高性能を求めています(誰もいない)ので、スレッド数を増やす安全な方法を探しています。どんな提案も本当に感謝しています。

ありがとうございます。

+1

関連するスタックトレースを提供し、参照されるコードが提供されていることを確認できますか? – apiri

答えて

2

session.get()メソッドがいくつかのスレッドでnullを返すことは可能です(特に複数スレッド/並行タスクの場合)。これは、利用可能なフローファイルがあるために2つ以上のスレッドがスケジュールされている場合に発生し、1つのスレッドが(session.get()を介して)フローファイルを取得し、次のスレッドはnullを取得します。

あなたの場合、フローファイルを読み書きしていないかもしれませんが、他のメソッド(session.putAllAttributes()など)はフローファイルのメソッドを呼び出します。多くのプロセッサは、flowfile == nullかどうかを確認するためのチェックを追加します(フローファイルが動作することを望んでいるため)ので、問題が解決される可能性があります。

+0

1つのカスタムプロセッサで最初に別々に試してみましたが、NullPointerExceptionをスローしませんでした。すべてのコードを変更しましたが、例外は表示されません。好奇心のために、NiFiはキュー内のスレッドの並行性をどのように扱いますか?ご協力ありがとうございました – riccamini

関連する問題