2017-01-30 9 views
3

私はフリンクとストリーミングにも新しいです。ストリームの各ウィンドウにパーティションごとに特定の機能を適用したい(イベント時間が使用される)。私がこれまで行ってきたことはこれです:Flinkストリーミング - ウィンドウに機能を適用する

val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

val inputStream = env.readTextFile("dataset.txt") 
     .map(transformStream(_)) 
     .assignAscendingTimestamps(_.eventTime) 
     .keyBy(_.id) 
     .timeWindow(Time.seconds(windowSize),Time.seconds(slidingStep)) 

def transformStream(input: String): EventStream = {...} 

case class EventStream(val eventTime: Long, val id: String, actualEvent: String) 

私が何をしたいのか、複雑な処理アルゴリズムまたは類似のものを適用するかもしれない、窓のバッチあたり、各パーティションに一般的な関数を適用することです。私はこのメソッドがDataStream APIに適用されているのを見ましたが、どのように動作するのか分かりませんでした。 FLINKのAPIではそれはScalaではそのように使用されていると言う:

inputStream.apply { WindowFunction } 

誰かがまたはそれがどのように使用されない方法を適用するかを説明することはできますか? Scalaの例は好ましいことです。適用メソッドは私が望むことをしますか?

答えて

5

基本的に、実行したい計算のタイプに基づいて、2つの可能な方向があります。どちらかを使用してください:fold/reduce/aggregateまたはそれ以上の一般的な1つ、既に述べた - apply。それらのすべては、キーのウィンドウに適用されます。

applyは、計算を適用するための非常に一般的な方法です。 (スカラで)最も基本的なバージョンは、次のようになります。

def apply[R: TypeInformation](function: (K, W, Iterable[T],Collector[R]) => Unit): DataStream[R] 

関数は4つのパラメータ取ります

  • をウィンドウのキー(あなたはkeyedStreamに取り組んでいる覚えている)
  • ウィンドウ(あなたが抽出できますe。グラム開始またはそれからウィンドウの終わり)この特定のウィンドウに割り当てられた
  • 要素を、キー

一つは、このかかわらず、覚えておく必要がありますあなたは、処理の結果を発する必要があるためにコレクタウィンドウが放出されるまで、すべての要素を状態に保つ必要があります。より良いメモリパフォーマンスのソリューションは、上記の関数を実行する前にいくつかの計算を実行するpreAgreggatorを持つバージョンを使用することです。セッション・ウィンドウでのキーのappearencesをカウント

val stream: DataStream[(String,Int)] = ... 

stream.keyBy(_._1) 
     .window(EventTimeSessionWindows.withGap(Time.seconds(conf.sessionGap()))) 
     .apply((e1, e2) => (e1._1, e1._2 + e2._2), 
      (key, window, in, out: Collector[(String, Long, Long, Int)]) => { 
       out.collect((key, window.getStart, window.getEnd, in.map(_._2).sum)) 
     }) 

ここでは、事前集計と短いスニペットを見ることができます。

基本的には、ウィンドウのメタ情報が必要ない場合は、fold \ reduce \ aggregateに十分であれば固執します。何らかの事前集会での適用を検討する以上に、最も一般的でないものを見てみましょうapply

詳細については、hereをご覧ください。

0

あなたがステートフルウィンドウのデータval inputStreamにmap/flatmap/keyBy関数呼び出しを適用して、データを変更することができます。あなたがしたい場合

val inputStreamChanged = inputStream .map(a => DoSthWithYourStream.Change2ColumnsIntoOne(a.change1st, a.change2nd), a) .flatMap(new DoSthWithYourStream())

Examples extending Java Classed and applying Scala classes into the stream using map/flapmap/key etc

:あなたがあなたの方法と入力データの制約を定義し

class DoSthWithYourStream {...}

を作成していたのであれば、あなたは別の値を作成することができますCEPを使用するには、最良の選択肢は、CEP pattern API

val pattern = Pattern.begin("start").where(_.getId == 42) .next("middle").subtype(classOf[SubEvent]).where(_.getVolume >= 10.0) .followedBy("end").where(_.getName == "end")

val patternStream = CEP.pattern(inputStream, pattern) val result: DataStream[Alert] = patternStream.select(createAlert(_))

+0

私はパーティション全体を操作し、map/flatMap関数呼び出しはDataStreamの各要素に変換を適用したいということです。 –

0

それはScalaの魔法のビットを必要と判明しました。私がこれまで行ってきたことはこれです:

val test: DataStream[Long] = inputStream.apply(processPartition(_,_,_,_)) 

    def processPartition(key: String, window: TimeWindow, 
         batch: Iterable[EventStream], 
         out: Collector[Long]): Unit = {..} 

私の実験からprocessPartition方法は、「パーティション化キー」(バッチは、同じキーを持つ唯一の要素が含まれます)であるバッチ全体に機能を適用します。私はこのメソッドのパラメータをJava APIから取り出しました。適用関数とそれがどのように動作するかについて誰かが詳しく説明することができれば便利です。

関連する問題