0

Apache HttpComponentsのHttpClient v4.5.3を使用してHTTPリクエストを実行するScala関数を実装しました。考え方は、レスポンスを解析し、それが正常かどうか(すなわち、リターンコード200)をチェックすることです。そうでない場合は、n回再試行してください。スカラ:マップ値のメソッドの複製

応答がOKの場合、すべて正常に動作します。 ロガー出力から、最初にHTTPエラーが発生した場合に、再試行も期待通りに実行されていることがわかります。しかし、n回失敗すると、もう一度プロセス全体を再開したようです。

import org.apache.http.client.methods.{CloseableHttpResponse, HttpPost, HttpUriRequest} 
[...] 

class SearchQuery { 
    [...] 
    def executeRequest(retries: Int = 2, delay: Int = 1): Option[SearchResponse] = { 
     val request: HttpUriRequest = makeRequest 
     val response: CloseableHttpResponse = config.httpClient.execute(request) 
     val searchResults: Option[SearchResponse] = parseResponse(response) 
     response.close() 
     if (searchResults.isEmpty && retries > 0) { 
     logger.warn(s"Failed to retrieve response from ${source}. " + 
      s"Retrying $retries more time(s) in $delay second(s)...") 
     Thread.sleep(delay * 1000) 
     executeRequest(retries - 1, delay) 
     } 
     else searchResults 
    } 
    [...] 
} 

HTTPリクエストを生成makeRequest()方法

これは、リクエストを実行する機能です。 config.httpClientは、再利用可能なHttpClientインスタンスを提供します。

再帰の呼び出しとは別に、次のようにexecuteRequests()が呼び出されます。この考え方は、呼び出し元によって定義された1つまたは複数のサーバー(ソース)を、呼び出しが並行して実行されるようにすることです。それらはenumタイプSourceで定義されています。 queryFromfile()は、が呼び出されるSearchQueryオブジェクトを生成するために、指定されたファイルの内容を読み取ります。この例では

val sources: Set[Source] = Set(Source.1) 
val file: File = ... 
val responses: parallel.ParMap[Source, Option[SearchResponse]] = sources 
    .par 
    .map(source => (source, SearchQuery.queryFromFile(file, source))) 
    .toMap 
    .mapValues(query => query.executeRequest()) 

executeRequestは、各ソースのために一度呼び出されることになっています。 1つのソースが正常に通過し、もう1つはここで失敗すると予想されます。ログから

:予想通り

IntelliJのデバッガを使用して
14:56:48.656 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)... 
14:57:07.136 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)... 
14:57:25.538 [ScalaTest-run-running-FileProcessorTest] ERROR process.FileProcessor - Failed to retrieve results for file 'XXX' from sources: source1. Continuing. 
14:57:40.933 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 2 more time(s) in 3 second(s)... 
14:57:59.214 [scala-execution-context-global-15] WARN query.SearchQuery - Failed to retrieve response from source1. Retrying 1 more time(s) in 3 second(s)... 

、ロジックは、n個の試行の後にラインelse searchResultsで終わる、最初のように見えます。ただし、実際に結果を返すのではなく、if句に戻り、再帰呼び出しexecuteRequest(retries - 1, delay)にジャンプします。

更新: 私はこの動作はresponsesヴァルに次の動作によって引き起こされた考え出し:

ここ
val emptyResponses = responses.filter(_._2.isEmpty) 
if (emptyResponses.nonEmpty) 
    logger.error(
    s"Failed to retrieve results for '$fileName' from sources: " + 
     s"${emptyResponses.keys.mkString(",")}. Continuing." 
) 

呼び出しのいずれかが失敗し、彼らがした場合はエラーをログに記録した場合、私は確認してください。私はなぜこれがexecuteRequest()をもう一度呼び出すのか理解していません。それはなぜですか?それは概念的に非常に似ているようだにもかかわらず

はさらに、次の行はexecuteRequest()に2回目の呼び出しにはなりません。

responses.values 
    .filter(response => response.isDefined) 
    .map(response => response.get) 
+0

注: 'filter'の代わりに' withFilter'を使用しても、動作は変更されません。 – Carsten

答えて

0

値にアクセスするときexecuteResponse()が別の時間に実行された理由は、の値Scala Mapはアクセスされるたびに再計算されます。 responsesが故にタプル(source, Option[SearchResponse]としてモデル化される必要がある:

val responses = sources 
    .par 
    .map(source => (source, SearchQuery.queryFromFile(file, source))) 
    .map(pair => (pair._1, pair._2.executeRequest())) 

これは、基本的mapValuesの簡潔性を排除するが、タプルの2番目の要素のための固定値になります。

+1

厳密に言えば、mapValues'呼び出しの結果である 'Map'sと' ParMap'に対してのみ値が再計算されます。もう一つの可能​​な修正は 'respond'計算の最後に' .toMap'をもう一つ追加することです。これにより結果の「実現」が強制され、それ以降のアクセスで再計算されません。 – SergGr

関連する問題