Apache Flink(Scala API)を始めたばかりです: Flinkサイトの一例に基づいて、KafkaからApache Flinkにデータをストリームしようとしています:Apache FlinkでKafkaからデータを解析しています
val stream =
env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties))
すべてが正常に動作し、stream.print()ステートメントは、画面に以下の表示されます。
2018年5月16日10時22分44秒AM | 1 | 11 | -71.16 | 40.27
私は、データをロードするためにケースクラスを使用したいと思います。私はusiを試しましたng
flatMap(p=>p.split("|"))
ですが、データを1文字ずつ分割するだけです。基本的には、予想される結果
は
field(0)=2018-05-16 10:22:44 AM
field(1)=1
field(2)=11
field(3)=-71.16
field(4)=40.27
を次のようにケースクラスの5つのフィールドを埋めることができるようにすることですが、それは今やっている:
field(0) = 2
field(1) = 0
field(3) = 1
field(4) = 8
等...
どれをアドバイスをいただければ幸いです。
は、問題がString.split
の使用である
フランク
「一度に1文字ずつデータを分割する」とはどういう意味ですか?おそらく、期待される出力と実際の出力の例を挙げることができます。 –
あなたのメッセージをありがとう、私は自分の投稿を編集しました。 –