2016-08-04 12 views
1

同じプロパティを共有するイベントを検出したいと思います。私は、単純なケースのクラスがあるとします。Apache Flink + CEP - 同じイベントを検出する

Record("A", 1) 
Record("B", 2) 
Record("A", 3) 
Record("C", 4) 

その後、私は二重の「A」レコードを検出したいと思います:

case class Record(name: String, value: Int) 

は、以下の流れがあると。これは可能ですか?あなたはこれを試してみました、私はあなたがイベント検出が何であるかを定義していると思います

val start: Pattern[Record, _] = myStream 
.begin("first") 
.followedBy("second").where(previous_record.name == _.name) 
+0

を上に検出するにはあなたが格納する必要があると見なされる各プロパティについては、プロパティセットが無限になる可能性がある同じストアプロパティ、大きいストアスペースが必要です。プロパティセットが制限より大きい場合は、フィルタリングを適用できます。 – ravthiru

答えて

1

:私は今、これを持って

val start: Pattern[Record, _] = myStream 
    .begin("first").where(name == "A") 
    .followedBy("second").where(name == "A") 

更新:たとえば:

val patternIG: Pattern[(String,String), _] = Pattern.begin[String,String)]("start").where(_.name == "Ignition").where(_.ac == "ON").next("end").where(_.name == "Door").where(_.ac == "ON") 
val patternStream: PatternStream[(String,String)] = CEP.pattern(mystream, patternIG) 
def selectFn(pattern : mutable.Map[String,(String,String)]): String = { 
val startEvent = pattern.get("start").get 
val endEvent = pattern.get("end").get 
    "ALERT Door Open" 
} 
val patternStreamSelected = patternStream.select(selectFn(_)).print() 
+0

ほぼ。ここでの問題は、 "A"が前のレコードに定義されているため、変数が可変であることです。 –

関連する問題