2016-11-04 12 views
1

ここに表示されているように、スパークはレコード数をどのように計算しているのですか(バッチ内のイベント数と同じだと思いますか?私はリモートでこの値を取得する方法を把握しようとしているSpark Streamingで処理されたレコードの総数はどのように取得するのですか?

enter image description here

(REST-APIは、UIでのストリーミングオプションのために存在していません)。

基本的には、アプリケーションで処理されるレコードの総数を取得するために何をしようとしていますか。 Webポータルにはこの情報が必要です。

私は各段階でRecordsを数えようとしましたが、上の写真のように完全に異なる番号が付けられました。各ステージにはレコードに関する情報が含まれています。ここ

enter image description here

示されているように私は、各段階から、「inputRecords」をカウントするために、この短いPythonスクリプトを使用しています。これらStagesは、複数のTasksを持って、各Batchは1 Jobがあり、各Jobは、複数のStagesを持っている:私はそれを正しく理解している場合

import json, requests, urllib 
print "Get stages script started!" 
#URL REST-API 
url = 'http://10.16.31.211:4040/api/v1/applications/app-20161104125052-0052/stages/' 
response = urllib.urlopen(url) 
data = json.loads(response.read()) 

stages = [] 
print len(data) 
inputCounter = 0 
for item in data: 
     stages.append(item["stageId"]) 
     inputCounter += item["inputRecords"] 
print "Records processed: " + str(inputCounter) 

:これは、ソースコードです。

私にとっては、それぞれStageの入力を数えるのが理にかなっていました。

+0

何を試しましたか?いくつかのサンプルデータとコードを投稿して、あなたのために働いていないものを教えてください。また、このリンクをチェックしてください:stackoverflow.com/help/mcve。 – CGritton

+0

おそらく、「スパークストリーミングで処理されたレコードの総数を取得するには」 – maasg

答えて

2

スパークは、ドライバにメトリクスエンドポイントを提供しています:

<driver-host>:<ui-port>/metrics/json 

スパークストリーミング・アプリケーションは、いくつかのより多くのUIとで利用可能なすべてのメトリックを報告します。潜在的に探しているものは:

<driver-id>.driver.<job-id>.StreamingMetrics.streaming.totalProcessedRecords: { 
value: 48574640 
}, 
<driver-id>.driver.<job-id>.StreamingMetrics.streaming.totalReceivedRecords: { 
value: 48574640 
} 

このエンドポイントはカスタマイズできます。情報については、Spark Metricsを参照してください。

+1

ありがとうございます。これは本当に動作します、この/ metrics/jsonオプションは私に隠されていました。 –

+0

@SeverinSimkoはよく知られている機能ではありません。ところで、あなたの質問に答えたと考えるなら、それを受け入れることを忘れないでください。また、将来の訪問者も利益を得ることができるようにタイトルを変更することを検討してください...そして、ようこそ! – maasg

関連する問題