Siddhi performance problem - embedded

We are evaluating Siddhi as an embedded CEP processor for our application. During scale testing we found that as we increase the number of rules, the time required to insert events grows significantly for each unique ID. For example:
- have 10 rules (using windows and partition by id)
- load 1000 unique entries at a time and track the timing. Note that as you approach 100K unique entries, insert performance degrades from milliseconds to seconds. The more rules you have, the worse this gets.
- load the "next" time for each record. Insert times are constant regardless of the number of IDs.
Here is the code file that reproduces this:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import org.junit.Test;
import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class ScaleSiddhiTest {

    private SiddhiManager siddhiManager = new SiddhiManager();

    @Test
    public void testWindow() throws InterruptedException {
        String plan = "@Plan:name('MetricPlan') \n" +
                "define stream metricStream (id string, timestamp long, metric1 double, metric2 double); \n" +
                "partition with (id of metricStream) begin \n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule0' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule1' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule2' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule3' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule4' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule5' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule6' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule7' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule8' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule9' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule10' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule11' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule12' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule13' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule14' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule15' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule16' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule17' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric1) as value, 'Metric1-rule18' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "\n" +
                "from metricStream#window.externalTime(timestamp, 300000) \n" +
                "select id, avg(metric2) as value, 'Metric2-rule19' as ruleName\n" +
                "having value>-1.000000 \n" +
                "insert into outputStream;\n" +
                "end ;";
        // Generate the runtime
        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(plan);
        AtomicInteger counter = new AtomicInteger();
        // Add a callback to count output events produced by the queries
        executionPlanRuntime.addCallback("outputStream", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                counter.addAndGet(events.length);
            }
        });
        // Start event processing
        executionPlanRuntime.start();
        // Retrieve the InputHandler used to push events into Siddhi
        InputHandler inputHandler = executionPlanRuntime.getInputHandler("metricStream");
        int numOfUniqueItems = 10000;
        IntStream.range(0, 2).forEach(curMinute -> {
            long iterationStartTime = System.currentTimeMillis();
            AtomicLong lastStart = new AtomicLong(System.currentTimeMillis());
            IntStream.range(0, numOfUniqueItems).forEach(id -> {
                try {
                    inputHandler.send(TimeUnit.MINUTES.toMillis(curMinute),
                            new Object[]{id, TimeUnit.MINUTES.toMillis(curMinute), 10.0, 20.0});
                    if (id > 0 && id % 1000 == 0) {
                        long ls = lastStart.get();
                        long curTime = System.currentTimeMillis();
                        lastStart.set(curTime);
                        System.out.println("It took " + (curTime - ls)
                                + " ms to load the last 1000 entities. Num Alarms So Far: " + counter.get());
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            System.out.println("It took " + (System.currentTimeMillis() - iterationStartTime)
                    + "ms to load the last " + numOfUniqueItems);
        });
        // Shut down the runtime
        executionPlanRuntime.shutdown();
        siddhiManager.shutdown();
    }
}
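The 20 rules in the plan above are near-identical, differing only in which metric they aggregate (metric1 for even rule numbers, metric2 for odd) and the rule name. For experimenting with different rule counts, the plan string can be generated in a loop; a minimal sketch (the `PlanBuilder` class and `buildPlan` helper are hypothetical, not part of the test above):

```java
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PlanBuilder {

    // Generates the same shape of plan as the test above: a partition by id
    // containing numRules externalTime-window rules alternating between
    // metric1 (even rule index) and metric2 (odd rule index).
    static String buildPlan(int numRules) {
        String header = "@Plan:name('MetricPlan') \n"
                + "define stream metricStream (id string, timestamp long, metric1 double, metric2 double); \n"
                + "partition with (id of metricStream) begin \n";
        String rules = IntStream.range(0, numRules)
                .mapToObj(i -> {
                    String metric = (i % 2 == 0) ? "metric1" : "metric2";
                    String ruleName = (i % 2 == 0 ? "Metric1" : "Metric2") + "-rule" + i;
                    return "from metricStream#window.externalTime(timestamp, 300000) \n"
                            + "select id, avg(" + metric + ") as value, '" + ruleName + "' as ruleName\n"
                            + "having value>-1.000000 \n"
                            + "insert into outputStream;\n";
                })
                .collect(Collectors.joining("\n"));
        return header + rules + "end ;";
    }

    public static void main(String[] args) {
        // Reproduces the 20-rule plan used in the test
        System.out.println(buildPlan(20));
    }
}
```

This also makes it easy to rerun the scale test with, say, 1 rule vs 10 vs 20 to measure how insert time grows with rule count.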
My questions are:
- What are we doing wrong here that might be leading to the initial load performance problem?
- Are there any recommendations for working around this issue?
Update: Per the recommended answer below, I updated the test to use group by instead of partition by. It shows similar growth on the initial load of each object, except it is even worse. Specifically, I changed the rules to:
@Plan:name('MetricPlan')
define stream metricStream (id string, timestamp long, metric1 double,metric2 double);
from metricStream#window.externalTime(timestamp, 300000)
select id, avg(metric1) as value, 'Metric1-rule0' as ruleName
group by id
having value>-1.000000
insert into outputStream;
...
Below are the result outputs for partition by vs group by. Both show growth on the initial load.
Load 10K Items - Group By
It took 3098 ms to load the last 1000 entities. Num Alarms So Far: 20020
It took 2507 ms to load the last 1000 entities. Num Alarms So Far: 40020
It took 5993 ms to load the last 1000 entities. Num Alarms So Far: 60020
It took 4878 ms to load the last 1000 entities. Num Alarms So Far: 80020
It took 6079 ms to load the last 1000 entities. Num Alarms So Far: 100020
It took 8466 ms to load the last 1000 entities. Num Alarms So Far: 120020
It took 11840 ms to load the last 1000 entities. Num Alarms So Far: 140020
It took 12634 ms to load the last 1000 entities. Num Alarms So Far: 160020
It took 14779 ms to load the last 1000 entities. Num Alarms So Far: 180020
It took 87053ms to load the last 10000
Load Same 10K Items - Group By
It took 31 ms to load the last 1000 entities. Num Alarms So Far: 220020
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 240020
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 260020
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 280020
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 300020
It took 20 ms to load the last 1000 entities. Num Alarms So Far: 320020
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 340020
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 360020
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 380020
It took 202ms to load the last 10000
Partition by shows the same type of growth, almost entirely tied to the loading of IDs it has not seen before:
Load 10K Items - Partition By
It took 1148 ms to load the last 1000 entities. Num Alarms So Far: 20020
It took 1870 ms to load the last 1000 entities. Num Alarms So Far: 40020
It took 1393 ms to load the last 1000 entities. Num Alarms So Far: 60020
It took 1745 ms to load the last 1000 entities. Num Alarms So Far: 80020
It took 2040 ms to load the last 1000 entities. Num Alarms So Far: 100020
It took 2108 ms to load the last 1000 entities. Num Alarms So Far: 120020
It took 3068 ms to load the last 1000 entities. Num Alarms So Far: 140020
It took 2798 ms to load the last 1000 entities. Num Alarms So Far: 160020
It took 3532 ms to load the last 1000 entities. Num Alarms So Far: 180020
It took 23363ms to load the last 10000
Load Same 10K Items - Partition By
It took 39 ms to load the last 1000 entities. Num Alarms So Far: 220020
It took 21 ms to load the last 1000 entities. Num Alarms So Far: 240020
It took 30 ms to load the last 1000 entities. Num Alarms So Far: 260020
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 280020
It took 35 ms to load the last 1000 entities. Num Alarms So Far: 300020
It took 26 ms to load the last 1000 entities. Num Alarms So Far: 320020
It took 25 ms to load the last 1000 entities. Num Alarms So Far: 340020
It took 34 ms to load the last 1000 entities. Num Alarms So Far: 360020
It took 48 ms to load the last 1000 entities. Num Alarms So Far: 380020
It took 343ms to load the last 10000
The results seem to indicate that, instead of leveraging something like a hash lookup, each new ID is being compared against all the other IDs. Hence the linear growth we see as the number of unique IDs increases.
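If the suspicion above is right, the observed pattern (per-batch insert time growing with the number of unique IDs, then flat on the second pass) matches the difference between a linear key scan and a hashed lookup. A toy illustration of that difference, not Siddhi code (the `LookupCostDemo` class and `compare` helper are hypothetical):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LookupCostDemo {

    // Returns {linearMs, hashMs}: the time to insert n unique keys using a
    // linear membership scan vs a hash map.
    static long[] compare(int n) {
        // Linear scan: each new key is checked against every existing key,
        // so inserting n keys costs O(n^2) comparisons in total. Per-batch
        // insert time grows as the number of unique keys grows.
        List<String> keys = new ArrayList<>();
        long t0 = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            String key = "id" + i;
            if (!keys.contains(key)) {   // O(current size) per insert
                keys.add(key);
            }
        }
        long linearMs = System.currentTimeMillis() - t0;

        // Hash lookup: expected O(1) per insert, so the total stays O(n)
        // and per-batch insert times remain flat as unique keys grow.
        Map<String, Boolean> seen = new HashMap<>();
        long t1 = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            seen.putIfAbsent("id" + i, Boolean.TRUE);
        }
        long hashMs = System.currentTimeMillis() - t1;

        return new long[]{linearMs, hashMs};
    }

    public static void main(String[] args) {
        long[] t = compare(20_000);
        System.out.println("linear scan: " + t[0] + " ms, hash map: " + t[1] + " ms");
    }
}
```

The second pass in the test above being fast in both modes is consistent with this: once an ID exists, finding it again is cheap; only the first encounter of each new ID pays the growing cost.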
Thank you for the response. I updated my question with the results of using group by vs partition by. The results are the same, except the initial load is even worse. It seems to imply that all nodes are traversed when a new key is encountered. – user8026282