2017-06-30 17 views
2

以下のコードはソケットから読み込みますが、ジョブへの入力はありません。私はnc -l 1111を実行していますが、データをダンプしていますが、私のスパークジョブがデータを10.176.110.112:1111から読み取ることができない理由はわかりません。コードの下ソケットからストリーミングデータセットを読み取る方法は?

Dataset<Row> d = sparkSession.readStream().format("socket") 
            .option("host", "10.176.110.112") 
            .option("port", 1111).load(); 

答えて

2

ソケットから読み込むが、私は、任意の入力が仕事に行くのは表示されません。

まあ、正直、あなたははないは、どこからでも何かを読んでください。ストリーミングパイプラインを開始したときにに行ったことはにしか説明していません。

構造化ストリーミングを使用してソケットからデータセットを読み込むので、start演算子を使用してデータの取得をトリガする必要があります。

スタート():StreamingQueryは、ストリーミングクエリの実行を開始した指定されたパスに、継続的に出力結果をする新しいデータが到着します。返されたStreamingQueryオブジェクトを使用してストリームと対話できます。

startの前に、データをストリームする場所を定義する必要があります。それは、カフカ、ファイル、カスタムストリーミングシンク(おそらくforeachオペレータを使用して)またはコンソールかもしれません。

次の例では、consoleシンク(別名フォーマット)を使用しています。私はScalaを使って、あなたの家庭のエクササイズとしてJavaに書き直しておきます。

d.writeStream. // <-- this is the most important part 
    trigger(Trigger.ProcessingTime("10 seconds")). 
    format("console"). 
    option("truncate", false). 
    start   // <-- and this 
関連する問題