2017-08-29 19 views
1

私はラクダのカフカコンポーネントを使用していますが、オフセットを適用してフードの下で何が起こっているのかはっきりしていません。下に見られるように、私はレコードを集約しています。私は、レコードがSFTPに保存された後にオフセットをコミットすることが理にかなっていると考えています。camel-kafkaでオフセットコミットを手動で制御する方法は?

コミットをいつ実行できるか手動で制御することはできますか?

private static class MyRouteBuilder extends RouteBuilder { 

    @Override 
    public void configure() throws Exception { 

     from("kafka:{{mh.topic}}?" + getKafkaConfigString()) 
     .unmarshal().string() 
     .aggregate(constant(true), new MyAggregationStrategy()) 
      .completionSize(1000) 
      .completionTimeout(1000) 
     .setHeader("CamelFileName").constant("transactions-" + (new Date()).getTime()) 
     .to("sftp://" + getSftpConfigString()) 

     // how to commit offset only after saving messages to SFTP? 

     ; 
    } 

    private final class MyAggregationStrategy implements AggregationStrategy { 
     @Override 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) { 
       return newExchange; 
      } 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      String body = oldBody + newBody; 
      oldExchange.getIn().setBody(body); 
      return oldExchange; 
     } 
    } 
} 

private static String getSftpConfigString() { 
     return "{{sftp.hostname}}/{{sftp.dir}}?" 
       + "username={{sftp.username}}" 
       + "&password={{sftp.password}}" 
       + "&tempPrefix=.temp." 
       + "&fileExist=Append" 
       ; 
} 

private static String getKafkaConfigString() { 
     return "brokers={{mh.brokers}}" 
      + "&saslMechanism={{mh.saslMechanism}}" 
      + "&securityProtocol={{mh.securityProtocol}}" 
      + "&sslProtocol={{mh.sslProtocol}}" 
      + "&sslEnabledProtocols={{mh.sslEnabledProtocols}}" 
      + "&sslEndpointAlgorithm={{mh.sslEndpointAlgorithm}}" 
      + "&saslJaasConfig={{mh.saslJaasConfig}}" 
      + "&groupId={{mh.groupId}}" 
      ; 
} 

答えて

2

ありませんあなたがすることはできません。 Kafkaは、X秒ごとにバックグラウンドで自動コミットを実行します(これを設定できます)。

camel-kafkaには手動コミットのサポートはありません。また、アグリゲータがカフカ消費者から分離され、コミットを実行するコンシューマが分離されているため、これは不可能です。

関連する問題