NiFi 1.1.1状態マネージャがデータを永続化/取得しない
State Managerを使用してバイト[]を永続化しようとしています。
private byte[] lsnUsedDuringLastLoad;
@Override
public void onTrigger(final ProcessContext context,
final ProcessSession session) throws ProcessException {
...
...
...
final StateManager stateManager = context.getStateManager();
try {
StateMap stateMap = stateManager.getState(Scope.CLUSTER);
final Map<String, String> newStateMapProperties = new HashMap<>();
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN,
new String(lsnUsedDuringLastLoad));
logger.debug("Persisting stateMap : "
+ newStateMapProperties);
stateManager.replace(stateMap, newStateMapProperties,
Scope.CLUSTER);
} catch (IOException ioException) {
logger.error("Error while persisting the state to NiFi",
ioException);
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException);
}
...
...
...
}
例外は発生しないか、ログエラーエントリさえありません。プロセッサは引き続き実行されます。永続フィールドのために: 次負荷コードは常に({} statemapを取り出さ):null値を返します
try {
stateMap = stateManager.getState(Scope.CLUSTER);
stateMapProperties = new HashMap<>(stateMap.toMap());
logger.debug("Retrieved the statemap : "+stateMapProperties);
lastMaxLSN = (stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties
.get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null
: stateMapProperties.get(
ProcessorConstants.LAST_MAX_LSN).getBytes();
logger.debug("Attempted to load the previous lsn from NiFi state : "
+ lastMaxLSN);
} catch (IOException ioe) {
logger.error("Couldn't load the state map", ioe);
throw new ProcessException(ioe);
}
ZKが故障しているか、国家地図を使用している間、私は何かを逃した場合、私は疑問に思って!
それは風のように働いた、しかし、私はまだ医者の背後に不思議です。 replace()メソッドの場合:| - 根拠は何でしょうか? –
推論は分かりませんが、置換実装ではif(stateMap.getVersion()== -1L){ //状態が一度も設定されていないのでfalseを返します false false; } –
StateManagerを使った後、replace()の背後にある考え方は、別のノードやタスクがその間に地図を更新したかどうかを制御することだと思います。この状況を考えてみましょう。あなたのタスクの1つが現在の状態を取得し、それを使って何かを行い、後でそれを更新します。このタスク1は、別のタスク(タスク2)によって更新された状態を保持していました。タスク1からの更新は、タスク2からの更新を無効にします。 – jboi