2
まず、データセットAPIを使用して静的データを操作してから、DataStream APIを使用してストリーミングジョブを実行します。私がIDEにコードを書くと、それは完全に動作します。しかし、ローカルのFlinkジョブマネージャ(すべての並列処理1)を実行しようとすると、ストリーミングコードは決して実行されません!Flink:データセットとDatastream APIを1つのプログラムにまとめました。出来ますか?
たとえば、次のコードは動作していない:私はこのことが作業を取得しようとする必要があり
val parallelism = 1
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(parallelism)
val envStatic = ExecutionEnvironment.getExecutionEnvironment
envStatic.setParallelism(parallelism)
val myStaticData = envStatic.fromCollection(1 to 10)
val myVal: Int = myStaticData.reduce(_ + _).collect().head
val theStream = env.fromElements(1).iterate(iteretion => {
val result = iteretion.map(x => x + myVal)
(result, result)
})
theStream.print()
env.execute("static and streaming together")
何?
ログ:execution logs for above program
実行計画:plan 巡回しているようです。
ログは何を表していますか? –
@TillRohrmannリンクが追加されました。 –
クライアントログには何が表示されますか? –