2016-05-17 10 views
0

Apache Flink(Scala API)を始めたばかりです: Flinkサイトの一例に基づいて、KafkaからApache Flinkにデータをストリームしようとしています:Apache FlinkでKafkaからデータを解析しています

val stream = 
    env.addSource(new FlinkKafkaConsumer09("testing", new SimpleStringSchema() , properties)) 

すべてが正常に動作し、stream.print()ステートメントは、画面に以下の表示されます。

2018年5月16日10時22分44秒AM | 1 | 11 | -71.16 | 40.27

私は、データをロードするためにケースクラスを使用したいと思います。私はusiを試しましたng

flatMap(p=>p.split("|")) 

ですが、データを1文字ずつ分割するだけです。基本的には、予想される結果

field(0)=2018-05-16 10:22:44 AM 
    field(1)=1 
    field(2)=11 
    field(3)=-71.16 
    field(4)=40.27 

を次のようにケースクラスの5つのフィールドを埋めることができるようにすることですが、それは今やっている:

field(0) = 2 
    field(1) = 0 
    field(3) = 1 
    field(4) = 8 

等...

どれをアドバイスをいただければ幸いです。

は、問題がString.splitの使用である

フランク

+0

「一度に1文字ずつデータを分割する」とはどういう意味ですか?おそらく、期待される出力と実際の出力の例を挙げることができます。 –

+0

あなたのメッセージをありがとう、私は自分の投稿を編集しました。 –

答えて

2

事前にありがとうございます。 Stringで呼び出すと、このメソッドは正規表現であるとみなします。したがって、p.split("\\|")は入力データの正しい正規表現になります。また、splitバリアントには、分離文字p.split('|')を指定することもできます。どちらのソリューションでも、望ましい結果が得られるはずです。

+0

ありがとう、私は本当にあなたの助けを初心者に感謝します。 –

関連する問題