2017-08-15 2 views
0

FLINKのドキュメントで述べたように、私は送信データストリームは、/ FLINKプログラムへのリモートソケットは、ホストOS上で実行されている

[email protected]:~$ nc -l 12345 

を使用して、ローカルソケットをオープンしてから受信することで、テキストサーバからのテキスト入力を読み取ることができました

DataStream<String> text = env.socketTextStream("localhost", 12345); 

text.print(); 

env.execute(); 

を使用してFLINKプログラムにしかし、私はいくつかのシナリオをシミュレートしていますので、私は(最終的には、その後、さまざまなVMの)VMからのデータのストリームを取得し、ホストOS上で実行されているCEPプログラムにそれを送りたいと。

だから、私はvagrant ssh

  1. を使用して、ゲストOSのホスト名が10.0 = ifconfigコマンドを使用してprecise64

  2. IPアドレスであることにベイグラントおよびSSHを使用して、VMをインストールしています.2.15

今、私がやりたいことは、VMからいくつかのデータを送ってFlinkプログラムで受信できるかどうかを確認することです。ローカル環境でも同じことができました。

私は

[email protected]:~$ nc -l 12345 

を使用してゲストOS上でNetcatをソケットを開いて、私はまた[email protected]を試してみました使用して、ホストプログラムにそれを受け取るしようとしましたが、エラー

DataStream<String> text = env.socketTextStream("precise64", 12345); 

text.print(); 

env.execute(); 

を得ました私はそれが間違っていると思う。

任意のアイデアは、どのように私は、事前に、FLINKプログラム

提案は大歓迎ですホストするVMからのデータストリームを送信するために感謝に近づく必要があります!

答えて

1

あなたはこの試みることができる:1.Program

を:

import org.apache.flink.streaming.api.scala._ 
import org.apache.flink.streaming.api.windowing.time.Time 

object WindowWordCount { 
    def main(args: Array[String]) { 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val text = env.socketTextStream("localhost", 9999) 

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } 
     .map { (_, 1) } 
     .keyBy(0) 
     .timeWindow(Time.seconds(5)) 
     .sum(1) 

    counts.print 

    env.execute("Window Stream WordCount") 
    } 
} 

2.Afterがprogram.You上記の本を開始することができました。

nc -lk 9999 

これは動作します。

関連する問題