ソケットから読み込むが、私は、任意の入力が仕事に行くのは表示されません。
まあ、正直、あなたははないは、どこからでも何かを読んでください。ストリーミングパイプラインを開始したときにに行ったことはにしか説明していません。
構造化ストリーミングを使用してソケットからデータセットを読み込むので、start演算子を使用してデータの取得をトリガする必要があります。
スタート():StreamingQueryは、ストリーミングクエリの実行を開始した指定されたパスに、継続的に出力結果をする新しいデータが到着します。返されたStreamingQueryオブジェクトを使用してストリームと対話できます。
start
の前に、データをストリームする場所を定義する必要があります。それは、カフカ、ファイル、カスタムストリーミングシンク(おそらくforeach
オペレータを使用して)またはコンソールかもしれません。
次の例では、console
シンク(別名フォーマット)を使用しています。私はScalaを使って、あなたの家庭のエクササイズとしてJavaに書き直しておきます。
d.writeStream. // <-- this is the most important part
trigger(Trigger.ProcessingTime("10 seconds")).
format("console").
option("truncate", false).
start // <-- and this