2017-02-23 28 views
1

背景

現在、私はギャトリングを使用しているストレステストツールの機能解析セットを作成しています。Gatlingシナリオで複数のアクションを追加するアクション

これには、スクロールクエリとそれに続く更新APIコールによるelasticsearchの読み込みが含まれます。

私は

ステップ1達成したいこと:実行スクロールの開始剤を、それがさらにスクロールで使用することができます_scroll_idが照会保存

ステップ2:実行のスクロールクエリ各スクロールクエリの一部として返された各ヒットに対する修正を行い、それをelasticsearchに戻し、効果的に1つのスクロールクエリアクションから1000個のアクションを生成し、結果をサンプリングします。

ステップ1は簡単です。ステップ2はそんなにありません。

私は

を試してみた私は現在、JSON形式の結果を解析しResponseTransformer経由してこれを達成しようとしている、インデックスに別のexec(http(...).post(...) etc)をしようとし、それぞれに各1とスレッドオフ火災への変更を行います変更はelasticsearchに戻ります。

基本的に、私はそれについて間違った方法をしていると思います。インデックス作成スレッドは実行されることはなく、ガートリングによってサンプリングされることはありません。

ここに私のスクロールクエリアクションの本体です:

... 

    val pool = Executors.newFixedThreadPool(parallelism) 

    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .transformResponse { case response if response.isReceived => 
     new ResponseWrapper(response) { 
     val responseJson = JSON.parseFull(response.body.string) 
     // Get the hits and 
     val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]] 
     for (hit <- hits) { 
      val id = hit.get("_id").get.asInstanceOf[String] 
      val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]] 
      val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable 
      source("newfield") = "testvalue" // Make a modification 
      Thread.sleep(pause) // Pause to simulate topology throughput 
      pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request 
     } 
     } 
    }) // Make some mods and re-index into elasticsearch 

    ... 

DocumentIndexerは次のようになります。

class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable { 

    ... 

    val httpConf = http 
    .baseURL(s"http://$host:$port/${index}/${doctype}/${id}") 
    .acceptHeader("application/json") 
    .doNotTrackHeader("1") 
    .disableWarmUp 

    override def run() { 

    val json = new ObjectMapper().writeValueAsString(source) 

    exec(http(s"Index ${id}") 
     .post("/_update") 
     .body(StringBody(json)).asJSON) 

    } 

} 

質問

  1. これはガトリングを使用していても可能ですか?
  2. 達成したいことをどのように達成できますか?

ご協力いただきありがとうございます。

答えて

1

jsonPathを使用してJSONヒット配列を抽出し、要素をセッションに保存し、アクションチェーンでforeachを使用し、exec-ループ内のインデックスタスクを使用してインデックス作成を実行できますそれに応じて。

すなわち:
ScrollQuery

... 
    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson")) // Save a List of hit Maps into the session 
) 
... 

シミュレーション

... 
    val scrollQueries = scenario("Enrichment Topologies").exec(ScrollQueryInitiator.query, repeat(numberOfPagesToScrollThrough, "scrollQueryCounter"){ 
     exec(ScrollQuery.query, pause(10 seconds).foreach("${hitsJson}", "hit"){ exec(HitProcessor.query) }) 
    }) 
... 

HitProcessor

... 
    def getBody(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = hit("_id").asInstanceOf[String] 
    val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]]) 
    source.put("newfield", "testvalue") 
    val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source)) 
    val json = s"""{"doc":${sourceJson}}""" 
    json 
    } 

    def getId(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8") 
    val uri = s"/${index}/${doctype}/${id}/_update" 
    uri 
    } 

    val query = exec(http(s"Index Item") 
    .post(session => getId(session)) 
    .body(StringBody(session => getBody(session))).asJSON) 
... 

免責事項:このコードはまだ最適化が必要です!そして、私はまだ実際に多くのスカラを学んでいません。もっと良い解決策を使ってコメントしてください。

これを実行した後、私が実際に達成したいのは、与えられた数の索引作成タスクを並列化することです。すなわち、私は1000ヒットを返し、個々のヒットごとにインデックスタスクを実行したいが、それらを繰り返し実行し、順番に実行するのではなく、同時に10個もやりたい。

しかし、これは別の質問だと思いますが、実際にはそれを提示します。

関連する問題