Flink:Monitoring the Wikipedia Edit Streamのクイックスタートの例に従っています。Flink:廃止予定の折り畳みを集計する方法
例はJavaであり、そして私は次のように、Scalaでそれを実装しています。しかし
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits.keyBy(_.getUser)
.timeWindow(Time.seconds(5))
.fold(("", 0L)) {
(acc: (String, Long), event: WikipediaEditEvent) => {
(event.getUser, acc._2 + event.getByteDiff)
}
}
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
}
、FLINKでfold
機能ががを非推奨すでにあり、かつaggregate
機能が推奨されます。
しかし、私はaggregrate
に廃止予定fold
を変換する方法についての例やチュートリアルを見つけることができませんでした。
これを行うにはどのように任意のアイデア?おそらくaggregrate
を適用するだけではありません。
UPDATE
私は、次のように別の実装があります。
/**
* Wikipedia Edit Monitoring
*/
object WikipediaEditMonitoring {
def main(args: Array[String]) {
// set up the execution environment
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val edits: DataStream[WikipediaEditEvent] = env.addSource(new WikipediaEditsSource)
val result = edits
.map(e => UserWithEdits(e.getUser, e.getByteDiff))
.keyBy("user")
.timeWindow(Time.seconds(5))
.sum("edits")
result.print
// execute program
env.execute("Wikipedia Edit Monitoring")
}
/** Data type for words with count */
case class UserWithEdits(user: String, edits: Long)
}
を私はまた、自己定義AggregateFunction
を使用して実装を持ってする方法を知っていただきたいと思います。
UPDATE
私は、このマニュアルに従っ:AggregateFunctionが、次の質問があります。
:リリース1.3のためのインタフェースAggregateFunction
のソースコードでは
を、あなたはadd
が実際void
を返す表示されます
void add(IN value, ACC accumulator);
バージョン1.4の場合、AggregateFunction
は返されます:
ACC add(IN value, ACC accumulator);
これはどのように処理すればよいですか?
私が使用しているFlinkのバージョンは1.3.2
であり、このバージョンのドキュメントにはAggregateFunction
が含まれていませんが、アーティファクトのリリース1.4はまだありません。
新しい更新をご覧ください – fluency03