2016-06-15 4 views
0

2ノードクラスタ(ノードあたり8コアと16Gメモリ)でSpark 1.6.1を使用するCassandra 3.5を使用します。Sparkアプリケーションのパフォーマンスが(コアとメモリの)最大値に近いかどうかを判断するにはどうすればよいですか?

あり、次のカサンドラテーブル

CREATE TABLE schema.trade (
symbol text, 
date int, 
trade_time timestamp, 
reporting_venue text, 
trade_id bigint, 
ref_trade_id bigint, 
action_type text, 
price double, 
quantity int, 
condition_code text, 
PRIMARY KEY ((symbol, date), trade_time, trade_id) 
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'}; 

があると私はボリュームの割合を計算したい:交換や時間バーでgroupped期間中、関連するセキュリティにおける取引のすべてのボリュームの合計(1または5分)。私は例を作成しました :

void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) { 
    char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000; 
    LOG.info("start"); 
    JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade") 
      .filter(row -> 
         row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) && 
         row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() && 
         row.getDateTime("trade_time").getMillis() < timeTill.getTime()) 
      .mapToPair(row -> 
       new Tuple2<>(
        new Tuple2(
          new Timestamp(
            (row.getDateTime("trade_time").getMillis()/(barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER 
          ), 
          row.getString("reporting_venue")), 
        row.getInt("quantity") 
       ) 
      ).reduceByKey((a, b) -> a + b); 
    LOG.info(counts.collect().toString()); 
    LOG.info("finish"); 
} 

は[2016年6月15日09:25:27.014] [情報] [メイン] [EquityTCAAnalytics]スタート
[2016年6月15日9時25分:28.000] [メイン] [NettyUtil]それを使用してクラスパスでNettyのネイティブepollトランスポートを発見
[2016-06-15 09:25:28.518] [メイン] [クラスタ]新しいカサンドラホスト/ [INFO] [main] [LocalNodeFirstLoadBalancingPolicy]ホストノード1(datacenter1)が追加されました
[2016-06-15 09:25:28.519] [INFO]ノード1:9042追加
[2016-06-15 09:25:28.519] [メイン] [Clu新しいカサンドラホスト/ノード2:9042が追加されました
[2016-06-15 09:25:28.520]カサンドラクラスターに接続されたカサンドラクラスターに接続しました:Cassandra
[2016-06-15 09:25] :29.115] [情報] [メイン] [SparkContext]ジョブを開始:EquityTCAAnalytics.java:88で収集
[2016-06-15 09:25:29.385] [情報] [ダグスケジューライベントループ] [DAGScheduler ] RAG 2の登録(EquityTCAAnalytics.java:78のmapToPair)
[2016-06-15 09:25:29.388] [情報] [ダグスケジューライベントループ] [DAGScheduler]ジョブ0(EquityTCAAnalyticsで収集)。 [INFO] [dag-scheduler-event-loop] [DAGScheduler]最終ステージ:ResultStage 1(EquityTCAAnalytics.java:88で収集します。 )[DAGスケジューライベントループ] [DAGScheduler]最終ステージの親:リスト(ShuffleMapStage 0)
[2016-06-15 09:25] :29.391] [情報] [ダグスケジューライベントループ] [DAGScheduler]親なし:リスト(ShuffleMapStage 0)
[2016-06-15 09:25:29.400] [情報] [ダグスケジューライベント - ループ] [DAGScheduler]不足している親を持たないShuffleMapStage 0(EquityTCAAnalytics.java:78のmapToPairでのMapPartitionsRDD [2])。
[2016-06-15 09:25:29.594] [INFO] [dag-scheduler-イベントループ] [MemoryStore]メモリに値として格納されたブロードキャストをブロックする(推定サイズ10.8 KB、フリー10.8 KB)
[2016-06-15 09:25:29.642] [INFO] [dag-scheduler-event-loop] [MemoryStore]ブロックbroadcast_0_piece0ストア[INFO] [INFO] [dispatcher-event-loop-7] [BlockManagerInfo] node2のメモリにbroadcast_0_piece0を追加しました。(メモリサイズは5.4KB、空き容量は16.3KB)
[2016-06-15 09:25:29.647] [INFO] [dag-scheduler-event-loop] [SparkContext] DAGSchedulerでブロードキャストからブロードキャスト0を作成しました。 [DAGスケジューラーイベントループ] [DAGScheduler] ShuffleMapStage 0(MapPartitionsRDD [2]:EquityTCAAnalytics.javaのMapPartitionsRDD [2])から5つの欠落したタスクを送信しています。 :78)
[INFO] [ダグスケジューライベントループ] [TaskSchedulerImpl]タスクセット0.0を5つのタスクに追加する
[2016-06-15 09:25] :30。006] [INFO] [ディスパッチャイベントループ7] [SparkDeploySchedulerBackend] ID 0の登録済み実行者NettyRpcEndpointRef(null)(ノード1:41122)
[2016-06-15 09:25:30.040] [INFO] [ディスパッチャ-event-loop-7] [TaskSetManager]ステージ0.0でタスク0.0を開始する(TID 0、ノード1、パーティション0、NODE_LOCAL、11725バイト)
[2016-06-15 09:25:30.051] [INFO] [dispatcher-イベントループ7] [TaskSetManager]ステージ0.0(タスク1、ノード1、パーティション1、NODE_LOCAL、11317バイト)でタスク1.0を開始
[2016-06-15 09:25:30.054] [INFO] [dispatcher-event [INFO] [INFO] [dispatcher-event-] [TaskSetManager]タスク0.0をステージ0.0(TID 2、ノード1、パーティション2、NODE_LOCAL、11929バイト)から起動します。
[2016-06-15 09:25:30.057]ループ-7] [TaskSetManager]ステージ0.0でタスク3.0を開始(TID 3、ノード1、パーティション3、NO [INFO] [dispatcher-event-loop-7] [TaskSetManager]ステージ0.0でタスク4.0を開始しています(TID 4、ノード1、パーティション4、NODE_LOCAL 、11560 bytes)[INFO] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend]登録されたエグゼキュータNettyRpcEndpointRef(null)(CassandraCH4.ehubprod.local:33668)ID付き
[2016-06-15 09:25:30.077] [BlockManagerMasterEndpoint]ブロックマネージャnode1を登録すると、BlockManagerId(0、node1、36512)のRAMが511.1 MBの36512になります。 [INFO] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint]ブロックマネージャCassandraCH4.ehubprod.localを登録しています:33610、511.1 MB RAM、BlockManagerId(1、CassandraCH4.ehubprod 。地元、33610)
[INFO] [dispatcher-event-loop-2] [BlockManagerInfo] node1のメモリにbroadcast_0_piece0を追加:36512(サイズ:5.4 KB、空き容量:511.1 MB [CassandraConnector] [CassandraConnector] Cassandraクラスターから切断されました:Cassandra
[2016-06-15 09:25:48.914]
[2016-06-15 09:25:36.764] [INFO] [task-result-getter-0] [TaskSetManager] node1の18854 msでステージ0.0(TID 4)で終了したタスク4.0(1/5)
[2016-06-15 09:25:55.541]情報] [タスク - 結果 - ゲッター-1] [TaskSetManager]ノー​​ド0.05(2/5)で25489ミリ秒でステージ0.0(TID 2)のタスク2.0を完了
[2016-06-15 09:25:57.837] [情報] [task-result-getter-2] [TaskSetManager] 27.795 msでステージ0.0の完了済みタスク1.0(TID 1) [情報] [タスク - 結果 - ゲッター - 3] [TaskSetManager] 27.09.0秒でステージ0.0(TID 0)で完了したタスク0.0(0/3)
[2016-06-15 09:25:57.931] [INFO] [task-result-getter-0] [TaskSetManager] 31.02(TID 3)のタスク3.0をノード1で31302ミリ秒で終了しました。 (5/5)
[INFO] [ダグスケジューライベントループ] [DAGScheduler] ShuffleMapStage 0(EquityTCAAnalytics.java:78のmapToPair)が31.602で終了しました
[INFO] [ダグスケジューライベントループ] [DAGScheduler]新しく実行可能なステージを検索する
[2016-06-15 09:26:01.360] [INFO] [dag-scheduler-event-loop] [DAGScheduler]実行中:Set()
[task-result-getter-0] [TaskSchedulerImpl]プールからタスクがすべて完了したTaskSet 0.0を削除しました
[2016-06-15 09:06:06:06: 26:01.362] [情報] [ダグスケジューライベントループ] [DAGScheduler]待機:(結果ステージ1)を設定します。
[2016-06-15 09:26:01.362] [DAGScheduler] [DAGScheduler] [(DAGScheduler)] [(DAGScheduler)]を[reduceByKeyでShuffledRDD [3]を送信しています。 [info] [dag-scheduler-event-loop] [MemoryStore]メモリに値として格納されたブロックをブロックします(推定値)。サイズ3.6 KB、無料199キロバイト)
[2016年6月15日09:26:01.382] [INFO] [DAG-スケジューライベントループ] [MemoryStore]ブロックは、メモリ内のバイト(推定サイズ2.1キロバイト、フリー21.9キロバイト)
として格納broadcast_1_piece[INFO] [dispatcher-event-loop-1] [BlockManagerInfo] node2のメモリにbroadcast_1_piece0を追加しました。サイズ:2.1 KB、空き容量:2.4 GB
[ [DAG-scheduler-event-loop] [SparkContext] DAGScheduler.scalaでブロードキャストからブロードキャスト1を作成しました。1006
[2016-06-15 09:26:01.384] [2016-06-15 09:26:01.384] ] [INFO] [DAG-スケジューライベントループ] [DAGScheduler] ResultStage 1から5つの欠落タスクを提出(ShuffledRDD [3] EquityTCAAnalytics.java:87でreduceByKeyで)
[2016年6月15日09:26:01.386 ] [情報] [ダグschedu [INFO] [INFO] [dispatcher-event-loop-4] [TaskSetManager]ステージでタスク0.0を開始しています。 [INFO] [dispatcher-event-loop-4] [TaskSetManager] 1.0のステージ1.0でタスク1.0を開始しています(TID 5、ノード1、ノード0、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.390] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 1.0のステージでタスク2.0を開始する(TID 6、ノード1、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.397] [INFO] [dispatcher-event-loop-4] [TaskSetManager]ステージ1.0でタスク3.0を開始しています(TID(TID 7)、ノード1、パーティション2、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.398] 8、ノード1、パーティション3、NODE_LOCAL、2786バイト)
[TABIT 9、ノード1、パーティション4、NODE_LOCAL、2786バイト]
[タスクセットマネージャ] [1.0] [INFO] [dispatcher-event-loop-4] [BlockManagerInfo] node1のメモリにbroadcast_1_piece0を追加:36512(サイズ:2.1 KB、空き:511.1 MB)
[2016 -06-15 09:26:01.452] [INFO] [ディスパッチャ・イベント・ループ・6] [MapOutputTrackerMasterEndpoint]ノード1にシャッフル0地図上の出力位置を送信する質問:41122
[2016年6月15日9時26分: 01.456] [INFO] [ディスパッチャイベントループ-6]シャッフル0の出力状態の[MapOutputTrackerMaster]サイズが161バイト
[2016年6月15日09である:26:01.526] [INFO] [タスク結果ゲッター-1] [TaskSetManager]ノー​​ド1.0の128ミリ秒でステージ1.0の終了タスク4.0(TID 9)(1タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク2:タスク1:タスク2: [情報] [task-result-getter-2] [TaskSetManager]ノー​​ド1.01の193 msでステージ1.0(TID 5)で完了したタスク0.0(3/5 [1]タスク1.0の終了タスク1.0(TID 6)、ノード1の199ミリ秒(4/5)で完了1.0。
[2016年6月15日09:26:01.599]ステージ1.0における[INFO] [タスク結果ゲッター-2] [TaskSetManager]終了タスク3.0 node1で200ミリ秒(TID 8)(5/5)
[2016-06-15 09:26:01.599] [INFO] [task-result-getter-2] [TaskSchedulerImpl]プールからタスクがすべて完了したTaskSet 1.0を削除しました。
[2016-06-15 09:26:01.599] [DAGスケジューライベントループ] [DAGScheduler] ResultStage 1(EquityTCAAnalytics.java:88で収集)は0.202秒で終了しました
[2016-06 -15 09:26:01.612] [メイン] [DAGScheduler]ジョブ0が終了:EquityTCAAnalytics.java:88で収集、は32.496470秒を取った
[2016-06-15 09:26:01.634] ((2016-06-10 14:25:00.0、B)、5241)、((2016-06-10 13:45:00.0 DA)、6944) (2016-06-10 14:55:00.0、A)、1300)]
[2016-06-15 09:26](2016-06-10 10:55:00.0、QD)、109080) 01。641] [情報] [メイン] [EquityTCAAalyaly]終了

は32.5ですか?

+1

正常であることは、達成しようとしているタスクおよび処理しているデータの量に比例します。また、ネットワークIOにボトルネックが発生する可能性があります。実際にあなたの質問は、与えられた情報では答えられません。 – eliasah

答えて

0

私は、CPUの%%、またはメモリ使用量が出発点になると言います。コアが十分活用されていないと、プロセスが十分に並列していない可能性があります。メモリサイズのロットはデータに依存しますが、通常はIOに戻るよりも多くのRAMを使用します。

関連する問題