Siddhi performance issue submitting events - embedded

2017-05-17

We are evaluating Siddhi as an embedded CEP processor for our application. During scale testing we found that as we increase the number of rules, the time required to insert events grows dramatically for each unique ID. For example, we:

  • Set up 10+ rules (using windows and a partition by ID).
  • Load 100K unique entries, tracking the timing. Note that as the load approaches 100K unique entries, insert performance degrades from milliseconds to multiple seconds, and the more rules you have, the more this time grows.
  • Load each record a second time. These insert times are constant, regardless of the ID.

Here is a code file that reproduces the issue:

// Assumed Siddhi 3.x imports, inferred from the APIs used below
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

import org.junit.Test;

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class ScaleSiddhiTest { 

    private SiddhiManager siddhiManager = new SiddhiManager(); 

    @Test 
    public void testWindow() throws InterruptedException { 

     String plan = "@Plan:name('MetricPlan') \n" + 
       "define stream metricStream (id string, timestamp long, metric1 double,metric2 double); \n" + 
       "partition with (id of metricStream) begin \n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule0' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule1' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule2' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule3' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule4' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule5' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule6' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule7' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule8' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule9' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule10' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule11' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule12' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule13' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule14' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule15' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule16' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule17' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric1) as value, 'Metric1-rule18' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "\n" + 
       "from metricStream#window.externalTime(timestamp, 300000) \n" + 
       "select id, avg(metric2) as value, 'Metric2-rule19' as ruleName\n" + 
       "having value>-1.000000 \n" + 
       "insert into outputStream;\n" + 
       "end ;"; 


     // Generating runtime 
     ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(plan); 

     AtomicInteger counter = new AtomicInteger(); 

     // Adding callback to retrieve output events from query 
     executionPlanRuntime.addCallback("outputStream", new StreamCallback() { 
      @Override 
      public void receive(Event[] events) { 
       counter.addAndGet(events.length); 
      } 
     }); 

     // Starting event processing 
     executionPlanRuntime.start(); 

     // Retrieving InputHandler to push events into Siddhi 
     InputHandler inputHandler = executionPlanRuntime.getInputHandler("metricStream"); 

     int numOfUniqueItems = 10000; 

     IntStream.range(0, 2).forEach(curMinute->{ 
      long iterationStartTime = System.currentTimeMillis(); 
      AtomicLong lastStart = new AtomicLong(System.currentTimeMillis()); 
      IntStream.range(0, numOfUniqueItems).forEach(id->{ 
       try { 
         // metricStream declares id as a string, so convert the int before sending
         inputHandler.send(TimeUnit.MINUTES.toMillis(curMinute), new Object[]{String.valueOf(id), TimeUnit.MINUTES.toMillis(curMinute), 10.0, 20.0}); 
        if(id > 0 && id % 1000 == 0){ 
         long ls = lastStart.get(); 
         long curTime = System.currentTimeMillis(); 
         lastStart.set(curTime); 
         System.out.println("It took " + (curTime - ls) + " ms to load the last 1000 entities. Num Alarms So Far: " + counter.get()); 
        } 
       } catch (Exception e){ 
        throw new RuntimeException(e); 
       } 
      }); 
      System.out.println("It took " + (System.currentTimeMillis() - iterationStartTime) + "ms to load the last " + numOfUniqueItems); 
     }); 

     // Shutting down the runtime 
     executionPlanRuntime.shutdown(); 

     siddhiManager.shutdown(); 
    } 

} 

My questions:

  1. What are we doing wrong here that might be leading to the initial-load performance problem?
  2. Are there any recommendations for working around it?

UPDATE: Per the recommended answer below, I updated the test to use group by instead of a partition. It shows the same growth on the initial load of each object, except worse. Specifically, I changed the rules to:

@Plan:name('MetricPlan') 
define stream metricStream (id string, timestamp long, metric1 double,metric2 double); 

from metricStream#window.externalTime(timestamp, 300000) 
select id, avg(metric1) as value, 'Metric1-rule0' as ruleName 
group by id 
having value>-1.000000 
insert into outputStream; 

... 

Here are the result outputs for group by vs. partition by. Both show the same growth on the initial load.

Group by:

Load 10K Items - Group By   
It took 3098 ms to load the last 1000 entities. Num Alarms So Far: 20020   
It took 2507 ms to load the last 1000 entities. Num Alarms So Far: 40020   
It took 5993 ms to load the last 1000 entities. Num Alarms So Far: 60020   
It took 4878 ms to load the last 1000 entities. Num Alarms So Far: 80020   
It took 6079 ms to load the last 1000 entities. Num Alarms So Far: 100020   
It took 8466 ms to load the last 1000 entities. Num Alarms So Far: 120020   
It took 11840 ms to load the last 1000 entities. Num Alarms So Far: 140020   
It took 12634 ms to load the last 1000 entities. Num Alarms So Far: 160020   
It took 14779 ms to load the last 1000 entities. Num Alarms So Far: 180020   
It took 87053ms to load the last 10000   

Load Same 10K Items - Group By   
It took 31 ms to load the last 1000 entities. Num Alarms So Far: 220020   
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 240020   
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 260020   
It took 19 ms to load the last 1000 entities. Num Alarms So Far: 280020   
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 300020   
It took 20 ms to load the last 1000 entities. Num Alarms So Far: 320020   
It took 17 ms to load the last 1000 entities. Num Alarms So Far: 340020   
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 360020   
It took 18 ms to load the last 1000 entities. Num Alarms So Far: 380020   
It took 202ms to load the last 10000   

Partition by:

Load 10K Items - Partition By   
It took 1148 ms to load the last 1000 entities. Num Alarms So Far: 20020   
It took 1870 ms to load the last 1000 entities. Num Alarms So Far: 40020   
It took 1393 ms to load the last 1000 entities. Num Alarms So Far: 60020   
It took 1745 ms to load the last 1000 entities. Num Alarms So Far: 80020   
It took 2040 ms to load the last 1000 entities. Num Alarms So Far: 100020   
It took 2108 ms to load the last 1000 entities. Num Alarms So Far: 120020   
It took 3068 ms to load the last 1000 entities. Num Alarms So Far: 140020   
It took 2798 ms to load the last 1000 entities. Num Alarms So Far: 160020   
It took 3532 ms to load the last 1000 entities. Num Alarms So Far: 180020   
It took 23363ms to load the last 10000   

Load Same 10K Items - Partition By   
It took 39 ms to load the last 1000 entities. Num Alarms So Far: 220020   
It took 21 ms to load the last 1000 entities. Num Alarms So Far: 240020   
It took 30 ms to load the last 1000 entities. Num Alarms So Far: 260020   
It took 22 ms to load the last 1000 entities. Num Alarms So Far: 280020   
It took 35 ms to load the last 1000 entities. Num Alarms So Far: 300020   
It took 26 ms to load the last 1000 entities. Num Alarms So Far: 320020   
It took 25 ms to load the last 1000 entities. Num Alarms So Far: 340020   
It took 34 ms to load the last 1000 entities. Num Alarms So Far: 360020   
It took 48 ms to load the last 1000 entities. Num Alarms So Far: 380020   
It took 343ms to load the last 10000  

This type of growth almost seems to imply that, when an ID is not found, it is compared against every other ID rather than being looked up via a hash. That would explain the linear growth we see as the number of unique IDs increases.
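
As a toy illustration of that hypothesis (this is not Siddhi code; the class name, IDs, and sizes are made up), compare locating each new key by a linear scan with a hash lookup. The scan's per-insert cost grows with the number of keys already loaded, which produces exactly the kind of timings shown above:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: if each new key is found by scanning existing keys,
// total load time is O(n^2); with a hash map it stays roughly O(n).
public class LookupGrowthDemo {
    public static void main(String[] args) {
        int n = 20_000;

        List<String> scanned = new ArrayList<>();
        long t0 = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            String id = "id-" + i;
            if (!scanned.contains(id)) { // O(size) scan per new id
                scanned.add(id);
            }
        }
        System.out.println("linear scan: " + (System.currentTimeMillis() - t0) + " ms");

        Map<String, Boolean> hashed = new HashMap<>();
        long t1 = System.currentTimeMillis();
        for (int i = 0; i < n; i++) {
            hashed.putIfAbsent("id-" + i, Boolean.TRUE); // ~O(1) expected lookup
        }
        System.out.println("hash lookup: " + (System.currentTimeMillis() - t1) + " ms");
    }
}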

Answer


Yes, this behavior is expected. When you use a partition by ID, a new instance of the partition is created for each new ID you push, so if the partition is large, creating it can take time. The second time around the event is processed much faster, because the partition for that unique ID has already been created.
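
To make that cost model concrete, here is a minimal sketch of the "one pipeline instance per partition key" behavior the answer describes. It is an illustration only, not Siddhi's actual internals; PartitionedRuntime and pipelineFactory are invented names:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical model of partition-per-key dispatch, for illustration only.
class PartitionedRuntime<K, P> {
    private final Map<K, P> instances = new ConcurrentHashMap<>();
    private final Supplier<P> pipelineFactory; // expensive: builds every query in the partition

    PartitionedRuntime(Supplier<P> pipelineFactory) {
        this.pipelineFactory = pipelineFactory;
    }

    P pipelineFor(K key) {
        // The first event for a key pays the full pipeline-construction cost;
        // every later event for the same key is a cheap map lookup, which is
        // why the second pass over the same 10K IDs runs in milliseconds.
        return instances.computeIfAbsent(key, k -> pipelineFactory.get());
    }
}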

In your case, I don't think a partition is the ideal solution. Partitions are only useful when you have inner streams or when you use non-time-based windows.

For example, with an inner stream:

partition with (id of metricStream) begin from metricStream ... insert into #TempStream ; from #TempStream .... select ... insert into outputStream; end;

If you just want time-based aggregations grouped per ID, use the group by keyword instead:

from metricStream#window.externalTime(timestamp, 300000) select id, avg(metric1) as value, 'Metric1-rule18' as ruleName group by id having value>-1.000000 insert into outputStream;


Thanks for the input. I updated my question with the results of using group by vs. partition by. The results are the same, except the initial load is even worse. It seems to imply that all nodes are traversed whenever a new key is encountered. – user8026282
