2017-10-10 13 views
2

zuul を使用してリクエストをキャッシュしています。キャッシュは Redis に POJO として格納され、プレーンテキスト(gzip 圧縮データではありません)を含みます。キャッシュ用の Netflix Zuul プレフィルターが、gzip 圧縮された応答のうち少数のリクエストで正しく動作しません。

通常のテストと統合テストでは、すべて正常に動作します。しかし jmeter の負荷テストでは、一部のリクエストが次のエラーで失敗します:

java.util.zip.ZipException: Not in GZIP format (from jmeter) 

この時点で、zuulは空の応答を返しています。

私のプレフィルター:

public class CachePreFilter extends CacheBaseFilter { 

    private static DynamicIntProperty INITIAL_STREAM_BUFFER_SIZE = DynamicPropertyFactory.getInstance().getIntProperty(ZuulConstants.ZUUL_INITIAL_STREAM_BUFFER_SIZE, 8192); 

    @Autowired 
    CounterService counterService; 

    public CachePreFilter(RedisCacheManager redisCacheManager, Properties properties) { 
     super(redisCacheManager, properties); 
    } 

    @Override 
    public Object run() { 
     RequestContext ctx = RequestContext.getCurrentContext(); 

     CachedResponse data = getFromCache(ctx); 
     if (null != data) { 

      counterService.increment("counter.cached"); 

      HttpServletResponse response = ctx.getResponse(); 

      response.addHeader("X-Cache", "HIT"); 

      if (null != data.getContentType()) { 
       response.setContentType(data.getContentType()); 
      } 

      if (null != data.getHeaders()) { 
       for (Entry<String, String> header : data.getHeaders().entrySet()) { 
        if (!response.containsHeader(header.getKey())) { 
         response.addHeader(header.getKey(), header.getValue()); 
        } 
       } 
      } 

      OutputStream outStream = null; 
      try { 
       outStream = response.getOutputStream(); 

       boolean isGzipRequested = ctx.isGzipRequested(); 

       if (null != data.getBody()) { 
        final String requestEncoding = ctx.getRequest().getHeader(ZuulHeaders.ACCEPT_ENCODING); 

        if (requestEncoding != null && HTTPRequestUtils.getInstance().isGzipped(requestEncoding)) { 
         isGzipRequested = true; 
        } 

        ByteArrayOutputStream byteArrayOutputStream = null; 
        ByteArrayInputStream is = null; 
        try { 

         if (isGzipRequested) { 
          byteArrayOutputStream = new ByteArrayOutputStream(); 
          GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); 
          gzipOutputStream.write(data.getBody().getBytes(StandardCharsets.UTF_8)); 
          gzipOutputStream.flush(); 
          gzipOutputStream.close(); 

          ctx.setResponseGZipped(true); 

          is = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); 
          logger.debug(String.format("Send gzip content %s", data.getBody())); 

          response.setHeader(ZuulHeaders.CONTENT_ENCODING, "gzip"); 
         } else { 
          logger.debug(String.format("Send content %s", data.getBody())); 

          is = new ByteArrayInputStream(data.getBody().getBytes(StandardCharsets.UTF_8)); 
         } 
         writeResponse(is, outStream); 

        } catch (Exception e) { 
         logger.error("Error at sending response " + e.getMessage(), e); 
         throw new RuntimeException("Failed to send content", e); 
        } finally { 
         if (null != byteArrayOutputStream) { 
          byteArrayOutputStream.close(); 
         } 

         if (null != is) { 
          is.close(); 
         } 
        } 
       } 

       ctx.setSendZuulResponse(false); 
      } catch (IOException e) { 
       logger.error("Cannot read from Stream " + e.getMessage(), e.getMessage()); 
      } finally { 
       // don't close the outputstream 
      } 

      ctx.set(CACHE_HIT, true); 

      return data; 
     } else { 
      counterService.increment("counter.notcached"); 
     } 

     ctx.set(CACHE_HIT, false); 
     return null; 
    } 

    private ThreadLocal<byte[]> buffers = new ThreadLocal<byte[]>() { 
     @Override 
     protected byte[] initialValue() { 
      return new byte[INITIAL_STREAM_BUFFER_SIZE.get()]; 
     } 
    }; 

    private void writeResponse(InputStream zin, OutputStream out) throws Exception { 
     byte[] bytes = buffers.get(); 
     int bytesRead = -1; 
     while ((bytesRead = zin.read(bytes)) != -1) { 
      out.write(bytes, 0, bytesRead); 
     } 
    } 

    @Override 
    public int filterOrder() { 
     return 99; 
    } 

    @Override 
    public String filterType() { 
     return "pre"; 
    } 
} 

私のポストフィルター(Redis を使わず、単にローカル変数に保存する形でもテストしました):

public class CachePostFilter extends CacheBaseFilter { 

    /**
     * Creates the filter, forwarding both arguments to the {@code CacheBaseFilter} constructor.
     */
    public CachePostFilter(RedisCacheManager redisCacheManager, Properties properties) { 
     super(redisCacheManager, properties); 
    } 

    /**
     * Decides whether this filter runs for the current request.
     *
     * @return {@code true} only when the base filter applies and the request context does not
     *         carry a true {@code CACHE_HIT} flag
     */
    @Override
    public boolean shouldFilter() {
        final RequestContext current = RequestContext.getCurrentContext();
        if (!super.shouldFilter()) {
            return false;
        }
        return !current.getBoolean(CACHE_HIT);
    }

    /**
     * Stores the body of a cacheable origin response under the request's cache key.
     *
     * <p>The body is taken from {@code ctx.getResponseBody()} when set, otherwise it is read
     * (and gunzipped if flagged) from the response data stream. Nothing is cached when the
     * response is not cacheable, no cache key exists, no body source exists, or reading fails.
     *
     * @return always {@code null}
     */
    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest req = ctx.getRequest();
        HttpServletResponse res = ctx.getResponse();

        // Store only successful responses
        if (!isSuccess(res, ctx.getOriginResponseHeaders())) {
            return null;
        }

        String cacheKey = cacheKey(req);
        if (cacheKey == null) {
            return null;
        }

        String body = ctx.getResponseBody();
        if (null == body) {
            if (null == ctx.getResponseDataStream()) {
                return null;
            }

            InputStream is = null;
            try {
                is = ctx.getResponseDataStream();

                final Long len = ctx.getOriginContentLength();
                if (len == null || len > 0) {

                    if (ctx.getResponseGZipped()) {
                        is = new GZIPInputStream(is);
                    }

                    StringWriter writer = new StringWriter();
                    IOUtils.copy(is, writer, "UTF-8");
                    body = writer.toString();

                    if (!body.isEmpty()) {
                        // Encode with the same charset that was used for decoding; the platform
                        // default could produce different bytes and a wrong length.
                        final byte[] decoded = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
                        // NOTE(review): the stream is replaced by the decompressed body and the
                        // gzip flag is cleared, but no response header is adjusted here - confirm
                        // that later filters do not still advertise gzip to the client.
                        ctx.setResponseDataStream(new ByteArrayInputStream(decoded));
                        ctx.setResponseGZipped(false);
                        ctx.setOriginContentLength(String.valueOf(decoded.length));
                    } else {
                        ctx.setResponseBody("{}");
                    }
                }

            } catch (IOException e) {
                logger.error("Cannot read body " + e.getMessage(), e);
                // The body is incomplete or unknown; do not put it into the cache.
                return null;
            } finally {
                if (null != is) {
                    try {
                        is.close();
                    } catch (IOException ignored) {
                        // Best effort: the content has already been read or the failure was logged.
                    }
                }
            }
        }

        // Reached for both body sources, so a body set via setResponseBody is cached as well.
        saveToCache(ctx, cacheKey, body);
        return null;
    }

    /**
     * Position of this filter among filters of the same type.
     *
     * @return the fixed order value {@code 1}
     */
    @Override
    public int filterOrder() {
        final int order = 1;
        return order;
    }

    /**
     * Names the filter phase this filter belongs to.
     *
     * @return the literal {@code "post"}
     */
    @Override
    public String filterType() {
        final String type = "post";
        return type;
    }

    /**
     * Tells whether the response may be cached.
     *
     * @param res           the servlet response; {@code null} yields {@code false}
     * @param originHeaders header pairs from the origin; {@code null} yields {@code false}
     * @return {@code true} when the status is below 300 and the origin sent
     *         {@code X-CACHEABLE} with the value {@code 1}
     */
    private boolean isSuccess(HttpServletResponse res, List<Pair<String, String>> originHeaders) {
        if (res == null || res.getStatus() >= 300 || originHeaders == null) {
            return false;
        }
        for (Pair<String, String> header : originHeaders) {
            // Header names are case-insensitive; constant-first comparison also tolerates
            // a null name or value.
            if ("X-CACHEABLE".equalsIgnoreCase(header.first()) && "1".equals(header.second())) {
                return true;
            }
        }
        return false;
    }

Redis なしのテストでも結果は同じでした。キャッシュからの応答を(gzip 圧縮の前に)常にログに記録していますが、すべて正常に見えます。

答えて

0

(質問者の代わりに投稿)

ソリューション

ポストフィルターをリファクタリングし、zuul の応答をできるだけ変更しないようにしました。この変更以降、問題は発生していません。

動作するポストフィルター:

/**
 * Zuul "post" filter that stores cacheable origin responses under the request's cache key.
 *
 * <p>The response handed to later filters is left as received: the raw bytes are buffered,
 * put back as the response data stream, and only a separate copy is decoded for the cache.
 */
public class CachePostFilter extends CacheBaseFilter {

    /**
     * Creates the filter, forwarding both arguments to the {@code CacheBaseFilter} constructor.
     */
    public CachePostFilter(RedisCacheManager redisCacheManager, Properties properties) {
        super(redisCacheManager, properties);
    }

    /**
     * @return {@code true} only when the base filter applies and the request context does not
     *         carry a true {@code CACHE_HIT} flag
     */
    @Override
    public boolean shouldFilter() {
        RequestContext ctx = RequestContext.getCurrentContext();
        return super.shouldFilter() && !ctx.getBoolean(CACHE_HIT);
    }

    /**
     * Caches the response body when the response is cacheable and a cache key exists.
     *
     * <p>Nothing is cached when neither a response body nor a response data stream is
     * available, or when reading the stream fails.
     *
     * @return always {@code null}
     */
    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest req = ctx.getRequest();
        HttpServletResponse res = ctx.getResponse();

        // Store only successful responses
        if (!isSuccess(res, ctx.getOriginResponseHeaders())) {
            return null;
        }

        String cacheKey = cacheKey(req);
        if (cacheKey == null) {
            return null;
        }

        String body = ctx.getResponseBody();
        if (null == body) {
            if (null == ctx.getResponseDataStream()) {
                return null;
            }
            try {
                body = readBodyKeepingResponse(ctx);
            } catch (IOException e) {
                logger.error("Cannot read body " + e.getMessage(), e);
                // The body is unknown; caching it would store an empty entry.
                return null;
            }
        }

        // Reached for both body sources, so a body set via setResponseBody is cached as well.
        saveToCache(ctx, cacheKey, body);
        return null;
    }

    /**
     * Reads the response data stream into memory, re-installs the untouched bytes as the
     * response data stream and returns a decoded copy of them.
     *
     * @return the body decoded as UTF-8 (gunzipped first when the context flags the response
     *         as gzipped), or {@code null} when the origin content length is zero or negative
     * @throws IOException if the stream cannot be read or decompressed
     */
    private String readBodyKeepingResponse(RequestContext ctx) throws IOException {
        InputStream rawInputStream = ctx.getResponseDataStream();
        InputStream decodedStream = null;
        try {
            // If the origin says it is gzipped but the content is zero bytes,
            // do not try to uncompress.
            final Long len = ctx.getOriginContentLength();
            if (len != null && len <= 0) {
                return null;
            }

            byte[] rawData = IOUtils.toByteArray(rawInputStream);

            // The original stream is consumed now; hand the same bytes back so later filters
            // can still deliver the response. The gzip flag is deliberately left unchanged.
            ctx.setResponseDataStream(new ByteArrayInputStream(rawData));

            if (ctx.getResponseGZipped()) {
                decodedStream = new GZIPInputStream(new ByteArrayInputStream(rawData));
            } else {
                decodedStream = new ByteArrayInputStream(rawData);
            }

            StringWriter writer = new StringWriter();
            IOUtils.copy(decodedStream, writer, "UTF-8");
            return writer.toString();
        } finally {
            closeQuietly(rawInputStream);
            closeQuietly(decodedStream);
        }
    }

    /** Closes the stream if present; a failure to close is ignored on purpose. */
    private static void closeQuietly(InputStream stream) {
        if (null == stream) {
            return;
        }
        try {
            stream.close();
        } catch (IOException ignored) {
            // Best effort: the content has already been read or the failure is reported elsewhere.
        }
    }

    @Override
    public int filterOrder() {
        return 1;
    }

    @Override
    public String filterType() {
        return "post";
    }

    /**
     * Tells whether the response may be cached.
     *
     * @param res           the servlet response; {@code null} yields {@code false}
     * @param originHeaders header pairs from the origin; {@code null} yields {@code false}
     * @return {@code true} when the status is exactly 200 and the origin sent
     *         {@code X-CACHEABLE} with the value {@code 1}
     */
    private boolean isSuccess(HttpServletResponse res, List<Pair<String, String>> originHeaders) {
        if (res == null || res.getStatus() != 200 || originHeaders == null) {
            return false;
        }
        for (Pair<String, String> header : originHeaders) {
            // Header names are case-insensitive; constant-first comparison also tolerates
            // a null name or value.
            if ("X-CACHEABLE".equalsIgnoreCase(header.first()) && "1".equals(header.second())) {
                return true;
            }
        }
        return false;
    }
}
関連する問題