0

私は、Spark Streamingにデータを取り込むためにtextFileStreamを使用しています。しかし、データは1つのバッチに対してのみ処理されています。私の最初の質問は、ファイルから各レコードをストリーミングしないのですか?私は100Kデータレコードをcontanining 1ギガバイトのファイルをストリーミングする場合textFileStreamの混乱

https://docs.databricks.com/spark/latest/rdd-streaming/debugging-streaming-applications.htmlFor TextFileStream, since files are input, the # of input events is always 0. In such cases, you can look at the “Completed Batches” section in the notebook to figure out how to find more information.

によると第二に、私はスパークEngine.For例により処理されているレコードの数を知っていただきたいと思い、私はどのように知っていただきたいと思います多くのSpark Streamingがそれを実行しました。

誰でも自分の考えや役に立つリンクを教えてください。どんな助けもありがとう。

ありがとうございました。

スパークバージョン:2.0.1 データ摂取をAmazon S3からtextFileStream

経由で

答えて

0

これを参照するには直接的な方法はありません、しかし、あなたは、この使用してカスタムコードを実装することができます。例えば

、あなたが

dStream.forEachRDD{rdd => rdd.forEachPartition{part => { }} } 

を使用textFileStreamによって生成DSTREAMを処理するときには、単にアキュムレータまたはのいずれかで処理されたレコードのないを更新するrdd.forEachPartition {}ブロック内でいくつかのコードを追加することができカフカのトピックにその情報を追加したり、飼育係にその情報を追加したり、mysqlデータベース:)

dStream.forEachRDD{rdd => rdd.forEachPartition{part => { 
      var recordProcessed:Int = 0; ... 
      part.foreach{...;recordProcessed+=1} 
      //update recordProcessed in kafka/HBase/Mysql/Zookepeer 
    }} } 

さらに可視化ツールを使用してレコードプロセスの一切を視覚化するために使用することはできません更新します。