2017-12-05 4 views
0

私はApacheのFLINK(FLINK 1.3)でdeltatriggerを使用したいが、私はこのコードのいくつかの問題があります。flinkでdelta triggerを使用するには?

.trigger(DeltaTrigger.of(100, new DeltaFunction[uniqStruct] { 
    override def getDelta(oldFp: uniqStruct, newFp: uniqStruct): Double = newFp.time - oldFp.time 
    }, TypeInformation[uniqStruct])) 

をそして、私はこのエラーを持っている:

error: object org.apache.flink.api.common.typeinfo.TypeInformation is not a value [ERROR] }, TypeInformation[uniqStruct])) 

私は理由を理解していませんDeltaTriggerにはTypeSerializer[T] が必要で、このエラーを取り除くために何をすべきかわかりません。

ありがとうございます。

答えて

0

https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.htmlあなたのタイプ情報にtypeInfo.createSerializer(config)を使用してシリアライザを作成することができます。あなたが現在渡しているものは、型自体であり、型情報ではないことに注意してください。これは、あなたがエラーを受け取っている理由です。それはFLINKのを使用していますので、あなたがシリアライザ

The config parameter is of type ExecutionConfig and holds the information about the program’s registered custom serializers. Where ever possibly, try to pass the programs proper ExecutionConfig. You can usually obtain it from DataStream or DataSet via calling getExecutionConfig(). Inside functions (like MapFunction), you can get it by making the function a Rich Function and calling getRuntimeContext().getExecutionConfig().

+0

getExecutionConfigは、flink 1.3とRichFunctionで廃止されました。このエラーがあります: 'Can not resolve symbol getRuntimeContext'。 または、nミリ秒ごとにトリガーを取得する別の方法はありませんか? – FlinkNoob

0

DeltaTriggerを作成するために渡す必要がありますが、設定のparamに関する上記のページを引用すると、より

val uniqStructTypeInfo: TypeInformation[uniqStruct] = createTypeInformation[uniqStruct] 
val uniqStrictTypeSerializer = typeInfo.createSerializer(config) 

ような何かをする必要があるだろう

TypeSerializerを必要とします後で次の要素と比較するために各要素を格納する管理された状態のメカニズム(新しい要素が到着したときに更新される1つの要素、最後の要素のみを保持する)。

例(Javaの場合)hereがあります。

を1時間の窓を持ってするには、次のように

更新
input 
    .keyBy(<key selector>) 
    .timeWindow(Time.milliseconds(100))) 
    .apply(<window function>) 

として

しかし、あなたが必要とするすべては、すべて100ミリ秒をトリガーするウィンドウである場合、ちょうどTimeWindowを使用する方が簡単だろう、 100ミリ秒ごとにトリガすると、スライディングウィンドウを使用できます。ただし、10 * 60 * 60のウィンドウがあり、各イベントはこれらの36,000のウィンドウに配置されます。それは素晴らしい考えではありません。

DeltaTriggerGlobalWindowを使用すると、イベントは100msec以上離れている場合にのみ表示されます。

ProcessFunctionをご覧ください。あなたがそのようにしたいものを手に入れるのは簡単です。

+0

100ミリ秒ごとに1時間からすべてのデータを取得したいと思いますが、デルタトリガーは必要ありませんか? – FlinkNoob

関連する問題