2017-08-01 7 views
0

私はApache Flinkでscalaで簡単なスクリプトを実行しています。 Apache Kafkaプロデューサーからデータを読みました。これは私のコードです。マップ関数のKafka/Flink統合の問題

import java.util.Properties 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods._ 
import org.json4s.native.Serialization 
import org.json4s.native.Serialization.{read, write} 

object App { 

    def main(args : Array[String]) { 

case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int) 

val properties = new Properties(); 
    properties.setProperty("bootstrap.servers", "localhost:9092"); 
    properties.setProperty("group.id", "test"); 

    val env = StreamExecutionEnvironment.getExecutionEnvironment() 
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties) 
    val stream1 = env 
    .addSource(consumer1) 
    .flatMap(raw => JsonMethods.parse(raw).toOption) 

    env.execute() 

} 

} 

私はflatMapの「欠落しているパラメータの型」のエラーが表示されます(私はマップやフィルタなど、他の機能を使用しようとすると、私は得る同じ誤りです)。 私はそれを解決するためにホを知らない。 助けてください?

LF

答えて

0

あなたはStreamExecutionEnvironmentのScalaのAPIのバージョンを使用する必要があります。

にインポートを変更し

:ありがとう

import java.util.Properties 
import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010 
import org.apache.flink.streaming.util.serialization.SimpleStringSchema 
import org.json4s._ 
import org.json4s.native.JsonMethods 
+0

。これにより、欠落しているパラメータの型エラーが解決されました。 –

+0

KafkaのDataStream [String]をDataStream [Sensor2]クラスに解析するために、コードを少し修正しなければなりませんでした。作業コードは次のとおり 'ヴァルconsumer1 =新しいFlinkKafkaConsumer010 [文字列]( "トピックス"、新SimpleStringSchema()、プロパティ) ヴァルSTREAM1 = ENV .addSource(consumer1) ヴァルS1 = stream1.map {X = > { 暗黙的なvalのフォーマット= DefaultFormats JsonMethods.parse(x).extract [センサー2] } } –

関連する問題