2ノードクラスタ(ノードあたり8コアと16Gメモリ)でSpark 1.6.1を使用するCassandra 3.5を使用します。Sparkアプリケーションのパフォーマンスが(コアとメモリの)最大値に近いかどうかを判断するにはどうすればよいですか?
あり、次のカサンドラテーブル
CREATE TABLE schema.trade (
symbol text,
date int,
trade_time timestamp,
reporting_venue text,
trade_id bigint,
ref_trade_id bigint,
action_type text,
price double,
quantity int,
condition_code text,
PRIMARY KEY ((symbol, date), trade_time, trade_id)
) WITH compaction = {'class' : 'DateTieredCompactionStrategy', 'base_time_seconds' : '3600', 'max_sstable_age_days' : '93'};
があると私はボリュームの割合を計算したい:交換や時間バーでgroupped期間中、関連するセキュリティにおける取引のすべてのボリュームの合計(1または5分)。私は例を作成しました :
void getPercentageOfVolume(String symbol, Integer date, Timestamp timeFrom, Timestamp timeTill, Integer barWidth) {
char MILLISECOND_TO_MINUTE_MULTIPLIER = 60_000;
LOG.info("start");
JavaPairRDD<Tuple2, Integer> counts = javaFunctions(sparkContext).cassandraTable("schema", "trade")
.filter(row ->
row.getString("symbol").equals(symbol) && row.getInt("date").equals(date) &&
row.getDateTime("trade_time").getMillis() >= timeFrom.getTime() &&
row.getDateTime("trade_time").getMillis() < timeTill.getTime())
.mapToPair(row ->
new Tuple2<>(
new Tuple2(
new Timestamp(
(row.getDateTime("trade_time").getMillis()/(barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER)) * barWidth * MILLISECOND_TO_MINUTE_MULTIPLIER
),
row.getString("reporting_venue")),
row.getInt("quantity")
)
).reduceByKey((a, b) -> a + b);
LOG.info(counts.collect().toString());
LOG.info("finish");
}
は[2016年6月15日09:25:27.014] [情報] [メイン] [EquityTCAAnalytics]スタート
[2016年6月15日9時25分:28.000] [メイン] [NettyUtil]それを使用してクラスパスでNettyのネイティブepollトランスポートを発見
[2016-06-15 09:25:28.518] [メイン] [クラスタ]新しいカサンドラホスト/ [INFO] [main] [LocalNodeFirstLoadBalancingPolicy]ホストノード1(datacenter1)が追加されました
[2016-06-15 09:25:28.519] [INFO]ノード1:9042追加
[2016-06-15 09:25:28.519] [メイン] [Clu新しいカサンドラホスト/ノード2:9042が追加されました
[2016-06-15 09:25:28.520]カサンドラクラスターに接続されたカサンドラクラスターに接続しました:Cassandra
[2016-06-15 09:25] :29.115] [情報] [メイン] [SparkContext]ジョブを開始:EquityTCAAnalytics.java:88で収集
[2016-06-15 09:25:29.385] [情報] [ダグスケジューライベントループ] [DAGScheduler ] RAG 2の登録(EquityTCAAnalytics.java:78のmapToPair)
[2016-06-15 09:25:29.388] [情報] [ダグスケジューライベントループ] [DAGScheduler]ジョブ0(EquityTCAAnalyticsで収集)。 [INFO] [dag-scheduler-event-loop] [DAGScheduler]最終ステージ:ResultStage 1(EquityTCAAnalytics.java:88で収集します。 )[DAGスケジューライベントループ] [DAGScheduler]最終ステージの親:リスト(ShuffleMapStage 0)
[2016-06-15 09:25] :29.391] [情報] [ダグスケジューライベントループ] [DAGScheduler]親なし:リスト(ShuffleMapStage 0)
[2016-06-15 09:25:29.400] [情報] [ダグスケジューライベント - ループ] [DAGScheduler]不足している親を持たないShuffleMapStage 0(EquityTCAAnalytics.java:78のmapToPairでのMapPartitionsRDD [2])。
[2016-06-15 09:25:29.594] [INFO] [dag-scheduler-イベントループ] [MemoryStore]メモリに値として格納されたブロードキャストをブロックする(推定サイズ10.8 KB、フリー10.8 KB)
[2016-06-15 09:25:29.642] [INFO] [dag-scheduler-event-loop] [MemoryStore]ブロックbroadcast_0_piece0ストア[INFO] [INFO] [dispatcher-event-loop-7] [BlockManagerInfo] node2のメモリにbroadcast_0_piece0を追加しました。(メモリサイズは5.4KB、空き容量は16.3KB)
[2016-06-15 09:25:29.647] [INFO] [dag-scheduler-event-loop] [SparkContext] DAGSchedulerでブロードキャストからブロードキャスト0を作成しました。 [DAGスケジューラーイベントループ] [DAGScheduler] ShuffleMapStage 0(MapPartitionsRDD [2]:EquityTCAAnalytics.javaのMapPartitionsRDD [2])から5つの欠落したタスクを送信しています。 :78)
[INFO] [ダグスケジューライベントループ] [TaskSchedulerImpl]タスクセット0.0を5つのタスクに追加する
[2016-06-15 09:25] :30。006] [INFO] [ディスパッチャイベントループ7] [SparkDeploySchedulerBackend] ID 0の登録済み実行者NettyRpcEndpointRef(null)(ノード1:41122)
[2016-06-15 09:25:30.040] [INFO] [ディスパッチャ-event-loop-7] [TaskSetManager]ステージ0.0でタスク0.0を開始する(TID 0、ノード1、パーティション0、NODE_LOCAL、11725バイト)
[2016-06-15 09:25:30.051] [INFO] [dispatcher-イベントループ7] [TaskSetManager]ステージ0.0(タスク1、ノード1、パーティション1、NODE_LOCAL、11317バイト)でタスク1.0を開始
[2016-06-15 09:25:30.054] [INFO] [dispatcher-event [INFO] [INFO] [dispatcher-event-] [TaskSetManager]タスク0.0をステージ0.0(TID 2、ノード1、パーティション2、NODE_LOCAL、11929バイト)から起動します。
[2016-06-15 09:25:30.057]ループ-7] [TaskSetManager]ステージ0.0でタスク3.0を開始(TID 3、ノード1、パーティション3、NO [INFO] [dispatcher-event-loop-7] [TaskSetManager]ステージ0.0でタスク4.0を開始しています(TID 4、ノード1、パーティション4、NODE_LOCAL 、11560 bytes)[INFO] [dispatcher-event-loop-7] [SparkDeploySchedulerBackend]登録されたエグゼキュータNettyRpcEndpointRef(null)(CassandraCH4.ehubprod.local:33668)ID付き
[2016-06-15 09:25:30.077] [BlockManagerMasterEndpoint]ブロックマネージャnode1を登録すると、BlockManagerId(0、node1、36512)のRAMが511.1 MBの36512になります。 [INFO] [dispatcher-event-loop-3] [BlockManagerMasterEndpoint]ブロックマネージャCassandraCH4.ehubprod.localを登録しています:33610、511.1 MB RAM、BlockManagerId(1、CassandraCH4.ehubprod 。地元、33610)
[INFO] [dispatcher-event-loop-2] [BlockManagerInfo] node1のメモリにbroadcast_0_piece0を追加:36512(サイズ:5.4 KB、空き容量:511.1 MB [CassandraConnector] [CassandraConnector] Cassandraクラスターから切断されました:Cassandra
[2016-06-15 09:25:48.914]
[2016-06-15 09:25:36.764] [INFO] [task-result-getter-0] [TaskSetManager] node1の18854 msでステージ0.0(TID 4)で終了したタスク4.0(1/5)
[2016-06-15 09:25:55.541]情報] [タスク - 結果 - ゲッター-1] [TaskSetManager]ノード0.05(2/5)で25489ミリ秒でステージ0.0(TID 2)のタスク2.0を完了
[2016-06-15 09:25:57.837] [情報] [task-result-getter-2] [TaskSetManager] 27.795 msでステージ0.0の完了済みタスク1.0(TID 1) [情報] [タスク - 結果 - ゲッター - 3] [TaskSetManager] 27.09.0秒でステージ0.0(TID 0)で完了したタスク0.0(0/3)
[2016-06-15 09:25:57.931] [INFO] [task-result-getter-0] [TaskSetManager] 31.02(TID 3)のタスク3.0をノード1で31302ミリ秒で終了しました。 (5/5)
[INFO] [ダグスケジューライベントループ] [DAGScheduler] ShuffleMapStage 0(EquityTCAAnalytics.java:78のmapToPair)が31.602で終了しました
[INFO] [ダグスケジューライベントループ] [DAGScheduler]新しく実行可能なステージを検索する
[2016-06-15 09:26:01.360] [INFO] [dag-scheduler-event-loop] [DAGScheduler]実行中:Set()
[task-result-getter-0] [TaskSchedulerImpl]プールからタスクがすべて完了したTaskSet 0.0を削除しました
[2016-06-15 09:06:06:06: 26:01.362] [情報] [ダグスケジューライベントループ] [DAGScheduler]待機:(結果ステージ1)を設定します。
[2016-06-15 09:26:01.362] [DAGScheduler] [DAGScheduler] [(DAGScheduler)] [(DAGScheduler)]を[reduceByKeyでShuffledRDD [3]を送信しています。 [info] [dag-scheduler-event-loop] [MemoryStore]メモリに値として格納されたブロックをブロックします(推定値)。サイズ3.6 KB、無料199キロバイト)
[2016年6月15日09:26:01.382] [INFO] [DAG-スケジューライベントループ] [MemoryStore]ブロックは、メモリ内のバイト(推定サイズ2.1キロバイト、フリー21.9キロバイト)
として格納broadcast_1_piece[INFO] [dispatcher-event-loop-1] [BlockManagerInfo] node2のメモリにbroadcast_1_piece0を追加しました。サイズ:2.1 KB、空き容量:2.4 GB
[ [DAG-scheduler-event-loop] [SparkContext] DAGScheduler.scalaでブロードキャストからブロードキャスト1を作成しました。1006
[2016-06-15 09:26:01.384] [2016-06-15 09:26:01.384] ] [INFO] [DAG-スケジューライベントループ] [DAGScheduler] ResultStage 1から5つの欠落タスクを提出(ShuffledRDD [3] EquityTCAAnalytics.java:87でreduceByKeyで)
[2016年6月15日09:26:01.386 ] [情報] [ダグschedu [INFO] [INFO] [dispatcher-event-loop-4] [TaskSetManager]ステージでタスク0.0を開始しています。 [INFO] [dispatcher-event-loop-4] [TaskSetManager] 1.0のステージ1.0でタスク1.0を開始しています(TID 5、ノード1、ノード0、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.390] [INFO] [dispatcher-event-loop-4] [TaskSetManager] 1.0のステージでタスク2.0を開始する(TID 6、ノード1、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.397] [INFO] [dispatcher-event-loop-4] [TaskSetManager]ステージ1.0でタスク3.0を開始しています(TID(TID 7)、ノード1、パーティション2、NODE_LOCAL、2786バイト)
[2016-06-15 09:26:01.398] 8、ノード1、パーティション3、NODE_LOCAL、2786バイト)
[TABIT 9、ノード1、パーティション4、NODE_LOCAL、2786バイト]
[タスクセットマネージャ] [1.0] [INFO] [dispatcher-event-loop-4] [BlockManagerInfo] node1のメモリにbroadcast_1_piece0を追加:36512(サイズ:2.1 KB、空き:511.1 MB)
[2016 -06-15 09:26:01.452] [INFO] [ディスパッチャ・イベント・ループ・6] [MapOutputTrackerMasterEndpoint]ノード1にシャッフル0地図上の出力位置を送信する質問:41122
[2016年6月15日9時26分: 01.456] [INFO] [ディスパッチャイベントループ-6]シャッフル0の出力状態の[MapOutputTrackerMaster]サイズが161バイト
[2016年6月15日09である:26:01.526] [INFO] [タスク結果ゲッター-1] [TaskSetManager]ノード1.0の128ミリ秒でステージ1.0の終了タスク4.0(TID 9)(1タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク1:タスク2:タスク1:タスク2: [情報] [task-result-getter-2] [TaskSetManager]ノード1.01の193 msでステージ1.0(TID 5)で完了したタスク0.0(3/5 [1]タスク1.0の終了タスク1.0(TID 6)、ノード1の199ミリ秒(4/5)で完了1.0。
[2016年6月15日09:26:01.599]ステージ1.0における[INFO] [タスク結果ゲッター-2] [TaskSetManager]終了タスク3.0 node1で200ミリ秒(TID 8)(5/5)
[2016-06-15 09:26:01.599] [INFO] [task-result-getter-2] [TaskSchedulerImpl]プールからタスクがすべて完了したTaskSet 1.0を削除しました。
[2016-06-15 09:26:01.599] [DAGスケジューライベントループ] [DAGScheduler] ResultStage 1(EquityTCAAnalytics.java:88で収集)は0.202秒で終了しました
[2016-06 -15 09:26:01.612] [メイン] [DAGScheduler]ジョブ0が終了:EquityTCAAnalytics.java:88で収集、は32.496470秒を取った
[2016-06-15 09:26:01.634] ((2016-06-10 14:25:00.0、B)、5241)、((2016-06-10 13:45:00.0 DA)、6944) (2016-06-10 14:55:00.0、A)、1300)]
[2016-06-15 09:26](2016-06-10 10:55:00.0、QD)、109080) 01。641] [情報] [メイン] [EquityTCAAalyaly]終了
は32.5ですか?
正常であることは、達成しようとしているタスクおよび処理しているデータの量に比例します。また、ネットワークIOにボトルネックが発生する可能性があります。実際にあなたの質問は、与えられた情報では答えられません。 – eliasah