0
私はApache Flinkでscalaで簡単なスクリプトを実行しています。 Apache Kafkaプロデューサーからデータを読みました。これは私のコードです。マップ関数のKafka/Flink統合の問題
import java.util.Properties
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.json4s.native.Serialization.{read, write}
object App {
def main(args : Array[String]) {
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data: String, stt: Int)
val properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
val stream1 = env
.addSource(consumer1)
.flatMap(raw => JsonMethods.parse(raw).toOption)
env.execute()
}
}
私はflatMapの「欠落しているパラメータの型」のエラーが表示されます(私はマップやフィルタなど、他の機能を使用しようとすると、私は得る同じ誤りです)。 私はそれを解決するためにホを知らない。 助けてください?
LF
。これにより、欠落しているパラメータの型エラーが解決されました。 –
KafkaのDataStream [String]をDataStream [Sensor2]クラスに解析するために、コードを少し修正しなければなりませんでした。作業コードは次のとおり 'ヴァルconsumer1 =新しいFlinkKafkaConsumer010 [文字列]( "トピックス"、新SimpleStringSchema()、プロパティ) ヴァルSTREAM1 = ENV .addSource(consumer1) ヴァルS1 = stream1.map {X = > { 暗黙的なvalのフォーマット= DefaultFormats JsonMethods.parse(x).extract [センサー2] } } –