2017-03-28 4 views
0

私はイベントのストリームから潜在的に非常に大規模な状態を予測したいです。これは私が不可欠な方法でこれを実装する方法を次のとおりです。BarStateのサイズは回数で成長する最悪の場合にはアパッチFLINK - 潜在的に非常に大きな状態でストリームプロセッサを実装

class ImperativeFooProcessor { 

    val state: mutable.Map[UUID, BarState] = mutable.HashMap.empty[UUID, BarState] 

    def handle(event: InputEvent) = { 
    event match { 
     case FooAdded(fooId, barId) => { 
     // retrieve relevant state and do some work on it 
     val barState = state(barId) 

     // let the world know about what may have happened 
     publish(BarOccured(fooId, barId)) 
     // or maybe rather 
     publish(BazOccured(fooId, barId)) 
     } 
     case FooRemoved(fooId, barId) => { 
     // retrieve relevant state and do some work on it 
     val barState = state(barId) 

     // let the world know about what may have happened 
     publish(BarOccured(fooId, barId)) 
     // or maybe rather 
     publish(BazOccured(fooId, barId)) 
     } 
    } 
    } 

    private def publish(event: OutputEvent): Unit = { 
    // push event to downstream sink 
    } 
} 

FooAdded

で言及されて、そのユニークなbarIdの数が非常にあります各棒のイベントの合計数に対して相対的に小さい。

私はこの処理構造をFlinkでどのように表現し始めますか?

各BarStateが潜在的に非常に大きくなる可能性があるという事実をどうやって解決するのですか?

答えて

1

Flinkは、いわゆる状態バックエンドで状態を維持します。ワーカープロセスのJVMヒープ上で動作する状態バックエンド(MemoryStateBackendおよびFsStateBackend)があります。これらのバックエンドは大きな状態を処理するのには適していません。

FLINKもRocksDBに基づいてRocksDBStateBackendを備えています。 RocksDBは、各ワーカーノードでローカルデータベース(外部サービスとして設定する必要はありません)として使用され、状態データをディスクに書き込みます。したがって、メモリを超過する非常に大きな状態を処理することができます。

FLINKは、特定の属性にパーティション化されたストリームであるKeyedStreamを提供しています。あなたのケースでは、同じIDへのすべてのアクセスが同じ状態のインスタンスになるようにするため、barIdをキーとして使用します。次に、状態はbarIdに基づいてすべての並列ワーカースレッドに分割されます。これは基本的に分散キー値ストアまたはマップです。したがって、Flinkによって自動的に配信されるため、状態を地図として表現する必要はありません。

関連する問題