2017-03-01 9 views
0

私は以前のバージョンのFlinkを使用していました。私は1.2.0にアップグレードして、フィルタに関するいくつかの問題があります。ScalaのFlinkで簡単なフィルタを適用する方法

私は正常に動作ログのデータストリームを持っている:

val logs: DataStream[Log] = env.addSource(new LogSource(
     data, delay, factor)) 

// DISPLAY TUPLE IN CONSOLE 
logs.print() 

// EXECUTE SCRIPT 
env.execute("stream") 

私はもちろん示しドキュメント読んでいる:

dataStream.filter { _ != 0 } 

を私はこのようなものの束を試してみました:

val cleanLogs = logs.filter { _.isComplete } 

しかし、次のエラーが表示されます。

期待型の不一致、:filterFunctionの[ログイン]、実際:(任意)=>

だから私は、ドキュメントと、このエラーの間のリンクが表示されません。 助けてください?例?

おかげ

+0

isCompleteメソッドのシグニチャは何ですか。 –

+0

これはメソッドではなく、Logの最初の属性はboolean:isCompleteです。それはFlink 0.10で完璧に機能しましたが、それはもはや不可能かもしれませんか? – ImbaBalboa

+1

私はあなたの問題を本当に再現することはできません。私の心に来る唯一のものは、間違った輸入品です。 'DataStream'と' StreamExecutionEnvironment'のscalaバージョンをインポートしていることを確認してください。常にorg.apache.flink.streaming.api.scala._'をインポートすることはスケーラでは最善です –

答えて

0

問題がfilterのような基本的な機能でこの問題につながるStreamExecutionEnvironmentの間違ったインポートが最初でした。

その後、古いバージョンのFlinkを使用していたので、私はLocalExecutionEnvironmentクラスを使用していましたが、これはFlink 1.Xでは使用できなくなりました。

代わりに、StreamExecutionEnvironment.createLocalEnvironment(1)

関連する問題