2017-03-20 10 views
1

NiFi 1.1.1状態マネージャがデータを永続化/取得しない

State Managerを使用してバイト[]を永続化しようとしています。

private byte[] lsnUsedDuringLastLoad; 


@Override 
    public void onTrigger(final ProcessContext context, 
      final ProcessSession session) throws ProcessException { 
... 

... 

... 
​final StateManager stateManager = context.getStateManager(); 
try { 
StateMap stateMap = stateManager.getState(Scope.CLUSTER); 
final Map<String, String> newStateMapProperties = new HashMap<>(); 
newStateMapProperties.put(ProcessorConstants.LAST_MAX_LSN, 
new String(lsnUsedDuringLastLoad)); 
logger.debug("Persisting stateMap : " 
+ newStateMapProperties); 
stateManager.replace(stateMap, newStateMapProperties, 
Scope.CLUSTER); 
} catch (IOException ioException) { 
logger.error("Error while persisting the state to NiFi", 
ioException); 
throw new ProcessException(
"The state(LSN) couldn't be persisted", ioException); 
} 

... 
... 
... 
} 

例外は発生しないか、ログエラーエントリさえありません。プロセッサは引き続き実行されます。永続フィールドのために: 次負荷コードは常に({} statemapを取り出さ):null値を返します

try { 
        stateMap = stateManager.getState(Scope.CLUSTER); 
        stateMapProperties = new HashMap<>(stateMap.toMap()); 

        logger.debug("Retrieved the statemap : "+stateMapProperties); 


        lastMaxLSN = (stateMapProperties 
          .get(ProcessorConstants.LAST_MAX_LSN) == null || stateMapProperties 
          .get(ProcessorConstants.LAST_MAX_LSN).isEmpty()) ? null 
          : stateMapProperties.get(
            ProcessorConstants.LAST_MAX_LSN).getBytes(); 


        logger.debug("Attempted to load the previous lsn from NiFi state : " 
          + lastMaxLSN); 
       } catch (IOException ioe) { 
        logger.error("Couldn't load the state map", ioe); 
        throw new ProcessException(ioe); 
       } 

ZKが故障しているか、国家地図を使用している間、私は何かを逃した場合、私は疑問に思って!

答えて

1

交換するためのドキュメントは言う:

は「場合に新しい値を基準としてコンポーネントの状態の値を更新した値は、現在与えられたOLDVALUEと同じである場合のみです。」

https://github.com/apache/nifi/blob/master/nifi-api/src/main/java/org/apache/nifi/components/state/StateManager.java#L79-L92

私はこのような何かを示唆している:あなたが状態を取得する際を通じて初めて、バージョンは何も今まで保存されませんでしたので、-1になり、その場合にすべき

if (stateMap.getVersion() == -1) { 
    stateManager.setState(stateMapProperties, Scope.CLUSTER); 
} else { 
    stateManager.replace(stateMap, stateMapProperties, Scope.CLUSTER); 
} 

をsetStateを使用しますが、その後は常にreplaceを使用できます。

+0

それは風のように働いた、しかし、私はまだ医者の背後に不思議です。 replace()メソッドの場合:| - 根拠は何でしょうか? –

+0

推論は分かりませんが、置換実装ではif(stateMap.getVersion()== -1L){ //状態が一度も設定されていないのでfalseを返します false false; } –

+0

StateManagerを使った後、replace()の背後にある考え方は、別のノードやタスクがその間に地図を更新したかどうかを制御することだと思います。この状況を考えてみましょう。あなたのタスクの1つが現在の状態を取得し、それを使って何かを行い、後でそれを更新します。このタスク1は、別のタスク(タスク2)によって更新された状態を保持していました。タスク1からの更新は、タスク2からの更新を無効にします。 – jboi

0

replace()の背後にあるアイデアと戻り値は、競合に反応できることです。同じノード上または別のノード上(クラスタ内)の別のタスクが、その間に状態を変更した可能性があります。 replace()がfalseを返すと、競合に対処し、並べ替え、自動的にソート可能なものをソートできないときにユーザーに通知できます。

これは私が使用するコードです:

/** 
* Set or replace key-value pair in status cluster wide. In case of a conflict, it will retry to set the state, when the given 
* key does not yet exist in the map. If the key exists and the value is equal to the given value, it does nothing. Otherwise 
* it fails and returns false. 
* 
* @param stateManager that controls state cluster wide. 
* @param key of key-value pair to be put in state map. 
* @param value of key-value pair to be put in state map. 
* @return true, if state map contains the key with a value equal to the given value, probably set by this function. 
*  False, if a conflict occurred and key-value pair is different. 
* @throws IOException if the underlying state mechanism throws exception. 
*/ 
private boolean setState(StateManager stateManager, String key, String value) throws IOException { 
     boolean somebodyElseUpdatedWithoutConflict = false; 
     do { 
      StateMap stateMap = stateManager.getState(Scope.CLUSTER); 

      // While the next two lines run, another thread might change the state. 
      Map<String,String> map = new HashMap<String, String>(stateMap.toMap()); // Make mutable 
      String oldValue = map.put(key, value); 

      if(!stateManager.replace(stateMap, map, Scope.CLUSTER)) { 
       // Conflict happened. Sort out action to take 
       if(oldValue == null) 
        somebodyElseUpdatedWithoutConflict = true; // Different key was changed. Retry 
       else if(oldValue.equals(value)) 
        break; // Lazy case. Value already set 
       else 
        return false; // Unsolvable conflict 
      } 
     } while(somebodyElseUpdatedWithoutConflict); 
     return true; 
} 

あなたは何が必要紛争解決と// Conflict happened...後に部品を交換することができます。

関連する問題